Commit aa12d1dc authored by Alessio Netti's avatar Alessio Netti

Analytics: preventing dynamic units from being auto-published

parent 3a310364
......@@ -71,6 +71,7 @@ public:
_duplicate(false),
_streaming(true),
_sync(true),
_dynamic(false),
_unitID(-1),
_keepRunning(0),
_minValues(1),
......@@ -94,6 +95,7 @@ public:
_duplicate(other._duplicate),
_streaming(other._streaming),
_sync(other._sync),
_dynamic(other._dynamic),
_unitID(other._unitID),
_keepRunning(other._keepRunning),
_minValues(other._minValues),
......@@ -123,6 +125,7 @@ public:
_duplicate = other._duplicate;
_streaming = other._streaming;
_sync = other._sync;
_dynamic = other._dynamic;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
......@@ -243,6 +246,7 @@ public:
unsigned getUnitCacheLimit() const { return _unitCacheLimit; }
unsigned getDelayInterval() const { return _delayInterval; }
int getUnitID() const { return _unitID; }
bool getDynamic() { return _dynamic; }
// Setter methods
void setName(const string& name) { _name = name; }
......@@ -288,6 +292,8 @@ protected:
bool _streaming;
// If true, the computation intervals are synchronized
bool _sync;
// Indicates whether the analyzer generates units dynamically at runtime, or only at initialization
bool _dynamic;
// ID of the units this analyzer works on
int _unitID;
// Determines if the analyzer can keep running or must terminate
......
......@@ -58,6 +58,7 @@ public:
_jobDataVec(nullptr) {
_unitAccess.store(false);
this->_dynamic = true;
}
/**
......@@ -69,6 +70,7 @@ public:
_jobDataVec(nullptr) {
_unitAccess.store(false);
this->_dynamic = true;
}
/**
......@@ -78,6 +80,7 @@ public:
JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
AnalyzerTemplate<S>::operator=(other);
_jobDataVec = nullptr;
this->_dynamic = true;
}
/**
......
......@@ -164,7 +164,7 @@ bool AnalyticsController::publishSensors() {
uint64_t publishCtr = 0;
for (auto &p : _analyticsPlugins)
for (const auto &a : p.configurator->getAnalyzers())
if(a->getStreaming()) {
if(a->getStreaming() && !a->getDynamic()) {
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs()) {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
......
......@@ -243,7 +243,7 @@ bool MQTTPusher::sendMappings() {
// Performing auto-publish for analytics output sensors
for(auto& p: _analyticsPlugins)
for(auto& a: p.configurator->getAnalyzers())
if(a->getStreaming()) {
if(a->getStreaming() && !a->getDynamic()) {
for (auto &u: a->getUnits())
for (auto &s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
......@@ -283,16 +283,19 @@ bool MQTTPusher::halt(unsigned short timeout) {
void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
bool dynWarning = false;
for(auto& p : _plugins)
for(const auto& g : p.configurator->getSensorGroups())
msgRate += (float)g->getSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
for(auto& p : _analyticsPlugins)
for(const auto& a : p.configurator->getAnalyzers())
if(a->getStreaming()) {
for (const auto &u : a->getUnits())
msgRate += (float) u->getBaseOutputs().size() * (1000.0f / (float) a->getInterval()) / (float) a->getMinValues();
a->releaseUnits();
}
for(const auto& a : p.configurator->getAnalyzers()) {
if (a->getStreaming() && !a->getDynamic()) {
for (const auto &u : a->getUnits())
msgRate += (float) u->getBaseOutputs().size() * (1000.0f / (float) a->getInterval()) / (float) a->getMinValues();
a->releaseUnits();
} else if (a->getDynamic())
dynWarning = true;
}
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
......@@ -307,4 +310,6 @@ void MQTTPusher::computeMsgRate() {
_maxNumberOfMessages = msgRate + 10;
LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
}
if(_msgCap!=DISABLED && dynWarning)
LOGM(warning) << "Attention! The computed message rate does not account for analyzers with dynamically-generated sensors.";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment