Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

ClusteringOperator.cpp 7.29 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//================================================================================
// Name        : ClusteringOperator.cpp
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "ClusteringOperator.h"

// Creates an operator with the given name and default clustering settings.
// No model files are configured, and a training pass is marked as pending.
ClusteringOperator::ClusteringOperator(const std::string& name) : OperatorTemplate(name) {
    // Model persistence: neither an input nor an output path is set by default
    _modelIn.clear();
    _modelOut.clear();
    _reuseModel = false;

    // Clustering parameters
    _numComponents = 3;
    _outlierCut = 2.0f;
    _aggregationWindow = 0;

    // Runtime state: empty matrices, training still has to be performed
    _currentfVector = cv::Mat();
    _trainingSet = cv::Mat();
    _trainingPending = true;
}

// Copy constructor: takes over the configuration of another operator but
// starts with fresh runtime state (empty matrices, training pending).
ClusteringOperator::ClusteringOperator(const ClusteringOperator& other) : OperatorTemplate(other) {
    // Configuration inherited from the source operator
    _aggregationWindow = other._aggregationWindow;
    _numComponents = other._numComponents;
    _outlierCut = other._outlierCut;
    _reuseModel = other._reuseModel;
    _modelIn = other._modelIn;

    // The output path is reset instead of copied.
    // NOTE(review): presumably so that copies do not write to the same model file - confirm.
    _modelOut.clear();

    // Runtime state is never shared with the source operator
    _currentfVector = cv::Mat();
    _trainingSet = cv::Mat();
    _trainingPending = true;
}

// Destructor: drops this operator's reference to the mixture model.
ClusteringOperator::~ClusteringOperator() {
    this->_gmm.release();
}

// Handles REST requests directed at this operator.
//
// Supported actions:
//   "train" - flags the model for re-training; the training itself is carried
//             out by compute() the next time it runs.
//   "means" - returns the textual dump produced by printMeans().
//
// @param action   Name of the requested action.
// @param queries  Query parameters of the request (not used by this operator).
// @return         Response object whose "response" field holds the reply text.
// @throws invalid_argument if the action is not one of the above.
restResponse_t ClusteringOperator::REST(const string& action, const unordered_map<string, string>& queries) {
    restResponse_t resp;
    if(action=="train") {
        resp.response = "Re-training triggered for gaussian mixture model " + this->_name + "!\n";
        this->_trainingPending = true;
    } else if(action=="means") {
        resp.response = printMeans();
    } else
        throw invalid_argument("Unknown plugin action " + action + " requested!");
    return resp;
}

// Prepares the mixture model before the operator starts computing.
//
// If an input model path is configured, the model is loaded from that file and
// accepted only if it is trained and its number of features (width of the means
// matrix) matches the number of inputs of the first sub-unit. In every other
// case (no path, load failure, incompatible model) a fresh untrained model
// with _numComponents clusters is created instead.
void ClusteringOperator::execOnInit() {
    bool useDefault=true;
    if(_modelIn!="") {
        try {
            _gmm = cv::ml::EM::load(_modelIn);
            // load() can hand back an empty pointer (e.g. when the file holds no usable
            // node), so the pointer must be checked before it is dereferenced.
            if(_gmm.empty() || !_gmm->isTrained() || _units.empty() || _units[0]->getSubUnits().empty() || _units[0]->getSubUnits()[0]->getInputs().size()!=(uint64_t)_gmm->getMeans().size().width)
                LOG(error) << "Operator " + _name + ": incompatible model, falling back to default!";
            else {
                // A valid pre-trained model is available: no initial training required
                _trainingPending = false;
                useDefault = false;
            }
        } catch(const std::exception& e) {
            LOG(error) << "Operator " + _name + ": cannot load model from file, falling back to default!"; }
    }
    if(useDefault) {
        _gmm = cv::ml::EM::create();
        _gmm->setClustersNumber(_numComponents);
    }
}

// Logs the operator-specific configuration at the given log level, then
// delegates to the base class to log the remaining configuration.
void ClusteringOperator::printConfig(LOG_LEVEL ll) {
    // Unset paths are reported as "none"
    const std::string inputPath = _modelIn.empty() ? std::string("none") : _modelIn;
    const std::string outputPath = _modelOut.empty() ? std::string("none") : _modelOut;
    const char* reuseState = "disabled";
    if(_reuseModel)
        reuseState = "enabled";

    LOG_VAR(ll) << "            Window:          " << _aggregationWindow;
    LOG_VAR(ll) << "            Input Path:      " << inputPath;
    LOG_VAR(ll) << "            Output Path:     " << outputPath;
    LOG_VAR(ll) << "            Clusters:        " << _numComponents;
    LOG_VAR(ll) << "            Outlier Cut:     " << _outlierCut;
    LOG_VAR(ll) << "            Reuse Model:     " << reuseState;
    OperatorTemplate<ClusteringSensorBase>::printConfig(ll);
}

// Performs one clustering pass over all sub-units of the given unit.
//
// Steps:
//   1. Builds the training set: one feature vector (row) per sub-unit.
//   2. (Re-)trains the mixture model if training is pending or model reuse is
//      disabled, and saves it to _modelOut when a path is configured.
//   3. Classifies every row and stores the resulting label, or OUTLIER_ID if
//      isOutlier() flags the row, in the first output sensor of the sub-unit.
//
// @param unit  Unit whose sub-units are to be clustered.
// @throws std::runtime_error if the model is missing, training fails, or the
//         model is untrained at prediction time. Exceptions raised by
//         computeFeatureVector() propagate as well.
void ClusteringOperator::compute(U_Ptr unit) {
    _trainingSet = cv::Mat();

    for(const auto& su : unit->getSubUnits()) {
        computeFeatureVector(su);
        _trainingSet.push_back(_currentfVector);
    }

    // Nothing to cluster
    if(_trainingSet.empty())
        return;

    if(_trainingPending || !_reuseModel) {
        if(_gmm.empty())
            throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
        if(!_gmm->trainEM(_trainingSet))
            throw std::runtime_error("Operator " + _name + ": model training failed!");
        _trainingPending = false;
        LOG(debug) << "Operator " + _name + ": model training performed.";
        if(_modelOut!="") {
            // Failing to persist the model is logged but does not abort the computation
            try {
                _gmm->save(_modelOut);
            } catch(const std::exception& e) {
                LOG(error) << "Operator " + _name + ": cannot save the model to a file!"; }
        }
    }

    if(_gmm.empty() || !_gmm->isTrained())
        throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");

    // Model parameters do not change during prediction: fetch them once
    const auto& subUnits = unit->getSubUnits();
    const cv::Mat means = _gmm->getMeans();
    std::vector<cv::Mat> covs;
    _gmm->getCovs(covs);

    // All predictions of this pass share the same timestamp
    reading_t predict;
    predict.timestamp = getTimestamp();

    for(unsigned int idx=0; idx<subUnits.size(); idx++) {
        const cv::Mat sample = _trainingSet.row(idx);
        // Second element of the predict2() result is the index of the most likely component
        const cv::Vec2d res = _gmm->predict2(sample, cv::noArray());
        const int64_t label = (int64_t)res[1];
        const bool outlier = isOutlier(sample, means.row(label), covs[label]);
        predict.value = outlier ? OUTLIER_ID : label;
        subUnits[idx]->getOutputs()[0]->storeReading(predict);
    }
}

// Fills _currentfVector with one feature per input sensor of the given unit.
// Each feature is the mean of the readings returned by the query engine for
// that sensor over the configured aggregation window.
//
// @param unit  Unit whose input sensors are to be queried.
// @throws std::runtime_error if a sensor cannot be queried or yields no data.
void ClusteringOperator::computeFeatureVector(U_Ptr unit) {
    auto& sensors = unit->getInputs();
    // Single-row matrix of 32-bit floats, one column per input sensor
    _currentfVector = cv::Mat(1, sensors.size(), CV_32F);

    for(size_t col=0; col<sensors.size(); ++col) {
        const auto& sensor = sensors[col];
        _buffer.clear();
        _mean = 0;

        const bool queryOk = _queryEngine.querySensor(sensor->getName(), _aggregationWindow, 0, _buffer);
        if(!queryOk || _buffer.empty())
            throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + sensor->getName() + "!");

        // Mean of all readings in the window
        for(const auto& reading : _buffer)
            _mean += reading.value;
        _mean /= _buffer.size();

        // The feature is stored as a float
        _currentfVector.at<float>(col) = (float)_mean;
    }
}
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182

// Decides whether a sample is an outlier with respect to a mixture component.
//
// The Mahalanobis distance between the two vectors is computed using the
// (pseudo-)inverse of the given covariance matrix and compared against
// _outlierCut.
//
// @param vec1  Sample vector.
// @param vec2  Reference vector (the component mean, as called from compute()).
// @param cov   Covariance matrix of the component.
// @return      True if the distance exceeds _outlierCut. False otherwise, or
//              if the distance cannot be computed.
bool ClusteringOperator::isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov) {
    double dist = 0.0;
    try {
        // cv::Mahalanobis requires both vectors and the inverse covariance to share the
        // same element type. Samples are built as 32-bit floats while the model parameters
        // use the covariance's depth, so everything is brought to that depth first;
        // otherwise the call always fails and no outlier is ever reported.
        cv::Mat sample, reference, iCov;
        vec1.convertTo(sample, cov.depth());
        vec2.convertTo(reference, cov.depth());
        // SVD-based inversion yields a pseudo-inverse when the covariance is singular
        cv::invert(cov, iCov, cv::DECOMP_SVD);
        dist = cv::Mahalanobis(sample, reference, iCov);
    } catch(const std::exception& e) {
        // Best effort: a sample whose distance cannot be computed is not flagged
        return false;
    }
    return dist > _outlierCut;
}

std::string ClusteringOperator::printMeans() {
    std::ostringstream out;
    if(_gmm.empty() || !_gmm->isTrained())
        out << "Model is uninitialized or not trained.\n";
    else {
Alessio Netti's avatar
Alessio Netti committed
183
        for(size_t idx=0; idx<(size_t)_gmm->getMeans().size().height; idx++)
184
185
186
187
            out << "Component " << idx << " :" << _gmm->getMeans().row(idx) << "\n";
    }
    return out.str();
}