Commit 0674d775 authored by Alessio Netti's avatar Alessio Netti
Browse files

Integrated MetadataStore in Collectagent

- Also added a "pattern" field to SensorMetadata class for consistency
with the DCDB::PublicSensor class
parent 174997aa
......@@ -623,6 +623,7 @@ protected:
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
}
}
......@@ -632,6 +633,7 @@ protected:
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
}
}
......
......@@ -46,7 +46,7 @@ void AnalyticsController::stop() {
_initialized = false;
}
bool AnalyticsController::initialize(Configuration& settings, const string& configPath) {
bool AnalyticsController::initialize(Configuration& settings, const string& configPath, MetadataStore& metaStore) {
_settings = settings;
_configPath = configPath;
_navigator = make_shared<SensorNavigator>();
......@@ -54,18 +54,9 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
// A sensor navigator is only built if operator plugins are expected to be instantiated
QueryEngine &_queryEngine = QueryEngine::getInstance();
if(_manager->probe(_configPath, "collectagent.conf")) {
vector <string> topics;
list <DCDB::PublicSensor> publicSensors;
// Fetching sensor names and topics from the Cassandra datastore
_dcdbCfg->getPublicSensorsVerbose(publicSensors);
std::string patternBuf;
for (const auto &s : publicSensors)
if (!s.is_virtual) {
patternBuf = s.pattern;
boost::algorithm::trim(patternBuf);
topics.push_back(patternBuf);
}
publicSensors.clear();
vector<string> topics;
for(const auto& kv : metaStore.getMap())
topics.push_back(kv.second.pattern);
// Building the sensor navigator
try {
......@@ -185,7 +176,7 @@ bool AnalyticsController::publishSensors() {
for (const auto &s : u->getBaseOutputs()) {
if (s->getPublish()) {
if (s->getMetadata()) {
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(*s->getMetadata(), s->getMqtt());
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(*s->getMetadata());
err = _dcdbCfg->publishSensor(ps);
} else {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
......
......@@ -40,6 +40,7 @@
#include "sensorcache.h"
#include "configuration.h"
#include "logging.h"
#include "metadatastore.h"
using namespace std;
......@@ -105,9 +106,10 @@ public:
*
* @param settings Settings class containing user-specified configuration parameters
* @param configPath Path to the configuration files for the data analytics framework
* @param metaStore MetadataStore object containing all public sensors to add in the sensor tree
* @return True if successful, false otherwise
*/
bool initialize(Configuration& settings, const string& configPath);
bool initialize(Configuration& settings, const string& configPath, MetadataStore& metaStore);
/**
* @brief Sets the cache to be used for sensors
......
......@@ -85,6 +85,7 @@ DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore;
DCDB::JobDataStore *myJobDataStore;
DCDB::SensorConfig *mySensorConfig;
MetadataStore *metadataStore;
DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;
......@@ -215,7 +216,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
LOG(error) << "Invalid metadata packed received!";
return 1;
}
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm, std::string(topic + DCDB_MET_LEN));
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
err = mySensorConfig->publishSensor(ps);
} else {
err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
......@@ -494,6 +495,15 @@ int main(int argc, char* const argv[]) {
mySensorDataStore->setTTL(cassandraSettings.ttl);
mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
// Fetching public sensor information from the Cassandra datastore
list<DCDB::PublicSensor> publicSensors;
metadataStore = new MetadataStore();
mySensorConfig->getPublicSensorsVerbose(publicSensors);
std::string patternBuf;
for (const auto &s : publicSensors)
if (!s.is_virtual)
metadataStore->store(s.pattern, Configuration::publicSensorToMetadata(s));
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
analyticsController->setCache(&mySensorCache);
queryEngine.setFilter(analyticsSettings.filter);
......@@ -501,7 +511,7 @@ int main(int argc, char* const argv[]) {
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback);
if(!analyticsController->initialize(settings, argv[argc - 1]))
if(!analyticsController->initialize(settings, argv[argc - 1], *metadataStore))
return EXIT_FAILURE;
LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
......@@ -632,6 +642,7 @@ int main(int argc, char* const argv[]) {
delete mySensorConfig;
dcdbConn->disconnect();
delete dcdbConn;
delete metadataStore;
LOG(info) << "Collect Agent closed. Bye bye...";
}
catch (const exception& e) {
......
......@@ -74,11 +74,11 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
}
}
DCDB::PublicSensor Configuration::metadataToPublicSensor(const SensorMetadata& sm, const string& pattern) {
DCDB::PublicSensor Configuration::metadataToPublicSensor(const SensorMetadata& sm) {
DCDB::PublicSensor ps;
ps.name = sm.publicName;
ps.is_virtual = sm.isVirtual;
ps.pattern = pattern;
ps.pattern = sm.pattern;
ps.unit = sm.unit;
ps.scaling_factor = sm.scale;
ps.ttl = sm.ttl;
......@@ -89,5 +89,21 @@ DCDB::PublicSensor Configuration::metadataToPublicSensor(const SensorMetadata& s
sensorMask = sensorMask | INTEGRABLE;
if(sm.monotonic)
sensorMask = sensorMask | MONOTONIC;
ps.sensor_mask = sensorMask;
return ps;
}
SensorMetadata Configuration::publicSensorToMetadata(const DCDB::PublicSensor& ps) {
SensorMetadata sm;
sm.publicName = ps.name;
sm.isVirtual = ps.is_virtual;
sm.pattern = ps.pattern;
sm.unit = ps.unit;
sm.scale = ps.scaling_factor;
sm.ttl = ps.ttl;
sm.interval = ps.interval;
sm.operations = ps.operations;
sm.integrable = ps.sensor_mask & INTEGRABLE;
sm.monotonic = ps.sensor_mask & MONOTONIC;
return sm;
}
......@@ -88,13 +88,20 @@ public:
virtual ~Configuration() {}
/**
* @brief
* @brief Converts a SensorMetadata object to a DCDB::PublicSensor one.
*
* @param s
* @param pattern
* @return
* @param s SensorMetadata object to be converted
* @return Output PublicSensor object
*/
static DCDB::PublicSensor metadataToPublicSensor(const SensorMetadata& s, const string& pattern);
static DCDB::PublicSensor metadataToPublicSensor(const SensorMetadata& s);
/**
* @brief Converts a DCDB::PublicSensor object to its SensorMetadata representation.
*
* @param ps The PublicSensor object to be converted
* @return Output SensorMetadata object
*/
static SensorMetadata publicSensorToMetadata(const DCDB::PublicSensor& ps);
// Additional configuration parameters to be parsed within the global block
std::string mqttListenHost = string(LISTENHOST);
......
......@@ -50,6 +50,7 @@ public:
integrable(false),
monotonic(false),
publicName(""),
pattern(""),
unit(""),
scale(1.0),
ttl(0),
......@@ -61,6 +62,7 @@ public:
this->integrable = other.integrable;
this->monotonic = other.monotonic;
this->publicName = other.publicName;
this->pattern = other.pattern;
this->unit = other.unit;
this->scale = other.scale;
this->ttl = other.ttl;
......@@ -73,6 +75,7 @@ public:
this->integrable = other.integrable;
this->monotonic = other.monotonic;
this->publicName = other.publicName;
this->pattern = other.pattern;
this->unit = other.unit;
this->scale = other.scale;
this->ttl = other.ttl;
......@@ -114,6 +117,8 @@ public:
this->unit = val.second.data();
} else if (boost::iequals(val.first, "publicName")) {
this->publicName = val.second.data();
} else if (boost::iequals(val.first, "pattern")) {
this->pattern = val.second.data();
} else if (boost::iequals(val.first, "scale")) {
this->scale = stod(val.second.data());
} else if (boost::iequals(val.first, "interval")) {
......@@ -155,6 +160,7 @@ public:
bool integrable;
bool monotonic;
string publicName;
string pattern;
string unit;
double scale;
uint64_t ttl;
......@@ -196,6 +202,7 @@ protected:
config.push_back(boost::property_tree::ptree::value_type("integrable", boost::property_tree::ptree(this->integrable ? "true" : "false")));
config.push_back(boost::property_tree::ptree::value_type("unit", boost::property_tree::ptree(this->unit)));
config.push_back(boost::property_tree::ptree::value_type("publicName", boost::property_tree::ptree(this->publicName)));
config.push_back(boost::property_tree::ptree::value_type("pattern", boost::property_tree::ptree(this->pattern)));
config.push_back(boost::property_tree::ptree::value_type("scale", boost::property_tree::ptree(to_string(this->scale))));
config.push_back(boost::property_tree::ptree::value_type("interval", boost::property_tree::ptree(to_string(this->interval / 1000000))));
config.push_back(boost::property_tree::ptree::value_type("ttl", boost::property_tree::ptree(to_string(this->ttl / 1000000))));
......
......@@ -686,6 +686,7 @@ protected:
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
}
}
......@@ -1114,6 +1115,7 @@ protected:
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment