Commit cdbe32b4 authored by Micha Müller's avatar Micha Müller
Browse files

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

parents 488fdbf8 6a330f21
......@@ -627,7 +627,7 @@ Additionally, output sensors in analyzers of the Aggregator plugin accept the fo
| Value | Explanation |
|:----- |:----------- |
| operation | Operation to be performed over the input sensors. Can be "sum", "average", "maximum", "minimum", "std" or "percentiles".
| operation | Operation to be performed over the input sensors. Can be "sum", "average", "maximum", "minimum", "std", "percentiles" or "observations".
| percentile | Specific percentile to be computed when using the "percentiles" operation. Can be an integer in the (0,100) range.
## Job Aggregator Plugin <a name="jobaveragePlugin"></a>
......
......@@ -88,6 +88,9 @@ void AggregatorAnalyzer::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
case AggregatorSensorBase::STD:
reading.value = computeStd(_buffer);
break;
case AggregatorSensorBase::OBS:
reading.value = computeObs(_buffer);
break;
default:
break;
}
......@@ -105,6 +108,10 @@ void AggregatorAnalyzer::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
}
}
int64_t AggregatorAnalyzer::computeObs(vector<reading_t> *buffer) {
return buffer->size();
}
int64_t AggregatorAnalyzer::computeSum(vector<reading_t> *buffer) {
int64_t acc=0;
for(const auto& v : *buffer)
......
......@@ -60,6 +60,7 @@ protected:
void compute_internal(U_Ptr unit, vector<reading_t> *buffer);
// A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around
int64_t computeObs(vector<reading_t> *buffer);
int64_t computeSum(vector<reading_t> *buffer);
int64_t computeAvg(vector<reading_t> *buffer);
int64_t computeMax(vector<reading_t> *buffer);
......
......@@ -51,6 +51,8 @@ void AggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL config)
s.setOperation(AggregatorSensorBase::STD);
else if (opName == "percentiles")
s.setOperation(AggregatorSensorBase::QTL);
else if (opName == "observations")
s.setOperation(AggregatorSensorBase::OBS);
} else if (boost::iequals(val.first, "percentile")) {
size_t quantile = stoull(val.second.data());
if( quantile>0 && quantile<100 )
......
......@@ -47,7 +47,7 @@ class AggregatorSensorBase : public SensorBase {
public:
// Enum to identify aggregation operations
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3, STD = 4, QTL = 5 };
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3, STD = 4, QTL = 5, OBS = 6 };
// Constructor and destructor
AggregatorSensorBase(const std::string& name) : SensorBase(name) {
......@@ -100,6 +100,9 @@ protected:
case QTL:
opString = "percentiles";
break;
case OBS:
opString = "observations";
break;
default:
opString = "invalid";
break;
......
......@@ -51,6 +51,8 @@ void JobAggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL conf
s.setOperation(AggregatorSensorBase::STD);
else if (opName == "percentiles")
s.setOperation(AggregatorSensorBase::QTL);
else if (opName == "observations")
s.setOperation(AggregatorSensorBase::OBS);
} else if (boost::iequals(val.first, "percentile")) {
size_t quantile = stoull(val.second.data());
if( quantile>0 && quantile<100 )
......
......@@ -212,6 +212,7 @@ void RegressorAnalyzer::computeFeatureVector(U_Ptr unit) {
_currentfVector->at<float>(fIdx+2) = (float)_diffsum;
_currentfVector->at<float>(fIdx+3) = (float)_qtl25;
_currentfVector->at<float>(fIdx+4) = (float)_qtl75;
_currentfVector->at<float>(fIdx+5) = (float)_buffer->at(_buffer->size()-1).value;
}
//LOG(error) << "Target: " << _currentTarget;
//LOG(error) << "Vector: ";
......@@ -248,6 +249,9 @@ std::string RegressorAnalyzer::getImportances() {
case 4:
pair.name += " - qtl75";
break;
case 5:
pair.name += " - latest";
break;
default:
break;
}
......
......@@ -36,7 +36,7 @@
#include "opencv4/opencv2/ml.hpp"
#include <math.h>
#define REG_NUMFEATURES 5
#define REG_NUMFEATURES 6
/**
* @brief Regressor analyzer plugin.
......
......@@ -398,9 +398,9 @@ protected:
uint64_t now_ms = now / 1000 / 1000;
uint64_t waitToStart = interval64 - (now_ms%interval64); //synchronize all measurements with other sensors
if(!waitToStart ){ // less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
return (now_ms + interval64)*1000*1000;
return (now_ms + interval64 + 10)*1000*1000;
}
return (now_ms + waitToStart)*1000*1000;
return (now_ms + waitToStart + 10)*1000*1000;
} else {
return now + MS_TO_NS(_interval);
}
......
......@@ -396,7 +396,7 @@ protected:
for (const auto &s : *sensors) {
if (!addedSensors.count(s)) {
SBase uIn(*in);
uIn.setMqtt(_navi->getNodeTopic(s));
uIn.setMqtt(s);
uIn.setName(s);
if (!(_navi->sensorExists(uIn.getName()) || relaxed)) {
delete sensors;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment