24.09., 9:00 - 11:00: Due to updates GitLab will be unavailable for some minutes between 09:00 and 11:00.

ClusteringOperator.cpp 9.07 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
//================================================================================
// Name        : ClusteringOperator.cpp
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "ClusteringOperator.h"

ClusteringOperator::ClusteringOperator(const std::string& name) : OperatorTemplate(name) {
    // Model persistence: no model is loaded from or written to disk by default,
    // and a trained model is not reused unless configured
    _modelIn = "";
    _modelOut = "";
    _reuseModel = false;

    // Time windows: aggregation and lookback are disabled until configured;
    // _numWindows is derived from them in execOnInit()
    _aggregationWindow = 0;
    _lookbackWindow = 0;
    _numWindows = 0;

    // Gaussian mixture parameters
    _numComponents = 3;
    _outlierCut = 2.0f;

    // Runtime state: a training pass is required before any prediction
    _trainingPending = true;
    _trainingSet = cv::Mat();
    _tempSet = cv::Mat();
    _currentfVector = cv::Mat();
}

ClusteringOperator::ClusteringOperator(const ClusteringOperator& other) : OperatorTemplate(other) {
    // Configuration is inherited from the source operator...
    _modelIn = other._modelIn;
    _reuseModel = other._reuseModel;
    _aggregationWindow = other._aggregationWindow;
    _lookbackWindow = other._lookbackWindow;
    _numWindows = other._numWindows;
    _numComponents = other._numComponents;
    _outlierCut = other._outlierCut;

    // ...but the copy has no output path (so it never saves its model) and starts
    // with fresh runtime state; the mixture model itself is not copied
    _modelOut = "";
    _trainingPending = true;
    _trainingSet = cv::Mat();
    _tempSet = cv::Mat();
    _currentfVector = cv::Mat();
}

ClusteringOperator::~ClusteringOperator() {
    // Drop this operator's reference to the mixture model, if one was ever created
    if(!_gmm.empty())
        _gmm.release();
}

restResponse_t ClusteringOperator::REST(const string& action, const unordered_map<string, string>& queries) {
    // Supported actions:
    //   "means" - dump the component means of the trained mixture
    //   "covs"  - dump the component covariance matrices
    //   "train" - flag the model for re-training on the next compute() call
    // Any other action throws invalid_argument. The query parameters are unused.
    restResponse_t resp;
    if(action=="means") {
        resp.response = printMeans();
        return resp;
    }
    if(action=="covs") {
        resp.response = printCovs();
        return resp;
    }
    if(action!="train")
        throw invalid_argument("Unknown plugin action " + action + " requested!");

    resp.response = "Re-training triggered for gaussian mixture model " + this->_name + "!\n";
    this->_trainingPending = true;
    return resp;
}

void ClusteringOperator::execOnInit() {
79 80 81 82 83
    if(_interval==0 || _lookbackWindow==0 || _lookbackWindow <= _aggregationWindow)
        _numWindows = 0;
    else
        _numWindows = (_lookbackWindow - _aggregationWindow) / ((uint64_t)_interval * 1000000);
    
84 85 86 87
    bool useDefault=true;
    if(_modelIn!="") {
        try {
            _gmm = cv::ml::EM::load(_modelIn);
88 89
            if(!_gmm->isTrained() || _units.empty() || _units[0]->getSubUnits().empty() || 
            _units[0]->getSubUnits()[0]->getInputs().size()!=(uint64_t)_gmm->getMeans().size().width) 
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
                LOG(error) << "Operator " + _name + ": incompatible model, falling back to default!";
            else {
                _trainingPending = false;
                useDefault = false;
            }
        } catch(const std::exception& e) {
            LOG(error) << "Operator " + _name + ": cannot load model from file, falling back to default!"; }
    }
    if(useDefault) {
        _gmm = cv::ml::EM::create();
        _gmm->setClustersNumber(_numComponents);
    }
}

void ClusteringOperator::printConfig(LOG_LEVEL ll) {
    // Model paths that were not configured are reported as "none"
    const std::string inputPath = (_modelIn!="") ? _modelIn : std::string("none");
    const std::string outputPath = (_modelOut!="") ? _modelOut : std::string("none");
    const char* reuseState = _reuseModel ? "enabled" : "disabled";

    LOG_VAR(ll) << "            Window:          " << _aggregationWindow;
    LOG_VAR(ll) << "            Lookback window: " << _lookbackWindow;
    LOG_VAR(ll) << "            Input Path:      " << inputPath;
    LOG_VAR(ll) << "            Output Path:     " << outputPath;
    LOG_VAR(ll) << "            Clusters:        " << _numComponents;
    LOG_VAR(ll) << "            Outlier Cut:     " << _outlierCut;
    LOG_VAR(ll) << "            Reuse Model:     " << reuseState;

    // Followed by the settings shared by every operator
    OperatorTemplate<ClusteringSensorBase>::printConfig(ll);
}

void ClusteringOperator::compute(U_Ptr unit) {
    // The sub-unit list is bound once and reused for feature extraction,
    // window bookkeeping and prediction.
    const auto& subUnits = unit->getSubUnits();
    const size_t numSubUnits = subUnits.size();
    // Every time window contributes exactly one row per sub-unit, so the number of
    // stored windows is rows / numSubUnits; an empty unit would divide by zero.
    if(numSubUnits == 0)
        throw std::runtime_error("Operator " + _name + ": cannot perform clustering, unit has no sub-units!");

    // Without lookback the training set only ever holds the current window
    if(_numWindows==0)
        _trainingSet = cv::Mat();
    _tempSet = cv::Mat();

    // One feature vector (row) per sub-unit for the current time window
    for(const auto& su : subUnits) {
        computeFeatureVector(su);
        _tempSet.push_back(_currentfVector);
    }

    // The newest window is stacked on top, so rows [0, numSubUnits) always hold
    // the samples of the current time window
    if(_trainingSet.empty())
        _trainingSet = _tempSet;
    else
        cv::vconcat(_tempSet, _trainingSet, _trainingSet);

    // Number of windows currently stored, the current one included
    const size_t storedWindows = (size_t)_trainingSet.size().height / numSubUnits;

    // Performing training if the conditions are met: a pending request or a non-reused
    // model triggers training, but only once more than _numWindows windows are stored
    if ((_trainingPending || !_reuseModel) && storedWindows > _numWindows) {
        if(_gmm.empty())
            throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
        if(!_gmm->trainEM(_trainingSet))
            throw std::runtime_error("Operator " + _name + ": model training failed!");
        _trainingPending = false;
        LOG(debug) << "Operator " + _name + ": model training performed using " << _trainingSet.size().height << " points.";
        if(_modelOut!="") {
            try {
                _gmm->save(_modelOut);
            } catch(const std::exception&) {
                LOG(error) << "Operator " + _name + ": cannot save the model to a file!";
            }
        }
    }

    // Checking that the operator is not in any invalid state: an untrained model is
    // only tolerated while a streaming operator is still filling its lookback buffer
    if(_gmm.empty() || !(_gmm->isTrained() || (_trainingPending && _streaming && _numWindows>0)))
        throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");

    // Performing prediction on the samples of the current window only
    if(_gmm->isTrained()) {
        std::vector<cv::Mat> covs;
        _gmm->getCovs(covs);
        const cv::Mat means = _gmm->getMeans();

        reading_t predict;
        predict.timestamp = getTimestamp();

        for(size_t idx = 0; idx < numSubUnits; idx++) {
            const cv::Mat sample = _trainingSet.row((int)idx);
            // predict2 returns (log-likelihood, index of the most probable component)
            const cv::Vec2d res = _gmm->predict2(sample, cv::noArray());
            const int64_t label = (int64_t) res[1];
            const bool outlier = isOutlier(sample, means.row((int)label), covs[label]);
            predict.value = outlier ? OUTLIER_ID : label;
            subUnits[idx]->getOutputs()[0]->storeReading(predict);
        }
    }

    if(_numWindows==0)
        _trainingSet = cv::Mat();
    // Removing the oldest time window if lookback is enabled (oldest rows are at the bottom)
    else if(storedWindows > _numWindows)
        _trainingSet = _trainingSet.rowRange(0, (int)(_numWindows * numSubUnits));
    _tempSet = cv::Mat();
}

177
void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) {
178 179
    _currentfVector = cv::Mat(1, unit->getInputs().size(), CV_32F);
    std::vector<ClusteringSBPtr>& inputs = unit->getInputs();
180 181
    uint64_t endTs = getTimestamp() - offset;
    uint64_t startTs = endTs - _aggregationWindow;
182 183 184
    for(size_t idx=0; idx<inputs.size(); idx++) {
        _mean=0;
        _buffer.clear();
185
        if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty())
186 187 188 189 190 191 192 193 194 195 196
            throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
        
        // Computing MEAN
        for(const auto& v : _buffer)
            _mean += v.value;
        _mean /= _buffer.size();
        
        // Casting and storing the statistical features
        _currentfVector.at<float>(idx) = (float)_mean;
    }
}
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214

bool ClusteringOperator::isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov) {
    // Returns true when the Mahalanobis distance between vec1 and vec2 under the
    // covariance cov exceeds _outlierCut. If the distance cannot be computed the
    // point is, on a best-effort basis, not flagged as an outlier.
    double dist = 0.0;
    try {
        // cv::Mahalanobis asserts that both vectors and the inverse covariance share
        // the same type. Samples are built as CV_32F (see computeFeatureVector) while
        // OpenCV's EM keeps its means/covariances in CV_64F; without this promotion the
        // call always threw and outliers were never reported.
        cv::Mat v1, v2, cov64, iCov;
        vec1.convertTo(v1, CV_64F);
        vec2.convertTo(v2, CV_64F);
        cov.convertTo(cov64, CV_64F);
        // SVD-based inversion tolerates singular covariance matrices
        cv::invert(cov64, iCov, cv::DECOMP_SVD);
        dist = cv::Mahalanobis(v1, v2, iCov);
    } catch(const std::exception&) {
        return false;
    }
    return dist > _outlierCut;
}

std::string ClusteringOperator::printMeans() {
    std::ostringstream out;
    if(_gmm.empty() || !_gmm->isTrained())
        out << "Model is uninitialized or not trained.\n";
    else {
Alessio Netti's avatar
Alessio Netti committed
215
        for(size_t idx=0; idx<(size_t)_gmm->getMeans().size().height; idx++)
216 217 218 219 220 221 222 223 224 225 226 227 228 229
            out << "Component " << idx << ":\n" << _gmm->getMeans().row(idx) << "\n";
    }
    return out.str();
}

std::string ClusteringOperator::printCovs() {
    std::ostringstream out;
    if(_gmm.empty() || !_gmm->isTrained())
        out << "Model is uninitialized or not trained.\n";
    else {
        std::vector<cv::Mat> covs;
        _gmm->getCovs(covs);
        for(size_t idx=0; idx<(size_t)covs.size(); idx++)
            out << "Component " << idx << ":\n" << covs[idx] << "\n";
230 231 232
    }
    return out.str();
}