Commit 5425c0f8 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: support for sensor metadata queries in QueryEngine

parent 1c33996d
......@@ -30,6 +30,7 @@
#include "sensornavigator.h"
#include "sensorbase.h"
#include "metadatastore.h"
#include <atomic>
using namespace std;
......@@ -46,6 +47,8 @@ struct qeJobData {
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
//Typedef for the metadata retrieval callback
typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&);
/**
* @brief Class that grants query access to local and remote sensors
......@@ -135,7 +138,7 @@ public:
*
* @param cb Pointer to a function of type QueryEngineCallback
*/
void setQueryCallback(QueryEngineCallback cb) { _callback = cb; }
void setQueryCallback(QueryEngineCallback cb) { _callback = cb; }
/**
* @brief Sets the internal callback to retrieve job data
......@@ -148,6 +151,17 @@ public:
*/
void setJobQueryCallback(QueryEngineJobCallback jcb) { _jCallback = jcb; }
/**
* @brief Sets the internal callback to retrieve sensor metadata data
*
* This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
* metadata and thus implement an abstraction layer. Behavior of the callback must be identical to
* that specified in setQueryCallback.
*
* @param mcb Pointer to a function of type QueryEngineMetadataCallback
*/
void setMetadataQueryCallback(QueryEngineMetadataCallback mcb) { _mCallback = mcb; }
/**
* @brief Returns the internal SensorNavigator object
*
......@@ -251,6 +265,24 @@ public:
return _jCallback(jobId, startTs, endTs, buffer, rel, range);
}
/**
* @brief Perform a sensor metadata query
*
* This method allows to retrieve the metadata of available sensors. The input "buffer" object
* allows to re-use memory over successive readings. Note that in order to use this method, a
* callback must have been set through the setMetadataQueryCallback method. If not, this
* method will throw an exception.
*
* @param name Name of the sensor to be queried
* @param buffer SensorMetadata object in which to store the result
* @return True if successful, false otherwise
*/
bool queryMetadata(const string& name, SensorMetadata& buffer) {
if(!_mCallback)
throw runtime_error("Query Engine: sensor metadata callback not set!");
return _mCallback(name, buffer);
}
/**
* @brief Locks access to the QueryEngine
*
......@@ -288,6 +320,7 @@ private:
_sensorMap = NULL;
_callback = NULL;
_jCallback = NULL;
_mCallback = NULL;
updating.store(false);
access.store(0);
}
......@@ -310,6 +343,8 @@ private:
QueryEngineCallback _callback;
// Callback used to retrieve job data
QueryEngineJobCallback _jCallback;
// Callback used to retrieve metadata
QueryEngineMetadataCallback _mCallback;
// String storing the current sensor hierarchy, used for convenience
string _sensorHierarchy;
// String storing the filter to be used when building a sensor navigator
......
......@@ -176,6 +176,38 @@ bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint6
return true;
}
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
// Returning NULL if the query engine is being updated
if(queryEngine.updating.load()) return false;
++queryEngine.access;
std::string topic=name;
// Getting the topic of the queried sensor from the Navigator
// If not found, we try to use the input name as topic
try {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {}
if(metadataStore->getMap().count(topic)) {
buffer = metadataStore->get(topic);
} else {
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try {
DCDB::PublicSensor publicSensor;
if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
--queryEngine.access;
return false;
}
buffer = Configuration::publicSensorToMetadata(publicSensor);
}
catch (const std::exception &e) {
--queryEngine.access;
return false;
}
}
--queryEngine.access;
return true;
}
/* Normal termination (SIGINT, CTRL+C) */
void sigHandler(int sig)
{
......@@ -526,6 +558,7 @@ int main(int argc, char* const argv[]) {
queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setMetadataQueryCallback(metadataQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback);
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
......
......@@ -91,6 +91,26 @@ bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint6
return found;
}
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
// Returning NULL if the query engine is being updated
if(_queryEngine.updating.load()) return false;
bool found=false;
++_queryEngine.access;
shared_ptr<map<string, SBasePtr>> sensorMap = _queryEngine.getSensorMap();
if(sensorMap!=nullptr && sensorMap->count(name)>0) {
SBasePtr sensor = sensorMap->at(name);
if(!sensor->getMetadata()) {
found = false;
} else {
found = true;
buffer = *sensor->getMetadata();
}
}
--_queryEngine.access;
return found;
}
void sigHandler(int sig) {
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
if( sig == SIGINT )
......@@ -285,6 +305,7 @@ int main(int argc, char** argv) {
_queryEngine.setJobFilter(analyticsSettings.jobFilter);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback);
_queryEngine.setMetadataQueryCallback(metadataQueryCallback);
if(!_operatorManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager!";
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment