2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 95139a6b authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: integrated grouped sensor query feature

- Uses an overloaded version of the querySensor() method
parent aa6fd6cf
...@@ -45,6 +45,8 @@ struct qeJobData { ...@@ -45,6 +45,8 @@ struct qeJobData {
//Typedef for the callback used to retrieve sensors //Typedef for the callback used to retrieve sensors
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool); typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the callback used to retrieve sensors
typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the job retrieval callback //Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool); typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
//Typedef for the metadata retrieval callback //Typedef for the metadata retrieval callback
...@@ -140,6 +142,18 @@ public: ...@@ -140,6 +142,18 @@ public:
*/ */
void setQueryCallback(QueryEngineCallback cb) { _callback = cb; } void setQueryCallback(QueryEngineCallback cb) { _callback = cb; }
/**
* @brief Sets the internal callback to retrieve sensor data in groups
*
* This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
* data and thus implement an abstraction layer. The callback must store all values for the input
* vector of sensor names, in the given time range, into the "buffer" vector. If such vector has
* not been supplied (NULL), the callback must allocate and return a new one.
*
* @param cb Pointer to a function of type QueryEngineCallback
*/
void setGroupQueryCallback(QueryEngineGroupCallback cb) { _gCallback = cb; }
/** /**
* @brief Sets the internal callback to retrieve job data * @brief Sets the internal callback to retrieve job data
* *
...@@ -233,6 +247,22 @@ public: ...@@ -233,6 +247,22 @@ public:
return _callback(name, startTs, endTs, buffer, rel); return _callback(name, startTs, endTs, buffer, rel);
} }
/**
* @brief Perform a sensor query
*
* This is an overloaded version of the querySensor() method. It accepts a vector of sensor names
* instead of a single sensor. These will be queried collectively, and the result is returned.
*
* @return True if successful, false otherwise
*/
bool querySensor(const vector<string>& names, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
if(!_gCallback)
throw runtime_error("Query Engine: callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
throw invalid_argument("Query Engine: invalid time range!");
return _gCallback(names, startTs, endTs, buffer, rel);
}
/** /**
* @brief Perform a job query * @brief Perform a job query
* *
...@@ -341,6 +371,8 @@ private: ...@@ -341,6 +371,8 @@ private:
shared_ptr<map<string, SBasePtr>> _sensorMap; shared_ptr<map<string, SBasePtr>> _sensorMap;
// Callback used to retrieve sensor data // Callback used to retrieve sensor data
QueryEngineCallback _callback; QueryEngineCallback _callback;
// Callback used to retrieve sensor data in groups
QueryEngineGroupCallback _gCallback;
// Callback used to retrieve job data // Callback used to retrieve job data
QueryEngineJobCallback _jCallback; QueryEngineJobCallback _jCallback;
// Callback used to retrieve metadata // Callback used to retrieve metadata
......
...@@ -133,59 +133,79 @@ bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_ ...@@ -133,59 +133,79 @@ bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_
return true; return true;
} }
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) { bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
// Returning NULL if the query engine is being updated // Returning NULL if the query engine is being updated
if(queryEngine.updating.load()) return false; if(queryEngine.updating.load()) return false;
++queryEngine.access; ++queryEngine.access;
std::string topic=name;
// Getting the topic of the queried sensor from the Navigator std::list<DCDB::SensorId> topics;
// If not found, we try to use the input name as topic std::string topic;
try { sensorCache_t& sensorMap = mySensorCache.getSensorMap();
topic = queryEngine.getNavigator()->getNodeTopic(name); size_t successCtr = 0;
} catch(const std::domain_error& e) {} for(const auto& name : names) {
DCDB::SensorId sid; // Getting the topic of the queried sensor from the Navigator
// Creating a SID to perform the query // If not found, we try to use the input name as topic
if(!sid.mqttTopicConvert(topic)) { try {
--queryEngine.access; topic = queryEngine.getNavigator()->getNodeTopic(name);
return false; } catch (const std::domain_error &e) { topic = name; }
} DCDB::SensorId sid;
if(mySensorCache.getSensorMap().count(sid) > 0) { // Creating a SID to perform the query
CacheEntry &entry = mySensorCache.getSensorMap()[sid]; if (sid.mqttTopicConvert(topic)) {
if (entry.getView(startTs, endTs, buffer, rel)) { if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
--queryEngine.access; // Data was found, can continue to next SID
return true; successCtr++;
} else {
// This happens only if no data was found in the local cache
topics.push_back(sid);
}
} }
} }
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
try { if(!topics.empty()) {
DCDB::PublicSensor publicSensor; try {
publicSensor.name = name; std::list <DCDB::SensorDataStoreReading> results;
publicSensor.pattern = topic; uint64_t now = getTimestamp();
std::list <DCDB::SensorDataStoreReading> results; //Converting relative timestamps to absolute
DCDB::Sensor sensor(dcdbConn, publicSensor); uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t now = getTimestamp(); uint64_t endTsInt = rel ? now - endTs : endTs;
//Converting relative timestamps to absolute DCDB::TimeStamp start(startTsInt), end(endTsInt);
uint64_t startTsInt = rel ? now - startTs : startTs; uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
uint64_t endTsInt = rel ? now - endTs : endTs; // If timestamps are equal we perform a fuzzy query
DCDB::TimeStamp start(startTsInt), end(endTsInt); if(startTsInt == endTsInt) {
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000); topics.front().setRsvd(startWs);
if(results.empty()) { mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000);
--queryEngine.access; }
return false; // Else, we iterate over the weekstamps (if more than one) and perform range queries
} else {
reading_t reading; for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
for (const auto &r : results) { topics.front().setRsvd(currWs);
reading.value = r.value; mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE);
reading.timestamp = r.timeStamp.getRaw(); }
buffer.push_back(reading); }
if (!results.empty()) {
successCtr++;
reading_t reading;
for (const auto &r : results) {
reading.value = r.value;
reading.timestamp = r.timeStamp.getRaw();
buffer.push_back(reading);
}
}
} }
catch (const std::exception &e) {}
} }
catch(const std::exception& e) {
--queryEngine.access;
return false;
}
--queryEngine.access; --queryEngine.access;
return true; return successCtr>0;
}
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
// Returning NULL if the query engine is being updated
if(queryEngine.updating.load()) return false;
std::vector<std::string> nameWrapper;
nameWrapper.push_back(name);
return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel);
} }
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) { bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
...@@ -715,6 +735,7 @@ int main(int argc, char* const argv[]) { ...@@ -715,6 +735,7 @@ int main(int argc, char* const argv[]) {
queryEngine.setJobFilter(analyticsSettings.jobFilter); queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy); queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback); queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
queryEngine.setMetadataQueryCallback(metadataQueryCallback); queryEngine.setMetadataQueryCallback(metadataQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback); queryEngine.setJobQueryCallback(jobQueryCallback);
if(!analyticsController->initialize(settings, argv[argc - 1])) if(!analyticsController->initialize(settings, argv[argc - 1]))
......
...@@ -92,6 +92,16 @@ bool sensorQueryCallback(const string &name, const uint64_t startTs, const uint6 ...@@ -92,6 +92,16 @@ bool sensorQueryCallback(const string &name, const uint64_t startTs, const uint6
return found; return found;
} }
bool sensorGroupQueryCallback(const std::vector<string> &names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t> &buffer, const bool rel) {
// Returning NULL if the query engine is being updated
if (_queryEngine.updating.load())
return false;
bool outcome = false;
for(const auto& name : names)
outcome = outcome || sensorQueryCallback(name, startTs, endTs, buffer, rel);
return outcome;
}
bool metadataQueryCallback(const string &name, SensorMetadata &buffer) { bool metadataQueryCallback(const string &name, SensorMetadata &buffer) {
// Returning NULL if the query engine is being updated // Returning NULL if the query engine is being updated
if (_queryEngine.updating.load()) if (_queryEngine.updating.load())
...@@ -306,6 +316,7 @@ int main(int argc, char **argv) { ...@@ -306,6 +316,7 @@ int main(int argc, char **argv) {
_queryEngine.setJobFilter(analyticsSettings.jobFilter); _queryEngine.setJobFilter(analyticsSettings.jobFilter);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy); _queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback); _queryEngine.setQueryCallback(sensorQueryCallback);
_queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
_queryEngine.setMetadataQueryCallback(metadataQueryCallback); _queryEngine.setMetadataQueryCallback(metadataQueryCallback);
if (!_operatorManager->load(argv[argc - 1], "dcdbpusher.conf", pluginSettings)) { if (!_operatorManager->load(argv[argc - 1], "dcdbpusher.conf", pluginSettings)) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment