Commit a4041689 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: compute method of AnalyzerTemplate now accepts shared pointers to Units

parent 5a533b0b
...@@ -41,29 +41,29 @@ void AggregatorAnalyzer::printConfig(LOG_LEVEL ll) { ...@@ -41,29 +41,29 @@ void AggregatorAnalyzer::printConfig(LOG_LEVEL ll) {
AnalyzerTemplate<SensorBase>::printConfig(ll); AnalyzerTemplate<SensorBase>::printConfig(ll);
} }
void AggregatorAnalyzer::compute(int unitID) { void AggregatorAnalyzer::compute(U_Ptr unit) {
switch(_op) { switch(_op) {
case SUM: case SUM:
computeSum(unitID); computeSum(unit);
break; break;
case AVG: case AVG:
computeAvg(unitID); computeAvg(unit);
break; break;
case MIN: case MIN:
computeMin(unitID); computeMin(unit);
break; break;
case MAX: case MAX:
computeMax(unitID); computeMax(unit);
break; break;
default: default:
break; break;
} }
} }
void AggregatorAnalyzer::computeSum(int unitID) { void AggregatorAnalyzer::computeSum(U_Ptr unit) {
int64_t acc=0; int64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) { for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window // Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer); _buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) { if(!_buffer || _buffer->empty()) {
...@@ -78,13 +78,13 @@ void AggregatorAnalyzer::computeSum(int unitID) { ...@@ -78,13 +78,13 @@ void AggregatorAnalyzer::computeSum(int unitID) {
reading_t out; reading_t out;
out.timestamp = getTimestamp(); out.timestamp = getTimestamp();
out.value = acc; out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out); unit->getOutputs()[0]->storeReading(out);
} }
void AggregatorAnalyzer::computeAvg(int unitID) { void AggregatorAnalyzer::computeAvg(U_Ptr unit) {
int64_t acc=0, ctr=0; int64_t acc=0, ctr=0;
for(const auto& in : _units[unitID]->getInputs()) { for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window // Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer); _buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) { if(!_buffer || _buffer->empty()) {
...@@ -103,13 +103,13 @@ void AggregatorAnalyzer::computeAvg(int unitID) { ...@@ -103,13 +103,13 @@ void AggregatorAnalyzer::computeAvg(int unitID) {
reading_t out; reading_t out;
out.timestamp = getTimestamp(); out.timestamp = getTimestamp();
out.value = acc; out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out); unit->getOutputs()[0]->storeReading(out);
} }
void AggregatorAnalyzer::computeMax(int unitID) { void AggregatorAnalyzer::computeMax(U_Ptr unit) {
int64_t acc=0; int64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) { for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window // Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer); _buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) { if(!_buffer || _buffer->empty()) {
...@@ -125,14 +125,14 @@ void AggregatorAnalyzer::computeMax(int unitID) { ...@@ -125,14 +125,14 @@ void AggregatorAnalyzer::computeMax(int unitID) {
reading_t out; reading_t out;
out.timestamp = getTimestamp(); out.timestamp = getTimestamp();
out.value = acc; out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out); unit->getOutputs()[0]->storeReading(out);
} }
void AggregatorAnalyzer::computeMin(int unitID) { void AggregatorAnalyzer::computeMin(U_Ptr unit) {
int64_t acc=0; int64_t acc=0;
bool minInit=false; bool minInit=false;
for(const auto& in : _units[unitID]->getInputs()) { for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window // Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer); _buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) { if(!_buffer || _buffer->empty()) {
...@@ -150,6 +150,6 @@ void AggregatorAnalyzer::computeMin(int unitID) { ...@@ -150,6 +150,6 @@ void AggregatorAnalyzer::computeMin(int unitID) {
reading_t out; reading_t out;
out.timestamp = getTimestamp(); out.timestamp = getTimestamp();
out.value = acc; out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out); unit->getOutputs()[0]->storeReading(out);
} }
...@@ -47,13 +47,13 @@ public: ...@@ -47,13 +47,13 @@ public:
private: private:
void compute(int unitID) override; void compute(U_Ptr unit) override;
// A separate method for each operation implies code redundancy, but also better efficiency and less useless // A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around // variables used by specific operations lying around
void computeSum(int unitID); void computeSum(U_Ptr unit);
void computeAvg(int unitID); void computeAvg(U_Ptr unit);
void computeMax(int unitID); void computeMax(U_Ptr unit);
void computeMin(int unitID); void computeMin(U_Ptr unit);
vector<reading_t> *_buffer = NULL; vector<reading_t> *_buffer = NULL;
unsigned long long _window; unsigned long long _window;
......
...@@ -266,18 +266,7 @@ protected: ...@@ -266,18 +266,7 @@ protected:
* *
*/ */
virtual void computeAsync() = 0; virtual void computeAsync() = 0;
/**
* @brief Data analytics computation logic
*
* This method contains the actual logic used by the analyzed, and is automatically called by
* the computeAsync method.
*
* @param unitID index of the unit over which computation must be performed
*/
virtual void compute(int unitID) = 0;
// Name of this analyzer // Name of this analyzer
string _name; string _name;
// MQTT part (see docs) of this analyzer // MQTT part (see docs) of this analyzer
......
...@@ -292,17 +292,12 @@ public: ...@@ -292,17 +292,12 @@ public:
if (!s->isInit()) if (!s->isInit())
s->initSensor(_cacheSize); s->initSensor(_cacheSize);
addUnit(tempUnit); compute(tempUnit);
int onDemandUnitID = _units.size() - 1; for (const auto &o : tempUnit->getOutputs()) {
compute(onDemandUnitID);
for (const auto &o : _units[onDemandUnitID]->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue())); outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue(); o->clearReadingQueue();
} }
_units.erase(_units.begin() + onDemandUnitID);
_baseUnits.erase(_baseUnits.begin() + onDemandUnitID);
} catch(const exception& e) { } catch(const exception& e) {
_onDemandLock.store(false); _onDemandLock.store(false);
throw; throw;
...@@ -416,10 +411,10 @@ protected: ...@@ -416,10 +411,10 @@ protected:
try { try {
if (_duplicate && _unitID >= 0) if (_duplicate && _unitID >= 0)
compute(_unitID); compute(_units[_unitID]);
else else
for (unsigned i = 0; i < _units.size(); i++) for (unsigned i = 0; i < _units.size(); i++)
compute((int)i); compute(_units[i]);
} catch(const exception& e) { } catch(const exception& e) {
LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!"; LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!";
} }
...@@ -432,6 +427,16 @@ protected: ...@@ -432,6 +427,16 @@ protected:
_pendingTasks--; _pendingTasks--;
} }
/**
* @brief Data analytics computation logic
*
* This method contains the actual logic used by the analyzed, and is automatically called by
* the computeAsync method.
*
* @param unit Shared pointer to unit to be processed
*/
virtual void compute(U_Ptr unit) = 0;
// Cache for frequently used units in ondemand mode // Cache for frequently used units in ondemand mode
map<string, U_Ptr>* _ondemandCache; map<string, U_Ptr>* _ondemandCache;
// Helper map to keep track of the cache insertion times // Helper map to keep track of the cache insertion times
......
...@@ -441,7 +441,7 @@ int main(int argc, char** argv) { ...@@ -441,7 +441,7 @@ int main(int argc, char** argv) {
//will only continue if interrupted by SIGINT and threads were stopped //will only continue if interrupted by SIGINT and threads were stopped
mqttThread.join(); mqttThread.join();
LOG(info) << "MQTTPusher stopped"; LOG(info) << "MQTTPusher stopped.";
LOG(info) << "Tearing down objects..."; LOG(info) << "Tearing down objects...";
_sensorMap.clear(); _sensorMap.clear();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment