Commit 40a5404a authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: fixes to signatures plugin

parent 3c197a34
......@@ -908,7 +908,7 @@ However, it supports the following additional REST API actions:
## CS Signatures Plugin <a name="csPlugin"></a>
The _CS Signatures_ plugin computes signatures from sensor data as described in the paper _"Correlation-wise Smoothing: Lightweight Knowledge Extraction for HPC Monitoring Data"_ by Netti et al. The signatures aggregate data both in time and across sensors, and are composed by a specified number of complex-valued _blocks_.
The _CS Signatures_ plugin computes signatures from sensor data, which aggregate data both in time and across sensors, and are composed by a specified number of complex-valued _blocks_.
Each of the blocks is then stored in two separate sensors, which contain respectively the real and imaginary part of the block. Like in the _Regressor_ and _Classifier_ plugins, the CS algorithm is trained using a specified number of samples, which are accumulated in memory, subsequently learning the correlations between sensors.
Operators in this plugin support the following configuration parameters:
......
......@@ -76,11 +76,6 @@ bool CSConfigurator::readUnits(CSOperator& op, std::vector<CSSBPtr>& protoInputs
std::vector<CSSBPtr>& protoGlobalOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<CSSensorBase>>> *units = NULL;
if(protoInputs.empty()) {
LOG(error) << this->_operatorName << " " << op.getName() << ": No input sensors specified!";
return false;
}
if(protoOutputs.empty() || !protoGlobalOutputs.empty() || protoOutputs.size() > 2) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Units must be flat with at most two output sensors!";
return false;
......@@ -98,10 +93,12 @@ bool CSConfigurator::readUnits(CSOperator& op, std::vector<CSSBPtr>& protoInputs
} else {
continue;
}
for(size_t i=0; i<op.getNumBlocks(); i++) {
auto outS = std::make_shared<CSSensorBase>(*s);
outS->setMqtt(outS->getMqtt() + std::to_string(i));
outS->setName(outS->getName() + std::to_string(i));
outS->setBlockID(i);
trueOutputs.push_back(outS);
}
}
......
......@@ -31,10 +31,11 @@ CSOperator::CSOperator(const std::string& name) : OperatorTemplate(name) {
_modelIn = "";
_modelOut = "";
_aggregationWindow = 0;
_trainingSamples = 10000;
_trainingSamples = 3600;
_numBlocks = 20;
_reuseModel = true;
_trainingPending = true;
_trainingReady = false;
}
CSOperator::CSOperator(const CSOperator& other) : OperatorTemplate(other) {
......@@ -45,6 +46,7 @@ CSOperator::CSOperator(const CSOperator& other) : OperatorTemplate(other) {
_numBlocks = other._numBlocks;
_reuseModel = other._reuseModel;
_trainingPending = true;
_trainingReady = false;
}
CSOperator::~CSOperator() {}
......@@ -54,6 +56,7 @@ restResponse_t CSOperator::REST(const string& action, const unordered_map<string
if(action=="train") {
resp.response = "Re-training triggered for CS Signatures operator " + this->_name + "!\n";
this->_trainingPending = true;
this->_trainingReady = false;
} else
throw invalid_argument("Unknown plugin action " + action + " requested!");
return resp;
......@@ -71,12 +74,21 @@ void CSOperator::printConfig(LOG_LEVEL ll) {
void CSOperator::execOnInit() {
bool useDefault=true;
// Establishing the training unit and the appropriate number of signature blocks
if(_streaming && !_units.empty()) {
_trainingUnit = _units[0]->getName();
_actualBlocks = _units[0]->getInputs().size() < _numBlocks ? _units[0]->getInputs().size() : _numBlocks;
} else {
_actualBlocks = _numBlocks;
}
if(_modelIn!="") {
try {
if(!readFromFile(_modelIn))
LOG(error) << "Operator " + _name + ": incompatible CS data, falling back to default!";
else {
_trainingPending = false;
_trainingReady = false;
useDefault = false;
}
} catch(const std::exception& e) {
......@@ -84,6 +96,7 @@ void CSOperator::execOnInit() {
}
if(useDefault) {
_trainingPending = true;
_trainingReady = false;
_max.clear();
_min.clear();
_permVector.clear();
......@@ -96,22 +109,28 @@ void CSOperator::compute(U_Ptr unit) {
uint64_t nowTs = getTimestamp();
// Training-related tasks
if(_trainingPending && _streaming && (_trainingUnit==unit->getName() || _trainingUnit=="")) {
_trainingUnit = unit->getName();
if(_trainingPending && _streaming && _trainingUnit==unit->getName()) {
// Fetching sensor data
if(_trainingData.empty())
_trainingData.resize(unit->getInputs().size());
for(size_t idx=0; idx<unit->getInputs().size(); idx++)
accumulateData(_trainingData, unit->getInputs()[idx], idx, nowTs);
// Performing training once enough samples are obtained
if(!_trainingData.empty() && _trainingData[0].size() >= _trainingSamples) {
computeMinMax(_trainingData);
computePermutation(_trainingData);
_actualBlocks = _trainingData.size() < _numBlocks ? _trainingData.size() : _numBlocks;
_blockLen = (float)_trainingData.size() / (float)_actualBlocks;
_trainingData.clear();
_trainingUnit = "";
_trainingPending = false;
if(!_trainingData.empty() && _trainingReady) {
if(!checkTrainingSet(_trainingData)) {
LOG(error) << "Operator " + _name + ": collected training set does not appear to be valid!";
_trainingData.clear();
_trainingPending = true;
_trainingReady = false;
} else {
computeMinMax(_trainingData);
computePermutation(_trainingData);
_trainingData.clear();
_trainingPending = false;
_trainingReady = false;
if (_modelOut != "" && !dumpToFile(_modelOut))
LOG(error) << "Operator " + _name + ": cannot save CS data to a file!";
}
}
}
......@@ -137,25 +156,20 @@ bool CSOperator::dumpToFile(std::string &path) {
if(_trainingPending || _permVector.empty())
return false;
// In JSON mode, sensors are arranged hierarchically by plugin->operator->sensor
for(size_t idx=0; idx<_numBlocks; idx++) {
// Saving CS data in terms of permutation index, minimum and maximum for each input sensor
for(size_t idx=0; idx<_actualBlocks; idx++) {
boost::property_tree::ptree group;
group.push_back(boost::property_tree::ptree::value_type("idx", std::to_string(_permVector[idx])));
group.push_back(boost::property_tree::ptree::value_type("min", std::to_string(_min[idx])));
group.push_back(boost::property_tree::ptree::value_type("max", std::to_string(_max[idx])));
blocks.add_child(std::to_string(idx), group);
}
root.add_child(std::to_string(_numBlocks), blocks);
root.add_child(std::to_string(_actualBlocks), blocks);
try {
boost::property_tree::write_json(data, root, true);
std::ofstream outFile(path);
if(!outFile.is_open())
return false;
else {
outFile << data.str();
outFile.close();
}
boost::property_tree::write_json(outFile, root, true);
outFile.close();
} catch(const std::exception &e) { return false; }
return true;
}
......@@ -167,19 +181,21 @@ bool CSOperator::readFromFile(std::string &path) {
} catch(const std::exception &e) { return false; }
// The root JSON node encodes the number of blocks and has to match with that of the operator
std::string blockString = std::to_string(_numBlocks);
std::string blockString = std::to_string(_actualBlocks);
if(config.find(blockString) == config.not_found())
return false;
std::vector<size_t> newPermVector(_numBlocks);
std::vector<int64_t> newMin(_numBlocks);
std::vector<int64_t> newMax(_numBlocks);
std::vector<size_t> newPermVector(_actualBlocks);
std::vector<int64_t> newMin(_actualBlocks);
std::vector<int64_t> newMax(_actualBlocks);
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config.get_child(blockString)) {
size_t blockID = std::stoull(val.first);
boost::property_tree::iptree &blk = val.second;
if(blk.find("idx")==blk.not_found() || blk.find("min")==blk.not_found() || blk.find("max")==blk.not_found())
return false;
if(blockID>=_actualBlocks)
return false;
BOOST_FOREACH(boost::property_tree::iptree::value_type &val2, blk) {
if (boost::iequals(val2.first, "idx")) {
......@@ -211,8 +227,12 @@ void CSOperator::accumulateData(std::vector<std::vector<reading_t>>& v, CSSBPtr
if(!_queryEngine.querySensor(s->getName(), startTs, endTs, _buffer, false))
return;
// We add the queried values only if they are actually "new"
if(!_buffer.empty() && _buffer[0].timestamp>v[idx].back().timestamp)
if(!_buffer.empty() && _buffer[0].timestamp>v[idx].back().timestamp) {
v[idx].insert(v[idx].end(), _buffer.begin(), _buffer.end());
// Triggering training if right amount of sensor readings is reached
if(v[idx].size() >= _trainingSamples)
_trainingReady = true;
}
}
// Applies the sorting stage of the CS method and finds a permutation vector
......@@ -220,13 +240,16 @@ void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
// Each column of the matrix will be an interpolated sensor
cv::Mat sensorMatrix = cv::Mat(_trainingSamples, v.size(), CV_64F);
// Evaluation parameters post-interpolation
double startEval=v[0].front().timestamp;
double stepEval=(v[0].back().timestamp - v[0].front().timestamp) / _trainingSamples;
// Beware of the accuracy loss: casting timestamps to doubles should result in a loss only
// at the level of microseconds, but if there are issues, then check this
//TODO: do not use sensor 0 as reference
double startEval=(double)v[0].front().timestamp;
double stepEval=(double)(v[0].back().timestamp - v[0].front().timestamp) / (double)_trainingSamples;
double startInterp, stepInterp;
for(size_t idx=0; idx<v.size(); idx++) {
std::vector<reading_t>& vals = v[idx];
startInterp = startEval - vals.front().timestamp;
stepInterp = (vals.back().timestamp - vals.front().timestamp) / vals.size();
startInterp = startEval - (double)vals.front().timestamp;
stepInterp = (double)(vals.back().timestamp - vals.front().timestamp) / (double)vals.size();
// Copying element by element into a temporary vector - ugly and slow
std::vector<double> sValues(vals.size());
for(size_t idx2=0; idx2<vals.size(); idx2++)
......@@ -236,12 +259,14 @@ void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
// Evaluating in the interpolated points and storing in the matrix
for(size_t idx2=0; idx2<_trainingSamples; idx2++)
sensorMatrix.at<double>(idx, idx2) = spline(stepEval*idx2);
sValues.clear();
}
// Calculating covariance matrix
cv::Mat covMatrix, meanMatrix;
cv::calcCovarMatrix(sensorMatrix, covMatrix, meanMatrix, cv::COVAR_COLS + cv::COVAR_SCALE + cv::COVAR_NORMAL, CV_64F);
sensorMatrix.release();
meanMatrix.release();
// Transforming the matrix
convertToCorrelation(covMatrix);
......@@ -250,6 +275,7 @@ void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
for(size_t idx=0; idx<v.size(); idx++)
availSet.insert(idx);
// Correlation-based sorting
_permVector.clear();
double corrMax = -1000.0;
double corrCoef = 0.0;
......@@ -265,7 +291,6 @@ void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
_permVector.push_back(corrIdx);
availSet.erase(corrIdx);
// Correlation-based sorting
while(!availSet.empty()) {
corrMax = -1000;
corrIdx = 0;
......@@ -314,7 +339,7 @@ void CSOperator::convertToCorrelation(cv::Mat &m) {
for(size_t i=0; i<m.size().height; i++) {
for(size_t j=0; j<m.size().width; j++) {
if(i!=j)
m.at<double>(i,j) = m.at<double>(i,j) / (sqrt(m.at<double>(i,i)*m.at<double>(j,j)) + 0.00001);
m.at<double>(i,j) = m.at<double>(i,j) / (sqrt(m.at<double>(i,i))*sqrt(m.at<double>(j,j)) + 0.00001);
}
}
// Getting global correlations
......@@ -322,13 +347,28 @@ void CSOperator::convertToCorrelation(cv::Mat &m) {
m.at<double>(i,i) = 0;
for(size_t j=0; j<m.size().width; j++) {
if(i!=j) {
m.at<double>(i, i) += m.at<double>(i, j);
m.at<double>(i,i) += m.at<double>(i,j);
}
}
m.at<double>(i,i) /= m.size().width - 1;
}
}
// Checks that the training set is actually valid
bool CSOperator::checkTrainingSet(std::vector<std::vector<reading_t>>& v) {
if(v.empty())
return false;
bool foundValid=false;
for(const auto& s : v) {
if(s.size() < 100) {
return false;
} else if(s.size() >= _trainingSamples) {
foundValid = true;
}
}
return foundValid;
}
// -------------------------------------- SIGNATURE COMPUTATION --------------------------------------
// Actual signature computation
......@@ -345,7 +385,7 @@ void CSOperator::computeSignature(U_Ptr unit, uint64_t nowTs) {
for(size_t idx=0; idx<unit->getInputs().size(); idx++) {
_buffer.clear();
if(!_queryEngine.querySensor(unit->getInputs()[idx]->getName(), startTs, endTs, _buffer, false)) {
LOG(debug) << "Operator " + _name + ": cannot read from sensor " << unit->getInputs()[idx]->getName() << " for unit " + unit->getName() + "!";
LOG(debug) << "Operator " + _name + ": cannot read from sensor " << unit->getInputs()[idx]->getName() << "!";
return;
}
normalize(_buffer, idx);
......@@ -356,6 +396,7 @@ void CSOperator::computeSignature(U_Ptr unit, uint64_t nowTs) {
// Computing blocks and storing result into output sensors
reading_t val;
val.timestamp = nowTs;
_blockLen = (float)unit->getInputs().size() / (float)_actualBlocks;
for(auto &s : unit->getOutputs()) {
if(s->getBlockID()<_actualBlocks) {
_bBegin = (size_t)floor(_blockLen*s->getBlockID());
......@@ -378,8 +419,8 @@ void CSOperator::computeSignature(U_Ptr unit, uint64_t nowTs) {
// Normalizes sensor data
void CSOperator::normalize(std::vector<reading_t> &v, size_t idx) {
int64_t denom = _max[idx]!=_min[idx] ? (_max[idx] - _min[idx]) : 1;
for(size_t idx=0; idx<v.size(); idx++)
v[idx].value = (v[idx].value - _min[idx]) / denom;
for(size_t idx2=0; idx2<v.size(); idx2++)
v[idx2].value = (v[idx2].value - _min[idx]) / denom;
}
// Computes average sensor values
......
......@@ -64,7 +64,7 @@ public:
void setInputPath(std::string in) { _modelIn = in; }
void setOutputPath(std::string out) { _modelOut = out; }
void setAggregationWindow(unsigned long long a) { _aggregationWindow = a; }
void setTrainingSamples(unsigned long long s) { if(s>0) _trainingSamples = s; }
void setTrainingSamples(unsigned long long s) { if(s>100) _trainingSamples = s; }
void setNumBlocks(unsigned long long b) { if(b>0) _numBlocks = b; }
void setReuseModel(bool r) { _reuseModel = r; }
......@@ -90,6 +90,7 @@ protected:
void accumulateData(std::vector<std::vector<reading_t>>& v, CSSBPtr s, size_t idx, uint64_t nowTs);
void computeMinMax(std::vector<std::vector<reading_t>>& v);
void computePermutation(std::vector<std::vector<reading_t>>& v);
bool checkTrainingSet(std::vector<std::vector<reading_t>>& v);
void normalize(std::vector<reading_t> &v, size_t idx);
int64_t getAvg(std::vector<reading_t> &v);
......@@ -103,6 +104,7 @@ protected:
unsigned long long _numBlocks;
bool _reuseModel;
bool _trainingPending;
bool _trainingReady;
// CS data
size_t _actualBlocks;
......
......@@ -69,6 +69,7 @@ public:
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
SensorBase::printConfig(ll, lg, leadingSpaces);
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Imaginary: " << (_imag ? "true" : "false");
}
protected:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment