Commit e98e25a3 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: reducing verbosity upon failed queries

parent cc47c480
...@@ -340,12 +340,15 @@ public: ...@@ -340,12 +340,15 @@ public:
else { else {
units = new set<string>(); units = new set<string>();
for(const auto &u : uNames) { for(const auto &u : uNames) {
units->insert(u);
// The unit specified as input must belong to the domain of the outputs // The unit specified as input must belong to the domain of the outputs
if (!outputs.empty() && !nodeBelongsToPattern(u, outputs[0]->getName())) { if (!outputs.empty() && !nodeBelongsToPattern(u, outputs[0]->getName()))
delete units; LOG(debug) << "UnitGenerator: Node " + u + " does not belong to this unit domain!";
throw invalid_argument("UnitGenerator: Node " + u + " does not belong to this unit domain!"); else
} units->insert(u);
}
if(units->empty()) {
delete units;
throw invalid_argument("UnitGenerator: All input nodes do not belong to this unit domain!");
} }
} }
...@@ -357,8 +360,8 @@ public: ...@@ -357,8 +360,8 @@ public:
unitObjects->push_back(_generateUnit(u, inputs, outputs, inputMode, mqttPrefix, enforceTopics, relaxed)); unitObjects->push_back(_generateUnit(u, inputs, outputs, inputMode, mqttPrefix, enforceTopics, relaxed));
} catch(const exception& e) { } catch(const exception& e) {
if(units->size()>1) { if(units->size()>1) {
LOG(error) << e.what(); LOG(debug) << e.what();
LOG(error) << "UnitGenerator: cannot build unit " << u << "!"; LOG(debug) << "UnitGenerator: cannot build unit " << u << "!";
continue; continue;
} else { } else {
delete units; delete units;
...@@ -438,8 +441,8 @@ public: ...@@ -438,8 +441,8 @@ public:
} catch (const invalid_argument &e) { } catch (const invalid_argument &e) {
topUnit->clear(); topUnit->clear();
if(uNames.size()>1) { if(uNames.size()>1) {
LOG(error) << e.what(); LOG(debug) << e.what();
LOG(error) << "HierarchicalUnitGenerator: cannot build unit " << u << "!"; LOG(debug) << "HierarchicalUnitGenerator: cannot build unit " << u << "!";
continue; continue;
} else { } else {
delete unitObjects; delete unitObjects;
......
...@@ -57,8 +57,10 @@ void AggregatorOperator::compute(U_Ptr unit) { ...@@ -57,8 +57,10 @@ void AggregatorOperator::compute(U_Ptr unit) {
// Getting the most recent values as specified in _window // Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector // Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
elCtr = _buffer.size(); elCtr = _buffer.size();
if(!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.size()<=elCtr) if(!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.size()<=elCtr) {
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + in->getName() + "!"); LOG(debug) << "Operator " + _name + ": cannot read from sensor " + in->getName() + "!";
return;
}
} }
compute_internal(unit, _buffer); compute_internal(unit, _buffer);
} }
......
...@@ -117,8 +117,12 @@ void ClusteringOperator::compute(U_Ptr unit) { ...@@ -117,8 +117,12 @@ void ClusteringOperator::compute(U_Ptr unit) {
_trainingSet = cv::Mat(); _trainingSet = cv::Mat();
_tempSet = cv::Mat(); _tempSet = cv::Mat();
for(const auto& su : unit->getSubUnits()) { for(const auto& su : unit->getSubUnits()) {
computeFeatureVector(su); if(computeFeatureVector(su))
_tempSet.push_back(_currentfVector); _tempSet.push_back(_currentfVector);
}
if(_tempSet.empty()) {
LOG(debug) << "Operator " + _name + ": could not build any feature vector!";
return;
} }
if(_trainingSet.empty()) if(_trainingSet.empty())
_trainingSet = _tempSet; _trainingSet = _tempSet;
...@@ -174,7 +178,7 @@ void ClusteringOperator::compute(U_Ptr unit) { ...@@ -174,7 +178,7 @@ void ClusteringOperator::compute(U_Ptr unit) {
_tempSet = cv::Mat(); _tempSet = cv::Mat();
} }
void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) { bool ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) {
_currentfVector = cv::Mat(1, unit->getInputs().size(), CV_32F); _currentfVector = cv::Mat(1, unit->getInputs().size(), CV_32F);
std::vector<ClusteringSBPtr>& inputs = unit->getInputs(); std::vector<ClusteringSBPtr>& inputs = unit->getInputs();
uint64_t endTs = getTimestamp() - offset; uint64_t endTs = getTimestamp() - offset;
...@@ -182,8 +186,10 @@ void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) { ...@@ -182,8 +186,10 @@ void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) {
for(size_t idx=0; idx<inputs.size(); idx++) { for(size_t idx=0; idx<inputs.size(); idx++) {
_mean=0; _mean=0;
_buffer.clear(); _buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty()) if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty()) {
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!"); LOG(debug) << "Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!";
return false;
}
// Computing MEAN // Computing MEAN
for(const auto& v : _buffer) for(const auto& v : _buffer)
...@@ -193,6 +199,7 @@ void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) { ...@@ -193,6 +199,7 @@ void ClusteringOperator::computeFeatureVector(U_Ptr unit, uint64_t offset) {
// Casting and storing the statistical features // Casting and storing the statistical features
_currentfVector.at<float>(idx) = (float)_mean; _currentfVector.at<float>(idx) = (float)_mean;
} }
return true;
} }
bool ClusteringOperator::isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov) { bool ClusteringOperator::isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov) {
......
...@@ -80,7 +80,7 @@ public: ...@@ -80,7 +80,7 @@ public:
protected: protected:
virtual void compute(U_Ptr unit) override; virtual void compute(U_Ptr unit) override;
void computeFeatureVector(U_Ptr unit, uint64_t offset=0); bool computeFeatureVector(U_Ptr unit, uint64_t offset=0);
bool isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov); bool isOutlier(cv::Mat vec1, cv::Mat vec2, cv::Mat cov);
std::string printMeans(); std::string printMeans();
std::string printCovs(); std::string printCovs();
......
...@@ -61,7 +61,7 @@ void FilesinkOperator::compute(U_Ptr unit) { ...@@ -61,7 +61,7 @@ void FilesinkOperator::compute(U_Ptr unit) {
// Clearing the buffer // Clearing the buffer
_buffer.clear(); _buffer.clear();
if(!_queryEngine.querySensor(in->getName(), 0, 0, _buffer) || _buffer.empty()) if(!_queryEngine.querySensor(in->getName(), 0, 0, _buffer) || _buffer.empty())
LOG(error) << "Operator " + _name + ": cannot read from sensor " + in->getName() + "!"; LOG(debug) << "Operator " + _name + ": cannot read from sensor " + in->getName() + "!";
else if(!in->writeFile(_buffer[_buffer.size()-1])) else if(!in->writeFile(_buffer[_buffer.size()-1]))
LOG(error) << "Operator " + _name + ": failed file write for sensor " << in->getName() << "!"; LOG(error) << "Operator " + _name + ": failed file write for sensor " << in->getName() << "!";
} }
......
...@@ -49,7 +49,9 @@ void ClassifierOperator::printConfig(LOG_LEVEL ll) { ...@@ -49,7 +49,9 @@ void ClassifierOperator::printConfig(LOG_LEVEL ll) {
} }
void ClassifierOperator::compute(U_Ptr unit) { void ClassifierOperator::compute(U_Ptr unit) {
computeFeatureVector(unit); // Not much to do without a valid feature vector
if(!computeFeatureVector(unit))
return;
if (_trainingPending && _streaming) { if (_trainingPending && _streaming) {
if (!_trainingSet) if (!_trainingSet)
_trainingSet = new cv::Mat(); _trainingSet = new cv::Mat();
......
...@@ -111,7 +111,9 @@ void RegressorOperator::printConfig(LOG_LEVEL ll) { ...@@ -111,7 +111,9 @@ void RegressorOperator::printConfig(LOG_LEVEL ll) {
} }
void RegressorOperator::compute(U_Ptr unit) { void RegressorOperator::compute(U_Ptr unit) {
computeFeatureVector(unit); // Not much to do without a valid feature vector
if(!computeFeatureVector(unit))
return;
if (_trainingPending && _streaming) { if (_trainingPending && _streaming) {
if (!_trainingSet) if (!_trainingSet)
_trainingSet = new cv::Mat(); _trainingSet = new cv::Mat();
...@@ -190,7 +192,7 @@ void RegressorOperator::shuffleTrainingSet() { ...@@ -190,7 +192,7 @@ void RegressorOperator::shuffleTrainingSet() {
} }
} }
void RegressorOperator::computeFeatureVector(U_Ptr unit) { bool RegressorOperator::computeFeatureVector(U_Ptr unit) {
if(!_currentfVector) if(!_currentfVector)
_currentfVector = new cv::Mat(1, unit->getInputs().size()*REG_NUMFEATURES, CV_32F); _currentfVector = new cv::Mat(1, unit->getInputs().size()*REG_NUMFEATURES, CV_32F);
int64_t val; int64_t val;
...@@ -201,8 +203,10 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) { ...@@ -201,8 +203,10 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
for(idx=0; idx<inputs.size(); idx++) { for(idx=0; idx<inputs.size(); idx++) {
_mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0; _latest=0; _mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0; _latest=0;
_buffer.clear(); _buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty()) if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty()) {
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!"); LOG(debug) << "Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!";
return false;
}
_latest = _buffer.back().value; _latest = _buffer.back().value;
if (inputs[idx]->getTrainingTarget()) if (inputs[idx]->getTrainingTarget())
_currentTarget = (float)_latest; _currentTarget = (float)_latest;
...@@ -261,6 +265,7 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) { ...@@ -261,6 +265,7 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
//LOG(error) << "Vector: "; //LOG(error) << "Vector: ";
//for(idx=0; idx<_currentfVector->size().width;idx++) //for(idx=0; idx<_currentfVector->size().width;idx++)
// LOG(error) << _currentfVector->at<float>(idx); // LOG(error) << _currentfVector->at<float>(idx);
return true;
} }
std::string RegressorOperator::getImportances() { std::string RegressorOperator::getImportances() {
......
...@@ -76,7 +76,7 @@ public: ...@@ -76,7 +76,7 @@ public:
protected: protected:
virtual void compute(U_Ptr unit) override; virtual void compute(U_Ptr unit) override;
void computeFeatureVector(U_Ptr unit); bool computeFeatureVector(U_Ptr unit);
void trainRandomForest(bool categorical=false); void trainRandomForest(bool categorical=false);
void shuffleTrainingSet(); void shuffleTrainingSet();
std::string getImportances(); std::string getImportances();
......
...@@ -82,7 +82,7 @@ uint64_t TesterOperator::compute_internal(U_Ptr unit) { ...@@ -82,7 +82,7 @@ uint64_t TesterOperator::compute_internal(U_Ptr unit) {
} }
} }
if(errorLog) if(errorLog)
LOG(error) << "Operator " << _name << ": could not read from one or more sensors!"; LOG(debug) << "Operator " << _name << ": could not read from one or more sensors!";
outR.value = (int64_t)elCtr; outR.value = (int64_t)elCtr;
unit->getOutputs()[0]->storeReading(outR); unit->getOutputs()[0]->storeReading(outR);
return elCtr; return elCtr;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment