//================================================================================
// Name        : RegressorOperator.cpp
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "RegressorOperator.h"

RegressorOperator::RegressorOperator(const std::string& name) : OperatorTemplate(name) {
Alessio Netti's avatar
Alessio Netti committed
31
32
33
34
35
36
    _modelIn = "";
    _modelOut = "";
    _aggregationWindow = 0;
    _targetDistance = 1;
    _trainingSamples = 256;
    _trainingPending = true;
37
    _importances = false;
Alessio Netti's avatar
Alessio Netti committed
38
    _includeTarget = true;
Alessio Netti's avatar
Alessio Netti committed
39
40
41
42
43
    _trainingSet = nullptr;
    _responseSet = nullptr;
    _currentfVector = nullptr;
}

RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorTemplate(other) {
Alessio Netti's avatar
Alessio Netti committed
45
    _modelIn = other._modelIn;
46
    _modelOut = "";
Alessio Netti's avatar
Alessio Netti committed
47
48
49
    _aggregationWindow = other._aggregationWindow;
    _targetDistance = other._targetDistance;
    _trainingSamples = other._trainingSamples;
50
    _importances = other._importances;
Alessio Netti's avatar
Alessio Netti committed
51
    _includeTarget = true;
52
53
54
55
    _trainingPending = true;
    _trainingSet = nullptr;
    _responseSet = nullptr;
    _currentfVector = nullptr;
Alessio Netti's avatar
Alessio Netti committed
56
57
}

RegressorOperator::~RegressorOperator() {
Alessio Netti's avatar
Alessio Netti committed
59
60
61
62
63
64
65
66
67
68
69
70
    if(_trainingSet)
        delete _trainingSet;
    if(_responseSet)
        delete _responseSet;
    if(_currentfVector)
        delete _currentfVector;
    _rForest.release();
    _currentfVector = nullptr;
    _trainingSet = nullptr;
    _responseSet = nullptr;
}

restResponse_t RegressorOperator::REST(const string& action, const unordered_map<string, string>& queries) {
Alessio Netti's avatar
Alessio Netti committed
72
73
    restResponse_t resp;
    if(action=="train") {
74
        resp.response = "Re-training triggered for model " + this->_name + "!\n";
Alessio Netti's avatar
Alessio Netti committed
75
        this->_trainingPending = true;
76
77
    } else if(action=="importances") {
        resp.response = getImportances();
Alessio Netti's avatar
Alessio Netti committed
78
79
80
81
82
    } else
        throw invalid_argument("Unknown plugin action " + action + " requested!");
    return resp;
}

void RegressorOperator::execOnInit() {
Alessio Netti's avatar
Alessio Netti committed
84
85
86
87
    bool useDefault=true;
    if(_modelIn!="") {
        try {
            _rForest = cv::ml::RTrees::load(_modelIn);
88
            if(!_rForest->isTrained() || _units.empty() || _units[0]->getInputs().size()*REG_NUMFEATURES!=(uint64_t)_rForest->getVarCount()) 
89
                LOG(error) << "Operator " + _name + ": incompatible model, falling back to default!";
Alessio Netti's avatar
Alessio Netti committed
90
91
92
93
94
            else {
                _trainingPending = false;
                useDefault = false;
            }
        } catch(const std::exception& e) {
95
            LOG(error) << "Operator " + _name + ": cannot load model from file, falling back to default!"; }
Alessio Netti's avatar
Alessio Netti committed
96
    }
97
    if(useDefault) {
Alessio Netti's avatar
Alessio Netti committed
98
        _rForest = cv::ml::RTrees::create();
99
100
        _rForest->setCalculateVarImportance(_importances);
    }
Alessio Netti's avatar
Alessio Netti committed
101
102
}

void RegressorOperator::printConfig(LOG_LEVEL ll) {
Alessio Netti's avatar
Alessio Netti committed
104
105
106
107
108
    LOG_VAR(ll) << "            Window:          " << _aggregationWindow;
    LOG_VAR(ll) << "            Target Distance: " << _targetDistance;
    LOG_VAR(ll) << "            Training Sample: " << _trainingSamples;
    LOG_VAR(ll) << "            Input Path:      " << (_modelIn!="" ? _modelIn : std::string("none"));
    LOG_VAR(ll) << "            Output Path:     " << (_modelOut!="" ? _modelOut : std::string("none"));
109
    LOG_VAR(ll) << "            Importances:     " << (_importances ? "enabled" : "disabled");
110
    OperatorTemplate<RegressorSensorBase>::printConfig(ll);
Alessio Netti's avatar
Alessio Netti committed
111
112
}

void RegressorOperator::compute(U_Ptr unit) {
Alessio Netti's avatar
Alessio Netti committed
114
115
116
117
118
119
120
121
    computeFeatureVector(unit);
    if (_trainingPending && _streaming) {
        if (!_trainingSet)
            _trainingSet = new cv::Mat();
        if (!_responseSet)
            _responseSet = new cv::Mat();
        _trainingSet->push_back(*_currentfVector);
        _responseSet->push_back(_currentTarget);
122
        if ((uint64_t)_trainingSet->size().height >= _trainingSamples + _targetDistance)
123
            trainRandomForest(false);
Alessio Netti's avatar
Alessio Netti committed
124
125
    }
    if(_rForest.empty() || !(_rForest->isTrained() || (_trainingPending && _streaming))) 
126
        throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
131
132
133
134
    if(_rForest->isTrained()) {
        reading_t predict;
        predict.value = (int64_t) _rForest->predict(*_currentfVector);
        predict.timestamp = getTimestamp();
        unit->getOutputs()[0]->storeReading(predict);
    }
}

void RegressorOperator::trainRandomForest(bool categorical) {
Alessio Netti's avatar
Alessio Netti committed
136
    if(!_trainingSet || _rForest.empty())
137
        throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
138
    if((uint64_t)_responseSet->size().height <= _targetDistance)
139
        throw std::runtime_error("Operator " + _name + ": cannot perform training, insufficient data!");
Alessio Netti's avatar
Alessio Netti committed
140
141
142
    // Shifting the training and response sets so as to obtain the desired prediction distance
    *_responseSet = _responseSet->rowRange(_targetDistance, _responseSet->size().height-1);
    *_trainingSet = _trainingSet->rowRange(0, _trainingSet->size().height-1-_targetDistance);
143
    shuffleTrainingSet();
144
145
146
147
148
149
150

    cv::Mat varType = cv::Mat(_trainingSet->size().width + 1, 1, CV_8U);
    varType.setTo(cv::Scalar(cv::ml::VAR_NUMERICAL));
    varType.at<unsigned char>(_trainingSet->size().width, 0) = categorical ? cv::ml::VAR_CATEGORICAL : cv::ml::VAR_NUMERICAL;
    cv::Ptr<cv::ml::TrainData> td = cv::ml::TrainData::create(*_trainingSet, cv::ml::ROW_SAMPLE, *_responseSet, cv::noArray(), cv::noArray(), cv::noArray(), varType);
    
    if(!_rForest->train(td))
151
        throw std::runtime_error("Operator " + _name + ": model training failed!");
152
153
    
    td.release();
Alessio Netti's avatar
Alessio Netti committed
154
155
156
157
158
    delete _trainingSet;
    _trainingSet = nullptr;
    delete _responseSet;
    _responseSet = nullptr;
    _trainingPending = false;
159
    LOG(info) << "Operator " + _name + ": model training performed.";
160
    LOG(info) << getImportances();
Alessio Netti's avatar
Alessio Netti committed
161
162
163
164
    if(_modelOut!="") {
        try {
            _rForest->save(_modelOut);
        } catch(const std::exception& e) {
165
            LOG(error) << "Operator " + _name + ": cannot save the model to a file!"; }
Alessio Netti's avatar
Alessio Netti committed
166
167
168
    }
}

void RegressorOperator::shuffleTrainingSet() {
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    if(!_trainingSet || !_responseSet)
        return;
    size_t idx1, idx2, swaps = _trainingSet->size().height / 2;
    std::random_device dev;
    std::mt19937 rng(dev());
    rng.seed((unsigned)getTimestamp());
    std::uniform_int_distribution<std::mt19937::result_type> dist(0, _trainingSet->size().height-1);
    cv::Mat swapBuf;
    for(uint64_t nS=0; nS<swaps; nS++) {
        idx1 = dist(rng);
        idx2 = dist(rng);
        
        _trainingSet->row(idx1).copyTo(swapBuf);
        _trainingSet->row(idx2).copyTo(_trainingSet->row(idx1));
        swapBuf.copyTo(_trainingSet->row(idx2));

        _responseSet->row(idx1).copyTo(swapBuf);
        _responseSet->row(idx2).copyTo(_responseSet->row(idx1));
        swapBuf.copyTo(_responseSet->row(idx2));
        //LOG(debug) << "Swapping " << idx1 << " and " << idx2;
    }
}

void RegressorOperator::computeFeatureVector(U_Ptr unit) {
Alessio Netti's avatar
Alessio Netti committed
194
195
196
197
    if(!_currentfVector)
        _currentfVector = new cv::Mat(1, unit->getInputs().size()*REG_NUMFEATURES, CV_32F);
    int64_t val;
    size_t qId, qMod, idx, fIdx;
198
199
    uint64_t endTs = getTimestamp();
    uint64_t startTs = endTs - _aggregationWindow;
Alessio Netti's avatar
Alessio Netti committed
200
201
    std::vector<RegressorSBPtr>& inputs = unit->getInputs();
    for(idx=0; idx<inputs.size(); idx++) {
202
        _mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0; _latest=0;
203
        _buffer.clear();
204
        if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty())
205
            throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
206
        _latest = _buffer.back().value;
Alessio Netti's avatar
Alessio Netti committed
207
        if (inputs[idx]->getTrainingTarget())
208
            _currentTarget = (float)_latest;
Alessio Netti's avatar
Alessio Netti committed
209

Alessio Netti's avatar
Alessio Netti committed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
        if(!inputs[idx]->getTrainingTarget() || _includeTarget) {
            // Computing MEAN and SUM OF DIFFERENCES
            val = _buffer.front().value;
            for (const auto &v : _buffer) {
                _mean += v.value;
                _diffsum += v.value - val;
                val = v.value;
            }
            _mean /= _buffer.size();

            // Computing STD
            for (const auto &v : _buffer) {
                val = v.value - _mean;
                _std += val * val;
            }
            _std = sqrt(_std / _buffer.size());

            // I know, sorting is costly; here, we assume that the aggregation window of sensor data is going to be relatively
            // small, in which case the O(log(N)) complexity of the std::sort implementation converges to O(N)
            std::sort(_buffer.begin(), _buffer.end(),
                      [](const reading_t &lhs, const reading_t &rhs) { return lhs.value < rhs.value; });
            // Computing 25th PERCENTILE
            qId = (_buffer.size() * 25) / 100;
            qMod = (_buffer.size() * 25) % 100;
            _qtl25 = (qMod == 0 || qId == _buffer.size() - 1) ? _buffer[qId].value :
                     (_buffer[qId].value + _buffer[qId + 1].value) / 2;
            // Computing 75th PERCENTILE
            qId = (_buffer.size() * 75) / 100;
            qMod = (_buffer.size() * 75) % 100;
            _qtl75 = (qMod == 0 || qId == _buffer.size() - 1) ? _buffer[qId].value :
                     (_buffer[qId].value + _buffer[qId + 1].value) / 2;

            fIdx = idx * REG_NUMFEATURES;
            // Casting and storing the statistical features
            _currentfVector->at<float>(fIdx) = (float) _mean;
            _currentfVector->at<float>(fIdx + 1) = (float) _std;
            _currentfVector->at<float>(fIdx + 2) = (float) _diffsum;
            _currentfVector->at<float>(fIdx + 3) = (float) _qtl25;
            _currentfVector->at<float>(fIdx + 4) = (float) _qtl75;
249
            _currentfVector->at<float>(fIdx + 5) = (float) _latest;
Alessio Netti's avatar
Alessio Netti committed
250
        } else {
Alessio Netti's avatar
Alessio Netti committed
251
            fIdx = idx * REG_NUMFEATURES;
Alessio Netti's avatar
Alessio Netti committed
252
253
254
255
256
257
            _currentfVector->at<float>(fIdx) = 0.0f;
            _currentfVector->at<float>(fIdx + 1) = 0.0f;
            _currentfVector->at<float>(fIdx + 2) = 0.0f;
            _currentfVector->at<float>(fIdx + 3) = 0.0f;
            _currentfVector->at<float>(fIdx + 4) = 0.0f;
            _currentfVector->at<float>(fIdx + 5) = 0.0f;
Alessio Netti's avatar
Alessio Netti committed
258
259
260
261
262
263
264
        }
    }
    //LOG(error) << "Target: " << _currentTarget;
    //LOG(error) << "Vector: ";
    //for(idx=0; idx<_currentfVector->size().width;idx++)
    //    LOG(error) << _currentfVector->at<float>(idx);
}
std::string RegressorOperator::getImportances() {
267
    if(!_importances || _rForest.empty() || !_rForest->getCalculateVarImportance())
268
        return "Operator " + _name + ": feature importances are not available.";
269
270
271
    std::vector<ImportancePair> impLabels;
    cv::Mat_<float> impValues; 
    _rForest->getVarImportance().convertTo(impValues, CV_32F);
272
    if(impValues.empty() || _units.empty() || impValues.total()!=REG_NUMFEATURES*_units[0]->getInputs().size())
273
        return "Operator " + _name + ": error when computing feature importances.";
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
    
    for(size_t idx=0; idx<impValues.total(); idx++) {
        ImportancePair pair;
        pair.name = _units[0]->getInputs().at(idx/REG_NUMFEATURES)->getName();
        pair.value = impValues.at<float>(idx);
        switch(idx%REG_NUMFEATURES) {
            case 0:
                pair.name += " - mean";
                break;
            case 1:
                pair.name += " - std";
                break;
            case 2:
                pair.name += " - diffsum";
                break;
            case 3:
                pair.name += " - qtl25";
                break;
            case 4:
                pair.name += " - qtl75";
                break;
Alessio Netti's avatar
Alessio Netti committed
295
296
297
            case 5:
                pair.name += " - latest";
                break;
298
299
300
301
302
303
304
            default:
                break;
        }
        impLabels.push_back(pair);
    }
    
    std::sort(impLabels.begin(), impLabels.end(), [ ](const ImportancePair& lhs, const ImportancePair& rhs) { return lhs.value > rhs.value; });
305
    std::string outString="Operator " + _name + ": listing feature importances for unit " + _units[0]->getName() + ":\n";
306
307
308
309
    for(const auto& imp : impLabels)
        outString +=  "    " + imp.name + " - " + std::to_string(imp.value) + "\n";
    return outString;
}