Commit 30c355d3 authored by Alessio Netti's avatar Alessio Netti
Browse files

DA: integration of analytics in collectagent

- QueryEngine integration done (but not functional yet, debugging required)
- Changed behavior on exceptional conditions: if SensorNavigator or
AnalyticsManager initialization fails, termination follows
- Now the QueryEngine can also access output sensors of other Analyzers
- Minor bugfixes
parent acc15ee7
......@@ -65,7 +65,7 @@ void AggregatorAnalyzer::computeAvg(int unitID) {
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
if(!_buffer || _buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
......@@ -90,7 +90,7 @@ void AggregatorAnalyzer::computeMax(int unitID) {
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
if(!_buffer || _buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
......@@ -113,7 +113,7 @@ void AggregatorAnalyzer::computeMin(int unitID) {
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
if(!_buffer || _buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
......
......@@ -26,7 +26,6 @@ bool AnalyticsController::initialize(globalCA_t& settings, const string& configP
_configPath = configPath;
_navigator = make_shared<SensorNavigator>();
//TODO: make exceptional conditions consistent
// A sensor navigator is only built if data analytics plugins are expected to be instantiated
QueryEngine &_queryEngine = QueryEngine::getInstance();
if(_manager->probe(_configPath, "collectagent.conf")) {
......@@ -46,8 +45,8 @@ bool AnalyticsController::initialize(globalCA_t& settings, const string& configP
_navigator->buildTree(_settings.hierarchy, &names, &topics);
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
return true;
LOG(error) << "Failed to build sensor hierarchy tree!";
return false;
}
LOG(info) << "Built a sensor hierarchy tree of size " << _navigator->getTreeSize() << " and depth "
<< _navigator->getTreeDepth() << ".";
......@@ -62,7 +61,7 @@ bool AnalyticsController::initialize(globalCA_t& settings, const string& configP
//TODO: find a better solution to disable the SensorBase default cache
_settings.pluginSettings.cacheInterval = 0;
if(!_manager->load(_configPath, "collectagent.conf", _settings.pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager! Collect agent is proceeding anyway.";
LOG(fatal) << "Failed to load data analytics manager!";
return false;
}
......@@ -112,6 +111,7 @@ void AnalyticsController::run() {
for (auto &p : _analyticsPlugins) {
if (_doHalt) break;
for (const auto &a : p.configurator->getAnalyzers())
if(a->getStreaming())
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs())
if (s->getSizeOfReadingQueue() >= a->getMinValues() && sid.mqttTopicConvert(s->getMqtt())) {
......@@ -139,6 +139,7 @@ bool AnalyticsController::publishSensors() {
uint64_t publishCtr = 0;
for (auto &p : _analyticsPlugins)
for (const auto &a : p.configurator->getAnalyzers())
if(a->getStreaming())
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs()) {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
......
......@@ -39,6 +39,7 @@
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include <dcdb/sensor.h>
#include "version.h"
#include "configuration.h"
......@@ -66,10 +67,58 @@ DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore;
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
return buffer;
std::string topic;
try {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {
return NULL;
}
std::vector <reading_t> *output = NULL;
DCDB::SensorId sid;
sid.mqttTopicConvert(topic);
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
output = entry.getView(startTs, endTs, buffer, rel, true);
if (output->size() > 0)
return output;
}
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try {
DCDB::PublicSensor publicSensor;
publicSensor.name = name;
publicSensor.pattern = topic;
std::list <DCDB::SensorDataStoreReading> results;
DCDB::Sensor sensor(dcdbConn, publicSensor);
uint64_t now = getTimestamp();
//Converting relative timestamps to absolute
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
if(output)
output->clear();
else if(buffer) {
buffer->clear();
output = buffer;
} else
output = new std::vector<reading_t>();
reading_t reading;
//TODO: fix when result contains only partial time range of the query
for (const auto &r : results) {
reading.value = r.value;
reading.timestamp = r.timeStamp.getRaw();
output->push_back(reading);
}
}
catch(const std::exception& e) {
if(!buffer && output) delete output;
return NULL;
}
return output;
}
/* Normal termination (SIGINT, CTRL+C) */
......@@ -423,7 +472,7 @@ int main(int argc, char* const argv[]) {
// Setting the size of the sensor cache
// Conversion from milliseconds to nanoseconds
mySensorCache.setMaxHistory(settings.pluginSettings.cacheInterval * 1000000);
mySensorCache.setMaxHistory(uint64_t(settings.pluginSettings.cacheInterval) * 1000000);
//Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
......@@ -461,7 +510,7 @@ int main(int argc, char* const argv[]) {
analyticsController->setCache(&mySensorCache);
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
QueryEngine::getInstance().setQueryCallback(sensorQueryCallback);
queryEngine.setQueryCallback(sensorQueryCallback);
LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
......
......@@ -71,7 +71,7 @@ bool Configuration::readGlobal() {
} else if (boost::iequals(global.first, "messageSlots")) {
_global.messageSlots = stoul(global.second.data());
} else if (boost::iequals(global.first, "cacheInterval")) {
_global.pluginSettings.cacheInterval = stoul(global.second.data());
_global.pluginSettings.cacheInterval = stoul(global.second.data()) * 1000;
} else if (boost::iequals(global.first, "verbosity")) {
_global.logLevelFile = translateLogLevel(stoi(global.second.data()));
} else if (boost::iequals(global.first, "threads")) {
......
......@@ -15,7 +15,7 @@ SensorCache::~SensorCache() {
sensorCache.clear();
}
const sensorCache_t& SensorCache::getSensorMap() {
sensorCache_t& SensorCache::getSensorMap() {
return sensorCache;
}
......
......@@ -23,9 +23,9 @@ public:
virtual ~SensorCache();
/**
* @brief Returns a constant reference to the internal sensor cache map.
* @brief Returns a reference to the internal sensor cache map.
**/
const sensorCache_t& getSensorMap();
sensorCache_t& getSensorMap();
/**
* @brief Store a sensor reading in the SensorCache.
......
......@@ -396,9 +396,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
connection->set_status(server::connection::not_supported);
goto error;
}
}
//Updating the SensorNavigator on plugin reloads
if(action == "reload") {
if(pathStrs.back() == "reload") {
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
......@@ -413,7 +413,6 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
qEngine.triggerUpdate();
}
}
}
LOGH(info) << "Responding: " << response;
data << response << std::endl;
......
......@@ -133,10 +133,11 @@ void MQTTPusher::push() {
break;
}
for (const auto &a : p.configurator->getAnalyzers()) {
if(a->getStreaming()) {
for (const auto &u : a->getUnits()) {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (_msgCap == DISABLED || totalCount < (unsigned) _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
......@@ -148,6 +149,7 @@ void MQTTPusher::push() {
}
}
}
}
}
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
......@@ -221,6 +223,7 @@ bool MQTTPusher::sendMappings() {
// Performing auto-publish for analytics output sensors
for(auto& p: _analyticsPlugins)
for(auto& a: p.configurator->getAnalyzers())
if(a->getStreaming())
for(auto& u: a->getUnits())
for(auto& s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
......
......@@ -57,16 +57,23 @@ QueryEngine& _queryEngine = QueryEngine::getInstance();
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
//TODO: Include data analytics output sensors in the sensormap
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
//Initializing the sensor map if necessary. Thread safe!
if(_queryEngine.updated.load()) {
if(!_queryEngine.updating.exchange(true)) {
_sensorMap.clear();
// Adding ordinary sensors to the map
for (auto &p : _configuration->getPlugins())
for (auto &g : p.configurator->getSensorGroups())
for (auto &s : g->getSensors())
_sensorMap.insert(std::make_pair(s->getName(), s));
// Adding data analytics sensors to the map
for(auto& p : _analyticsManager->getPlugins())
for(const auto& a : p.configurator->getAnalyzers())
if (a->getStreaming())
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
_sensorMap.insert(std::make_pair(o->getName(), o));
_queryEngine.updated.store(false);
_queryEngine.updating.store(false);
} else {
......@@ -77,11 +84,11 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
if(_sensorMap.count(name) > 0) {
SBasePtr sensor = _sensorMap[name];
if(!sensor->isInit())
return buffer;
return NULL;
else
return sensor->getCache()->getView(startTs, endTs, buffer, rel, true);
}
return buffer;
return NULL;
}
void sigHandler(int sig) {
......@@ -255,7 +262,6 @@ int main(int argc, char** argv) {
_analyticsManager = new AnalyticsManager();
// Preparing the SensorNavigator
bool failedTree = false;
if(_analyticsManager->probe(argv[argc-1], "dcdbpusher.conf")) {
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
......@@ -272,18 +278,16 @@ int main(int argc, char** argv) {
_queryEngine.triggerUpdate();
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
failedTree = true;
LOG(error) << "Failed to build sensor hierarchy tree!";
return 1;
}
}
if(!failedTree) {
_queryEngine.setQueryCallback(sensorQueryCallback);
if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager!";
return 1;
}
}
//print configuration to give some feedback
//config of plugins is only printed if the config shall be validated or to debug level otherwise
......@@ -365,7 +369,6 @@ int main(int argc, char** argv) {
}
}
if(!failedTree) {
if(!_queryEngine.updated.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
......@@ -374,7 +377,6 @@ int main(int argc, char** argv) {
LOG(info) << "Starting analyzers...";
_analyticsManager->start();
}
LOG(info) << "Sensors started!";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment