//================================================================================
// Name        : RegressorOperator.cpp
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "RegressorOperator.h"

RegressorOperator::RegressorOperator(const std::string& name) : OperatorTemplate(name) {
    // Model I/O paths start out empty and are filled in from the configuration.
    _modelIn  = "";
    _modelOut = "";
    // Default training parameters.
    _aggregationWindow = 0;
    _targetDistance    = 1;
    _trainingSamples   = 256;
    _trainingPending   = true;
    _importances       = false;
    _includeTarget     = true;
    // OpenCV buffers are allocated lazily on first use.
    _trainingSet    = nullptr;
    _responseSet    = nullptr;
    _currentfVector = nullptr;
}
RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorTemplate(other) {
    _modelIn = other._modelIn;
    // The output path is deliberately NOT copied: only one operator instance
    // should persist the trained model to file.
    _modelOut = "";
    _aggregationWindow = other._aggregationWindow;
    _targetDistance = other._targetDistance;
    _trainingSamples = other._trainingSamples;
    _importances = other._importances;
    // Fix: copy the source operator's flag instead of forcing it to true, so
    // duplicated operators keep the configured behavior like every other
    // training parameter above.
    _includeTarget = other._includeTarget;
    // Training state is never shared between copies: each instance gathers its
    // own samples and trains independently.
    _trainingPending = true;
    _trainingSet = nullptr;
    _responseSet = nullptr;
    _currentfVector = nullptr;
}
RegressorOperator::~RegressorOperator() {
    // delete on a null pointer is a no-op, so no explicit checks are needed.
    delete _trainingSet;
    delete _responseSet;
    delete _currentfVector;
    _trainingSet = nullptr;
    _responseSet = nullptr;
    _currentfVector = nullptr;
    // Drop the reference to the (shared) forest model.
    _rForest.release();
}
restResponse_t RegressorOperator::REST(const string& action, const unordered_map<string, string>& queries) {
    // Handles plugin REST actions: "train" schedules a re-training on the next
    // compute cycle, "importances" returns the feature importance report.
    restResponse_t resp;
    if(action=="train") {
        resp.response = "Re-training triggered for model " + this->_name + "!\n";
        this->_trainingPending = true;
        return resp;
    }
    if(action=="importances") {
        resp.response = getImportances();
        return resp;
    }
    throw invalid_argument("Unknown plugin action " + action + " requested!");
}
void RegressorOperator::execOnInit() {
Alessio Netti's avatar
Alessio Netti committed
84 85 86 87
    bool useDefault=true;
    if(_modelIn!="") {
        try {
            _rForest = cv::ml::RTrees::load(_modelIn);
88
            if(!_rForest->isTrained() || _units.empty() || _units[0]->getInputs().size()*REG_NUMFEATURES!=(uint64_t)_rForest->getVarCount()) 
89
                LOG(error) << "Operator " + _name + ": incompatible model, falling back to default!";
Alessio Netti's avatar
Alessio Netti committed
90 91 92 93 94
            else {
                _trainingPending = false;
                useDefault = false;
            }
        } catch(const std::exception& e) {
95
            LOG(error) << "Operator " + _name + ": cannot load model from file, falling back to default!"; }
Alessio Netti's avatar
Alessio Netti committed
96
    }
97
    if(useDefault) {
Alessio Netti's avatar
Alessio Netti committed
98
        _rForest = cv::ml::RTrees::create();
99 100
        _rForest->setCalculateVarImportance(_importances);
    }
Alessio Netti's avatar
Alessio Netti committed
101 102
}
void RegressorOperator::printConfig(LOG_LEVEL ll) {
    // Renders an unset model path as "none" for readability.
    auto pathOrNone = [](const std::string& p) { return p.empty() ? std::string("none") : p; };
    LOG_VAR(ll) << "            Window:          " << _aggregationWindow;
    LOG_VAR(ll) << "            Target Distance: " << _targetDistance;
    LOG_VAR(ll) << "            Training Sample: " << _trainingSamples;
    LOG_VAR(ll) << "            Input Path:      " << pathOrNone(_modelIn);
    LOG_VAR(ll) << "            Output Path:     " << pathOrNone(_modelOut);
    LOG_VAR(ll) << "            Importances:     " << (_importances ? "enabled" : "disabled");
    OperatorTemplate<RegressorSensorBase>::printConfig(ll);
}
void RegressorOperator::compute(U_Ptr unit) {
    // Refresh _currentfVector (and _currentTarget) from the latest sensor data.
    computeFeatureVector(unit);
    // While training is pending in streaming mode, accumulate feature/target pairs.
    if (_trainingPending && _streaming) {
        if (!_trainingSet)
            _trainingSet = new cv::Mat();
        if (!_responseSet)
            _responseSet = new cv::Mat();
        _trainingSet->push_back(*_currentfVector);
        _responseSet->push_back(_currentTarget);
        // _targetDistance extra samples are gathered so the sets can be shifted
        // against each other at training time.
        if ((uint64_t)_trainingSet->size().height >= _trainingSamples + _targetDistance)
            trainRandomForest();
    }
    // Predictions require a trained model, unless we are still legitimately
    // collecting training data in streaming mode.
    bool usable = !_rForest.empty() && (_rForest->isTrained() || (_trainingPending && _streaming));
    if(!usable)
        throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
    if(_rForest->isTrained()) {
        reading_t predict;
        predict.value = (int64_t) _rForest->predict(*_currentfVector);
        predict.timestamp = getTimestamp();
        unit->getOutputs()[0]->storeReading(predict);
    }
}
void RegressorOperator::trainRandomForest() {
Alessio Netti's avatar
Alessio Netti committed
136
    if(!_trainingSet || _rForest.empty())
137
        throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
138
    if((uint64_t)_responseSet->size().height <= _targetDistance)
139
        throw std::runtime_error("Operator " + _name + ": cannot perform training, insufficient data!");
Alessio Netti's avatar
Alessio Netti committed
140 141 142
    // Shifting the training and response sets so as to obtain the desired prediction distance
    *_responseSet = _responseSet->rowRange(_targetDistance, _responseSet->size().height-1);
    *_trainingSet = _trainingSet->rowRange(0, _trainingSet->size().height-1-_targetDistance);
143
    shuffleTrainingSet();
Alessio Netti's avatar
Alessio Netti committed
144
    if(!_rForest->train(*_trainingSet, cv::ml::ROW_SAMPLE, *_responseSet))
145
        throw std::runtime_error("Operator " + _name + ": model training failed!");
Alessio Netti's avatar
Alessio Netti committed
146 147 148 149 150
    delete _trainingSet;
    _trainingSet = nullptr;
    delete _responseSet;
    _responseSet = nullptr;
    _trainingPending = false;
151
    LOG(info) << "Operator " + _name + ": model training performed.";
152
    LOG(info) << getImportances();
Alessio Netti's avatar
Alessio Netti committed
153 154 155 156
    if(_modelOut!="") {
        try {
            _rForest->save(_modelOut);
        } catch(const std::exception& e) {
157
            LOG(error) << "Operator " + _name + ": cannot save the model to a file!"; }
Alessio Netti's avatar
Alessio Netti committed
158 159 160
    }
}
void RegressorOperator::shuffleTrainingSet() {
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
    if(!_trainingSet || !_responseSet)
        return;
    size_t idx1, idx2, swaps = _trainingSet->size().height / 2;
    std::random_device dev;
    std::mt19937 rng(dev());
    rng.seed((unsigned)getTimestamp());
    std::uniform_int_distribution<std::mt19937::result_type> dist(0, _trainingSet->size().height-1);
    cv::Mat swapBuf;
    for(uint64_t nS=0; nS<swaps; nS++) {
        idx1 = dist(rng);
        idx2 = dist(rng);
        
        _trainingSet->row(idx1).copyTo(swapBuf);
        _trainingSet->row(idx2).copyTo(_trainingSet->row(idx1));
        swapBuf.copyTo(_trainingSet->row(idx2));

        _responseSet->row(idx1).copyTo(swapBuf);
        _responseSet->row(idx2).copyTo(_responseSet->row(idx1));
        swapBuf.copyTo(_responseSet->row(idx2));
        //LOG(debug) << "Swapping " << idx1 << " and " << idx2;
    }
}
// Builds the flat feature vector for one unit: REG_NUMFEATURES statistical
// features (mean, std, diffsum, 25th/75th percentile, latest value) per input
// sensor, computed over the configured aggregation window. Also captures the
// current value of the training target sensor into _currentTarget.
void RegressorOperator::computeFeatureVector(U_Ptr unit) {
    // Lazily allocate the 1 x (inputs * REG_NUMFEATURES) float feature row.
    if(!_currentfVector)
        _currentfVector = new cv::Mat(1, unit->getInputs().size()*REG_NUMFEATURES, CV_32F);
    int64_t val;
    size_t qId, qMod, idx, fIdx;
    // Aggregation window is [now - _aggregationWindow, now].
    uint64_t endTs = getTimestamp();
    uint64_t startTs = endTs - _aggregationWindow;
    std::vector<RegressorSBPtr>& inputs = unit->getInputs();
    for(idx=0; idx<inputs.size(); idx++) {
        // Reset the per-sensor accumulators (members, reused across sensors).
        _mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0;
        _buffer.clear();
        // A failed query or an empty window aborts the whole feature vector.
        if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty())
            throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
        // The most recent reading of the target sensor is the regression response.
        if (inputs[idx]->getTrainingTarget())
            _currentTarget = (float)_buffer.back().value;

        // Features are computed for every non-target sensor; the target sensor
        // itself is included only when _includeTarget is set.
        if(!inputs[idx]->getTrainingTarget() || _includeTarget) {
            // Computing MEAN and SUM OF DIFFERENCES (sum of successive deltas).
            val = _buffer.front().value;
            for (const auto &v : _buffer) {
                _mean += v.value;
                _diffsum += v.value - val;
                val = v.value;
            }
            _mean /= _buffer.size();   // NOTE(review): if _mean is an integer type this truncates — confirm in header

            // Computing STD (population standard deviation around _mean).
            for (const auto &v : _buffer) {
                val = v.value - _mean;
                _std += val * val;
            }
            _std = sqrt(_std / _buffer.size());

            // Sorting is costly, but the aggregation window is assumed to be
            // small enough that the O(N*log(N)) sort remains cheap in practice.
            std::sort(_buffer.begin(), _buffer.end(),
                      [](const reading_t &lhs, const reading_t &rhs) { return lhs.value < rhs.value; });
            // Computing 25th PERCENTILE (midpoint of neighbors when the rank is fractional).
            qId = (_buffer.size() * 25) / 100;
            qMod = (_buffer.size() * 25) % 100;
            _qtl25 = (qMod == 0 || qId == _buffer.size() - 1) ? _buffer[qId].value :
                     (_buffer[qId].value + _buffer[qId + 1].value) / 2;
            // Computing 75th PERCENTILE
            qId = (_buffer.size() * 75) / 100;
            qMod = (_buffer.size() * 75) % 100;
            _qtl75 = (qMod == 0 || qId == _buffer.size() - 1) ? _buffer[qId].value :
                     (_buffer[qId].value + _buffer[qId + 1].value) / 2;

            fIdx = idx * REG_NUMFEATURES;
            // Casting and storing the statistical features
            _currentfVector->at<float>(fIdx) = (float) _mean;
            _currentfVector->at<float>(fIdx + 1) = (float) _std;
            _currentfVector->at<float>(fIdx + 2) = (float) _diffsum;
            _currentfVector->at<float>(fIdx + 3) = (float) _qtl25;
            _currentfVector->at<float>(fIdx + 4) = (float) _qtl75;
            _currentfVector->at<float>(fIdx + 5) = (float) _buffer[_buffer.size() - 1].value;
        } else {
            // Target excluded from the features: zero out its slots so the
            // vector layout (and the model's expected input size) stays fixed.
            fIdx = idx * REG_NUMFEATURES;
            _currentfVector->at<float>(fIdx) = 0.0f;
            _currentfVector->at<float>(fIdx + 1) = 0.0f;
            _currentfVector->at<float>(fIdx + 2) = 0.0f;
            _currentfVector->at<float>(fIdx + 3) = 0.0f;
            _currentfVector->at<float>(fIdx + 4) = 0.0f;
            _currentfVector->at<float>(fIdx + 5) = 0.0f;
        }
    }
    //LOG(error) << "Target: " << _currentTarget;
    //LOG(error) << "Vector: ";
    //for(idx=0; idx<_currentfVector->size().width;idx++)
    //    LOG(error) << _currentfVector->at<float>(idx);
}
// Produces a human-readable report of per-feature importances, sorted in
// descending order, or an explanatory message when importances are unavailable.
std::string RegressorOperator::getImportances() {
    if(!_importances || _rForest.empty() || !_rForest->getCalculateVarImportance())
        return "Operator " + _name + ": feature importances are not available.";
    std::vector<ImportancePair> impLabels;
    cv::Mat_<float> impValues;
    _rForest->getVarImportance().convertTo(impValues, CV_32F);
    // The importance vector must match the feature layout of the first unit.
    if(impValues.empty() || _units.empty() || impValues.total()!=REG_NUMFEATURES*_units[0]->getInputs().size())
        return "Operator " + _name + ": error when computing feature importances.";

    // Suffixes by position within each sensor's feature group.
    static const char* suffixes[] = {" - mean", " - std", " - diffsum", " - qtl25", " - qtl75", " - latest"};
    const size_t numSuffixes = sizeof(suffixes)/sizeof(suffixes[0]);
    for(size_t idx=0; idx<impValues.total(); idx++) {
        ImportancePair pair;
        pair.name = _units[0]->getInputs().at(idx/REG_NUMFEATURES)->getName();
        size_t feat = idx%REG_NUMFEATURES;
        // Positions beyond the known suffixes keep the bare sensor name,
        // matching the original switch's default case.
        if(feat < numSuffixes)
            pair.name += suffixes[feat];
        pair.value = impValues.at<float>(idx);
        impLabels.push_back(pair);
    }

    std::sort(impLabels.begin(), impLabels.end(), [ ](const ImportancePair& lhs, const ImportancePair& rhs) { return lhs.value > rhs.value; });
    std::string outString="Operator " + _name + ": listing feature importances for unit " + _units[0]->getName() + ":\n";
    for(const auto& imp : impLabels)
        outString +=  "    " + imp.name + " - " + std::to_string(imp.value) + "\n";
    return outString;
}