In January 2021 we will introduce a 10 GB quota for project repositories. Higher limits for individual projects will be available on request. Please see https://doku.lrz.de/display/PUBLIC/GitLab for more information.

Commit 6bd5c76a authored by Alessio Netti

Analytics: adding lookback to clustering plugin

- Can be used to increase the number of data points for model training
parent ea377daa
......@@ -871,6 +871,7 @@ label of the gaussian component to which each sub-unit belongs is stored in the
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one.
| lookbackWindow | Enables a lookback window (whose length is expressed in milliseconds) to collect additional points from previous time windows for model training. The additional points are accumulated in memory until training can be performed. The value must be greater than "window" to have any effect. Disabled by default.
| numComponents | Number of gaussian components in the mixture model.
| reuseModel | Boolean value. If false, the GMM model is trained at each computation, otherwise only once or when the "train" REST API action is used. Default is false.
| outlierCut | Threshold applied to the Mahalanobis distance to determine outliers.
......
......@@ -7,7 +7,7 @@ window 3600000
clustering cl1 {
default def1
window 5000
window 1800000
input {
......
......@@ -41,6 +41,8 @@ void ClusteringConfigurator::operatorAttributes(ClusteringOperator& op, CFG_VAL
{
if(boost::iequals(val.first, "window"))
op.setAggregationWindow(stoull(val.second.data()) * 1000000);
else if(boost::iequals(val.first, "lookbackWindow"))
op.setLookbackWindow(stoull(val.second.data()) * 1000000);
else if(boost::iequals(val.first, "inputPath"))
op.setInputPath(val.second.data());
else if(boost::iequals(val.first, "outputPath"))
......
......@@ -31,11 +31,14 @@ ClusteringOperator::ClusteringOperator(const std::string& name) : OperatorTempla
_modelIn = "";
_modelOut = "";
_aggregationWindow = 0;
_lookbackWindow = 0;
_numWindows = 0;
_numComponents = 3;
_outlierCut = 2.0f;
_reuseModel = false;
_trainingPending = true;
_trainingSet = cv::Mat();
_tempSet = cv::Mat();
_currentfVector = cv::Mat();
}
......@@ -43,11 +46,14 @@ ClusteringOperator::ClusteringOperator(const ClusteringOperator& other) : Operat
_modelIn = other._modelIn;
_modelOut = "";
_aggregationWindow = other._aggregationWindow;
_lookbackWindow = other._lookbackWindow;
_numWindows = other._numWindows;
_numComponents = other._numComponents;
_outlierCut = other._outlierCut;
_reuseModel = other._reuseModel;
_trainingPending = true;
_trainingSet = cv::Mat();
_tempSet = cv::Mat();
_currentfVector = cv::Mat();
}
......@@ -68,11 +74,17 @@ restResponse_t ClusteringOperator::REST(const string& action, const unordered_ma
}
void ClusteringOperator::execOnInit() {
if(_interval==0 || _lookbackWindow==0 || _lookbackWindow <= _aggregationWindow)
_numWindows = 0;
else
_numWindows = (_lookbackWindow - _aggregationWindow) / ((uint64_t)_interval * 1000000);
bool useDefault=true;
if(_modelIn!="") {
try {
_gmm = cv::ml::EM::load(_modelIn);
if(!_gmm->isTrained() || _units.empty() || _units[0]->getSubUnits().empty() || _units[0]->getSubUnits()[0]->getInputs().size()!=(uint64_t)_gmm->getMeans().size().width)
if(!_gmm->isTrained() || _units.empty() || _units[0]->getSubUnits().empty() ||
_units[0]->getSubUnits()[0]->getInputs().size()!=(uint64_t)_gmm->getMeans().size().width)
LOG(error) << "Operator " + _name + ": incompatible model, falling back to default!";
else {
_trainingPending = false;
......@@ -89,6 +101,7 @@ void ClusteringOperator::execOnInit() {
void ClusteringOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _aggregationWindow;
LOG_VAR(ll) << " Lookback window: " << _lookbackWindow;
LOG_VAR(ll) << " Input Path: " << (_modelIn!="" ? _modelIn : std::string("none"));
LOG_VAR(ll) << " Output Path: " << (_modelOut!="" ? _modelOut : std::string("none"));
LOG_VAR(ll) << " Clusters: " << _numComponents;
......@@ -98,59 +111,74 @@ void ClusteringOperator::printConfig(LOG_LEVEL ll) {
}
void ClusteringOperator::compute(U_Ptr unit) {
_trainingSet = cv::Mat();
if(_numWindows==0)
_trainingSet = cv::Mat();
_tempSet = cv::Mat();
for(const auto& su : unit->getSubUnits()) {
computeFeatureVector(su);
_trainingSet.push_back(_currentfVector);
_tempSet.push_back(_currentfVector);
}
if(_trainingSet.empty())
_trainingSet = _tempSet;
else
cv::vconcat(_tempSet, _trainingSet, _trainingSet);
if(!_trainingSet.empty()) {
if (_trainingPending || !_reuseModel) {
if(_gmm.empty())
throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
if(!_gmm->trainEM(_trainingSet))
throw std::runtime_error("Operator " + _name + ": model training failed!");
_trainingPending = false;
LOG(debug) << "Operator " + _name + ": model training performed.";
if(_modelOut!="") {
try {
_gmm->save(_modelOut);
} catch(const std::exception& e) {
LOG(error) << "Operator " + _name + ": cannot save the model to a file!"; }
}
// Performing training if the conditions are met
if ((_trainingPending || !_reuseModel) && _trainingSet.size().height/unit->getSubUnits().size() > _numWindows) {
if(_gmm.empty())
throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
if(!_gmm->trainEM(_trainingSet))
throw std::runtime_error("Operator " + _name + ": model training failed!");
_trainingPending = false;
LOG(debug) << "Operator " + _name + ": model training performed using " << _trainingSet.size().height << " points.";
if(_modelOut!="") {
try {
_gmm->save(_modelOut);
} catch(const std::exception& e) {
LOG(error) << "Operator " + _name + ": cannot save the model to a file!"; }
}
if(_gmm.empty() || !_gmm->isTrained())
throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
}
// Checking that the operator is not in any invalid state
if(_gmm.empty() || !(_gmm->isTrained() || (_trainingPending && _streaming && _numWindows>0)))
throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
std::vector<std::shared_ptr<UnitTemplate<ClusteringSensorBase>>> subUnits = unit->getSubUnits();
// Performing prediction
if(_gmm->isTrained()) {
std::vector <std::shared_ptr<UnitTemplate<ClusteringSensorBase>>> subUnits = unit->getSubUnits();
cv::Vec2d res;
int64_t label;
bool outlier;
std::vector<cv::Mat> covs;
std::vector <cv::Mat> covs;
_gmm->getCovs(covs);
reading_t predict;
predict.timestamp = getTimestamp();
for(unsigned int idx=0; idx<subUnits.size(); idx++) {
for (unsigned int idx = 0; idx < subUnits.size(); idx++) {
res = _gmm->predict2(_trainingSet.row(idx), cv::noArray());
label = (int64_t)res[1];
label = (int64_t) res[1];
outlier = isOutlier(_trainingSet.row(idx), _gmm->getMeans().row(label), covs[label]);
predict.value = outlier ? OUTLIER_ID : label;
subUnits[idx]->getOutputs()[0]->storeReading(predict);
}
}
if(_numWindows==0)
_trainingSet = cv::Mat();
// Removing the oldest time window if lookback is enabled
else if(_trainingSet.size().height/unit->getSubUnits().size() > _numWindows)
_trainingSet = _trainingSet.rowRange(0, _numWindows * unit->getSubUnits().size());
_tempSet = cv::Mat();
}
void ClusteringOperator::computeFeatureVector(U_Ptr unit) {
void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) {
_currentfVector = cv::Mat(1, unit->getInputs().size(), CV_32F);
std::vector<ClusteringSBPtr>& inputs = unit->getInputs();
for(size_t idx=0; idx<inputs.size(); idx++) {
_mean=0;
_buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer) || _buffer.empty())
if(!_queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow - offset, offset, _buffer) || _buffer.empty())
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
// Computing MEAN
......
......@@ -60,15 +60,17 @@ public:
// --- Configuration setters -------------------------------------------------
// Path of a previously saved model; execOnInit() attempts to load the GMM from it when non-empty.
void setInputPath(std::string in) { _modelIn = in; }
// Path to which the trained model is saved by compute(); an empty string disables saving.
void setOutputPath(std::string out) { _modelOut = out; }
// Length of the aggregation window. The value is stored as given; the configurator
// passes the configured milliseconds multiplied by 1000000.
void setAggregationWindow(unsigned long long a) { _aggregationWindow = a; }
// Length of the lookback window, in the same unit as the aggregation window.
// execOnInit() derives the number of extra windows from it; a value of 0, or one
// not greater than the aggregation window, leaves lookback disabled.
void setLookbackWindow(unsigned long long w) { _lookbackWindow = w; }
// Number of gaussian components in the mixture model.
void setNumComponents(unsigned long long n) { _numComponents = n; }
// Threshold used to determine outliers.
void setOutlierCut(float s) { _outlierCut = s; }
// If false the model is trained at each computation; otherwise only while training is pending.
void setReuseModel(bool r) { _reuseModel = r; }
// Marks training as pending; compute() checks this flag when deciding whether to train.
void triggerTraining() { _trainingPending = true; }
// --- Configuration getters (each returns the stored value by copy) ----------
std::string getInputPath() { return _modelIn;}
std::string getOutputPath() { return _modelOut; }
unsigned long long getAggregationWindow() { return _aggregationWindow; }
unsigned long long getLookbackWindow() { return _lookbackWindow; }
unsigned long long getNumComponents() { return _numComponents; }
float getOutlierCut() { return _outlierCut; }
bool getReuseModel() { return _reuseModel; }
......@@ -78,13 +80,15 @@ public:
protected:
virtual void compute(U_Ptr unit) override;
void computeFeatureVector(U_Ptr unit);
void computeFeatureVector(U_Ptr unit, uint64_t offset=0);
bool isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov);
std::string printMeans();
std::string _modelOut;
std::string _modelIn;
unsigned long long _aggregationWindow;
unsigned long long _lookbackWindow;
unsigned long long _numWindows;
unsigned long long _numComponents;
float _outlierCut;
bool _reuseModel;
......@@ -93,6 +97,7 @@ protected:
vector<reading_t> _buffer;
cv::Ptr<cv::ml::EM> _gmm;
cv::Mat _trainingSet;
cv::Mat _tempSet;
cv::Mat _currentfVector;
// Misc buffers
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment