Commit 8f1108f1 authored by Alessio Netti
Browse files

Analytics: proper scaling in signatures plugin

parent 40a5404a
......@@ -917,6 +917,7 @@ Operators in this plugin support the following configuration parameters:
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one, that are then aggregated in the signatures.
| trainingSamples | Number of samples for the sensors that are to be used to train the CS algorithm. Values of 100 or less are ignored. Default is 3600.
| numBlocks | Desired number of blocks in the signatures. A value of 0 is ignored. Default is 20.
| scalingFactor | Scaling factor (and upper bound) applied to the normalized sensor values when computing the blocks. A value of 0 is ignored. Default is 1000000.
| inputPath | Path of a file in which the order of the sensors and their upper/lower bounds are stored.
| outputPath | Path of a file to which the order of the sensors and their upper/lower bounds must be saved.
......
......@@ -56,7 +56,9 @@ void CSConfigurator::operatorAttributes(CSOperator& op, CFG_VAL config) {
else if(boost::iequals(val.first, "numBlocks"))
op.setNumBlocks(stoull(val.second.data()));
else if(boost::iequals(val.first, "trainingSamples"))
op.setTrainingSamples(stoull(val.second.data()));
op.setTrainingSamples(stoull(val.second.data()));
else if (boost::iequals(val.first, "scalingFactor"))
op.setScalingFactor(stoull(val.second.data()));
}
}
......
......@@ -33,9 +33,10 @@ CSOperator::CSOperator(const std::string& name) : OperatorTemplate(name) {
_aggregationWindow = 0;
_trainingSamples = 3600;
_numBlocks = 20;
_scalingFactor = 1000000;
_reuseModel = true;
_trainingPending = true;
_trainingReady = false;
_trainingReady = -1;
}
CSOperator::CSOperator(const CSOperator& other) : OperatorTemplate(other) {
......@@ -44,9 +45,10 @@ CSOperator::CSOperator(const CSOperator& other) : OperatorTemplate(other) {
_aggregationWindow = other._aggregationWindow;
_trainingSamples = other._trainingSamples;
_numBlocks = other._numBlocks;
_scalingFactor = other._scalingFactor;
_reuseModel = other._reuseModel;
_trainingPending = true;
_trainingReady = false;
_trainingReady = -1;
}
CSOperator::~CSOperator() {}
......@@ -56,7 +58,7 @@ restResponse_t CSOperator::REST(const string& action, const unordered_map<string
if(action=="train") {
resp.response = "Re-training triggered for CS Signatures operator " + this->_name + "!\n";
this->_trainingPending = true;
this->_trainingReady = false;
this->_trainingReady = -1;
} else
throw invalid_argument("Unknown plugin action " + action + " requested!");
return resp;
......@@ -67,6 +69,7 @@ void CSOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Input Path: " << (_modelIn!="" ? _modelIn : std::string("none"));
LOG_VAR(ll) << " Output Path: " << (_modelOut!="" ? _modelOut : std::string("none"));
LOG_VAR(ll) << " Blocks: " << _numBlocks;
LOG_VAR(ll) << " Scaling factor: " << _scalingFactor;
LOG_VAR(ll) << " Training Sample: " << _trainingSamples;
LOG_VAR(ll) << " Reuse Model: " << (_reuseModel ? "enabled" : "disabled");
OperatorTemplate<CSSensorBase>::printConfig(ll);
......@@ -88,7 +91,7 @@ void CSOperator::execOnInit() {
LOG(error) << "Operator " + _name + ": incompatible CS data, falling back to default!";
else {
_trainingPending = false;
_trainingReady = false;
_trainingReady = -1;
useDefault = false;
}
} catch(const std::exception& e) {
......@@ -96,7 +99,7 @@ void CSOperator::execOnInit() {
}
if(useDefault) {
_trainingPending = true;
_trainingReady = false;
_trainingReady = -1;
_max.clear();
_min.clear();
_permVector.clear();
......@@ -116,18 +119,18 @@ void CSOperator::compute(U_Ptr unit) {
for(size_t idx=0; idx<unit->getInputs().size(); idx++)
accumulateData(_trainingData, unit->getInputs()[idx], idx, nowTs);
// Performing training once enough samples are obtained
if(!_trainingData.empty() && _trainingReady) {
if(!_trainingData.empty() && _trainingReady!=-1) {
if(!checkTrainingSet(_trainingData)) {
LOG(error) << "Operator " + _name + ": collected training set does not appear to be valid!";
_trainingData.clear();
_trainingPending = true;
_trainingReady = false;
_trainingReady = -1;
} else {
computeMinMax(_trainingData);
computePermutation(_trainingData);
_trainingData.clear();
_trainingPending = false;
_trainingReady = false;
_trainingReady = -1;
if (_modelOut != "" && !dumpToFile(_modelOut))
LOG(error) << "Operator " + _name + ": cannot save CS data to a file!";
}
......@@ -231,7 +234,7 @@ void CSOperator::accumulateData(std::vector<std::vector<reading_t>>& v, CSSBPtr
v[idx].insert(v[idx].end(), _buffer.begin(), _buffer.end());
// Triggering training if right amount of sensor readings is reached
if(v[idx].size() >= _trainingSamples)
_trainingReady = true;
_trainingReady = idx;
}
}
......@@ -242,9 +245,8 @@ void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
// Evaluation parameters post-interpolation
// Beware of the accuracy loss: casting timestamps to doubles should result in a loss only
// at the level of microseconds, but if there are issues, then check this
//TODO: do not use sensor 0 as reference
double startEval=(double)v[0].front().timestamp;
double stepEval=(double)(v[0].back().timestamp - v[0].front().timestamp) / (double)_trainingSamples;
double startEval=(double)v[_trainingReady].front().timestamp;
double stepEval=(double)(v[_trainingReady].back().timestamp - v[_trainingReady].front().timestamp) / (double)_trainingSamples;
double startInterp, stepInterp;
for(size_t idx=0; idx<v.size(); idx++) {
std::vector<reading_t>& vals = v[idx];
......@@ -420,7 +422,7 @@ void CSOperator::computeSignature(U_Ptr unit, uint64_t nowTs) {
// Rescales the readings of one sensor, in place, using the bounds stored for that sensor.
// Each value becomes (value - _min[idx]) * _scalingFactor / (_max[idx] - _min[idx]), computed
// with integer arithmetic. If the stored maximum and minimum coincide, a denominator of 1 is
// used so that no division by zero can occur.
//
// v:   readings to be normalized; only their value field is overwritten
// idx: position of the sensor in the _min and _max vectors
void CSOperator::normalize(std::vector<reading_t> &v, size_t idx) {
    int64_t denom = _max[idx]!=_min[idx] ? (_max[idx] - _min[idx]) : 1;
    // _scalingFactor is an unsigned long long: multiplying by it directly would turn the whole
    // expression unsigned, and a reading below the stored minimum would wrap around to a huge
    // value. Converting it once keeps the computation signed.
    // NOTE(review): assumes reading values and bounds are signed 64-bit integers, as suggested
    // by the type of denom - confirm against the reading_t definition.
    const int64_t scale = static_cast<int64_t>(_scalingFactor);
    // NOTE(review): the product below can still overflow for very wide sensor ranges combined
    // with a large scaling factor.
    for(size_t idx2=0; idx2<v.size(); idx2++)
        v[idx2].value = (v[idx2].value - _min[idx]) * scale / denom;
}
// Computes average sensor values
......
......@@ -66,6 +66,7 @@ public:
// Stores the given value as the aggregation window; no validation is performed
void setAggregationWindow(unsigned long long a) {
    _aggregationWindow = a;
}
// Updates the number of training samples; values of 100 or less leave the current setting untouched
void setTrainingSamples(unsigned long long s) {
    if(s<=100)
        return;
    _trainingSamples = s;
}
// Updates the number of blocks; a value of 0 leaves the current setting untouched
void setNumBlocks(unsigned long long b) {
    if(b==0)
        return;
    _numBlocks = b;
}
// Updates the scaling factor; a value of 0 leaves the current setting untouched
void setScalingFactor(unsigned long long sf) {
    if(sf==0)
        return;
    _scalingFactor = sf;
}
// Stores the given flag as the reuse-model setting
void setReuseModel(bool r) {
    _reuseModel = r;
}
// Raises the training-pending flag; nothing else is modified here
void triggerTraining() {
    _trainingPending = true;
}
......@@ -75,6 +76,7 @@ public:
// Returns the currently stored aggregation window
unsigned long long getAggregationWindow() {
    return _aggregationWindow;
}
// Returns the currently stored number of training samples
unsigned long long getTrainingSamples() {
    return _trainingSamples;
}
// Returns the currently stored number of blocks
unsigned long long getNumBlocks() {
    return _numBlocks;
}
// Returns the currently stored scaling factor
unsigned long long getScalingFactor() {
    return _scalingFactor;
}
// Returns the currently stored reuse-model flag
bool getReuseModel() {
    return _reuseModel;
}
void printConfig(LOG_LEVEL ll) override;
......@@ -102,9 +104,11 @@ protected:
unsigned long long _aggregationWindow;
unsigned long long _trainingSamples;
unsigned long long _numBlocks;
bool _reuseModel;
unsigned long long _scalingFactor;
unsigned long long _trainingReady;
bool _trainingPending;
bool _trainingReady;
bool _reuseModel;
// CS data
size_t _actualBlocks;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment