Commit 3d72b599 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: Operator Bugfixes

- Fixed a bug that prevented pipelines in collectagent deployments
- Incremented the tolerance threshold of fuzzy queries to 1h
- Fixed the assignment of MQTT topics to job sensors
parent 0a2230f1
......@@ -325,9 +325,11 @@ on the unit they belong to:
* **Root unit**: if the output sensors belong to the _root_ unit, that is, they do not belong to any level in the sensor
hierarchy and are uniquely defined, the respective topics are constructed like in DCDBPusher sensors, by concatenating
the MQTT prefix, operator part and sensor suffix that are defined;
* **Job unit**: if the output sensors belong to a _job_ unit in a job operator (see below), the MQTT topic is constructed
by concatenating the MQTT prefix, the operator part, a job suffix (e.g., /job1334) and finally the sensor suffix;
* **Any other unit**: if the output sensor belongs to any other unit in the sensor tree, its MQTT topic is constructed
by concatenating the MQTT prefix associated to the unit (which is defined as _the portion of the MQTT topic shared by all sensors
belonging to such unit_) and the sensor suffix. The middle part of the topic is padded accordingly to ensure a fixed length.
belonging to such unit_) and the sensor suffix.
#### Pipelining Operators <a name="pipelining"></a>
......
......@@ -238,7 +238,7 @@ protected:
// The job unit is generated as a hierarchical unit with the top level unit and the sub-units having
// the same set of output sensors
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, jobData.nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
uTemplate->getOutputs(), uTemplate->getInputMode(), this->_mqttPart, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_cacheSize, this->_flatten);
this->addToUnitCache(jobUnit);
......
......@@ -568,9 +568,6 @@ protected:
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, config.get_child("global")) {
if (boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
_mqttPrefix.append("/");
}
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
......
......@@ -144,6 +144,7 @@ void AnalyticsController::run() {
readings.push_back(DCDB::SensorDataStoreReading(sid, readingBuf.timestamp, readingBuf.value));
_sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
}
_sensorCache->getSensorMap()[sid].updateBatchSize(op->getMinValues());
_dcdbStore->insertBatch(readings);
_readingCtr += readings.size();
}
......
......@@ -148,7 +148,7 @@ bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint6
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 10000000000);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
if(results.empty())
return false;
reading_t reading;
......
......@@ -199,9 +199,6 @@ protected:
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, config.get_child("global")) {
if (boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
_mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
_cacheInterval *= 1000;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment