Commit 0f77d784 authored by Alessio Netti's avatar Alessio Netti

DA: integration in CollectAgent (WIP)

- First step for integration of the analytics framework in the collectagent
- QueryEngine and REST API integration still missing
- Proper testing still to be done
- Added a probe function: the global config file is scanned before
initialization, and if no data analytics plugins are requested, no
sensor navigator is built
parent b583a11f
......@@ -12,6 +12,34 @@ void AnalyticsManager::clear() {
_status = CLEAR;
}
bool AnalyticsManager::probe(const string& path, const string& globalFile) {
std::string cfgPath = path;
boost::property_tree::iptree cfg;
if (cfgPath != "" && cfgPath[cfgPath.length() - 1] != '/')
cfgPath.append("/");
try {
boost::property_tree::read_info(cfgPath + globalFile, cfg);
} catch (boost::property_tree::info_parser_error &e) {
return false;
}
if(cfg.find("analyzerPlugins") == cfg.not_found())
return false;
int pluginCtr = 0;
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
if (boost::iequals(plugin.first, "analyzerPlugin"))
pluginCtr++;
}
if( pluginCtr == 0)
return false;
else
return true;
}
bool AnalyticsManager::load(const string& path, const string& globalFile, const pluginSettings_t& pluginSettings) {
//The load code is pretty much the same as in Configuration.cpp to load pusher plugins
_configPath = path;
......@@ -34,7 +62,6 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
return true;
}
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
//Reading plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
if (boost::iequals(plugin.first, "analyzerPlugin")) {
......
......@@ -59,6 +59,19 @@ public:
*/
void clear();
/**
* @brief Probes a configuration file to determine if initialization is required
*
* This method will read through the specified configuration file, and search for an
* analyzerPlugin block, with its associated data analytics plugin. If the method returns
* true, then one or more plugins were requested for initialization.
*
* @param path Path to the global and plugin configuration files
* @param globalFile Name of the global file (usually global.conf or collectagent.conf)
* @return true if a configuration is necessary, false otherwise
*/
bool probe(const string& path, const string& globalFile);
/**
* @brief Loads plugins as specified in the input config file
*
......
......@@ -9,7 +9,7 @@ ANALYZERS = aggregator
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
LIBEXT = dylib
LIBFLAGS = -dynamiclib -install_name
LIBFLAGS = -dynamiclib -install_name
else
BACNET_PORT = linux
LIBEXT = so
......
......@@ -39,12 +39,12 @@ void AggregatorAnalyzer::compute(int unitID) {
}
void AggregatorAnalyzer::computeSum(int unitID) {
uint64_t acc=0;
int64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
if(!_buffer || _buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
......@@ -60,7 +60,7 @@ void AggregatorAnalyzer::computeSum(int unitID) {
}
void AggregatorAnalyzer::computeAvg(int unitID) {
uint64_t acc=0, ctr=0;
int64_t acc=0, ctr=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
......@@ -85,7 +85,7 @@ void AggregatorAnalyzer::computeAvg(int unitID) {
}
void AggregatorAnalyzer::computeMax(int unitID) {
uint64_t acc=0;
int64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
......@@ -107,7 +107,7 @@ void AggregatorAnalyzer::computeMax(int unitID) {
}
void AggregatorAnalyzer::computeMin(int unitID) {
uint64_t acc=0;
int64_t acc=0;
bool minInit=false;
for(const auto& in : _units[unitID]->getInputs()) {
......
......@@ -3,6 +3,9 @@ include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../common/include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
OBJS = ../common/src/logging.o \
../analytics/AnalyticsManager.o \
../common/src/sensornavigator.o \
analyticscontroller.o \
sensorcache.o \
collectagent.o \
configuration.o \
......
//
// Created by Netti, Alessio on 11.04.19.
//
#include "analyticscontroller.h"
void AnalyticsController::start() {
_keepRunning = true;
_readingCtr = 0;
_mainThread = boost::thread(bind(&AnalyticsController::run, this));
}
void AnalyticsController::stop() {
_keepRunning = false;
LOG(info) << "Stopping sensors...";
_manager->stop();
LOG(info) << "Stopping data analytics management thread...";
_mainThread.join();
LOG(info) << "Stopping worker threads...";
_keepAliveWork.reset();
_threads.join_all();
}
bool AnalyticsController::initialize(globalCA_t& settings, const string& configPath) {
_settings = settings;
_configPath = configPath;
_navigator = make_shared<SensorNavigator>();
//TODO: make exceptional conditions consistent
// A sensor navigator is only built if data analytics plugins are expected to be instantiated
QueryEngine &_queryEngine = QueryEngine::getInstance();
if(_manager->probe(_configPath, "collectagent.conf")) {
vector <string> names, topics;
list <DCDB::PublicSensor> publicSensors;
// Fetching sensor names and topics from the Cassandra datastore
_dcdbCfg->getPublicSensorsVerbose(publicSensors);
for (const auto &s : publicSensors)
if (!s.is_virtual) {
names.push_back(s.name);
topics.push_back(s.pattern);
}
publicSensors.clear();
// Building the sensor navigator
try {
_navigator->buildTree(_settings.hierarchy, &names, &topics);
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
return true;
}
LOG(info) << "Built a sensor hierarchy tree of size " << _navigator->getTreeSize() << " and depth "
<< _navigator->getTreeDepth() << ".";
names.clear();
topics.clear();
// Assigning the newly-built sensor navigator to the QueryEngine
_queryEngine.setNavigator(_navigator);
_queryEngine.triggerUpdate();
}
//TODO: find a better solution to disable the SensorBase default cache
_settings.pluginSettings.cacheInterval = 0;
if(!_manager->load(_configPath, "collectagent.conf", _settings.pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager! Collect agent is proceeding anyway.";
return false;
}
if(!_queryEngine.updated.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
LOG(info) << "Creating threads...";
// Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
// Inherited from DCDB Pusher
_keepAliveWork = make_shared<boost::asio::io_service::work>(_io);
// Create pool of threads which handle the sensors
for(size_t i = 0; i < _settings.threads; i++) {
_threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &_io));
}
LOG(info) << "Threads created!";
_initialized = true;
return true;
}
void AnalyticsController::run() {
// If initialization of data analytics fails, the thread terminates
if(!_initialized)
return;
LOG(info) << "Init analyzers...";
_manager->init(_io);
LOG(info) << "Starting analyzers...";
_manager->start();
LOG(info) << "Sensors started!";
publishSensors();
vector<an_dl_t>& _analyticsPlugins = _manager->getPlugins();
DCDB::SensorId sid;
list<DCDB::SensorDataStoreReading> readings;
boost::lockfree::spsc_queue<reading_t> *sensorQueue;
reading_t readingBuf;
while (_keepRunning) {
if (_doHalt) {
_halted = true;
sleep(2);
continue;
}
_halted = false;
// Push output analytics sensors
for (auto &p : _analyticsPlugins) {
if (_doHalt) break;
for (const auto &a : p.configurator->getAnalyzers())
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs())
if (s->getSizeOfReadingQueue() >= a->getMinValues() && sid.mqttTopicConvert(s->getMqtt())) {
readings.clear();
sensorQueue = s->getReadingQueue();
while(sensorQueue->pop(readingBuf)) {
readings.push_back(DCDB::SensorDataStoreReading(sid, readingBuf.timestamp, readingBuf.value));
_sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
}
_dcdbStore->insertBatch(readings);
_readingCtr+=readings.size();
}
}
sleep(1);
}
}
bool AnalyticsController::publishSensors() {
if(_settings.pluginSettings.sensorPattern=="")
return false;
DCDB::SCError err;
vector<an_dl_t>& _analyticsPlugins = _manager->getPlugins();
bool failedPublish = false;
uint64_t publishCtr = 0;
for (auto &p : _analyticsPlugins)
for (const auto &a : p.configurator->getAnalyzers())
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs()) {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
switch (err) {
case DCDB::SC_OK:
publishCtr++;
break;
case DCDB::SC_INVALIDPATTERN:
LOG(error) << "Invalid sensor topic : " << s->getMqtt();
failedPublish = true;
break;
case DCDB::SC_INVALIDPUBLICNAME:
LOG(error) << "Invalid sensor public name: " << s->getName();
failedPublish = true;
break;
case DCDB::SC_INVALIDSESSION:
LOG(error) << "Cannot reach sensor data store.";
failedPublish = true;
break;
default:
break;
}
}
if(failedPublish)
LOG(error) << "Issues during sensor name auto-publish! Only " << publishCtr << " sensors were published.";
else
LOG(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
return true;
}
//
// Created by Netti, Alessio on 11.04.19.
//
#ifndef PROJECT_ANALYTICSCONTROLLER_H
#define PROJECT_ANALYTICSCONTROLLER_H
#include <list>
#include <vector>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include "../analytics/AnalyticsManager.h"
#include "sensornavigator.h"
#include "sensorcache.h"
#include "configuration.h"
#include "logging.h"
using namespace std;
class AnalyticsController {
public:
AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) {
_dcdbCfg = dcdbCfg;
_dcdbStore = dcdbStore;
_manager = make_shared<AnalyticsManager>();
_navigator = nullptr;
_sensorCache = nullptr;
_keepRunning = false;
_doHalt = false;
_halted = true;
_initialized = false;
}
~AnalyticsController() {}
void start();
void stop();
// Initializes the sensor navigator and data analytics manager
bool initialize(globalCA_t& settings, const string& configPath);
void setCache(SensorCache* cache) { _sensorCache = cache; }
bool isStopped() { return _halted; }
shared_ptr<AnalyticsManager> getManager() { return _manager; }
shared_ptr<SensorNavigator> getNavigator() { return _navigator; }
SensorCache* getCache() { return _sensorCache; }
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
private:
// Method implementing the main loop of the internal thread
void run();
// Performs auto-publish of data analytics sensors, if required
bool publishSensors();
// Flag to keep track of the thread's status
bool _keepRunning;
// Flag to trigger temporary stops to the data analytics controller
bool _doHalt;
bool _halted;
bool _initialized;
// Objects to connect to the Cassandra datastore
DCDB::SensorConfig *_dcdbCfg;
DCDB::SensorDataStore *_dcdbStore;
// Global sensor cache object for the collectagent
SensorCache *_sensorCache;
// Sensor navigator
shared_ptr<SensorNavigator> _navigator;
// Internal data analytics manager object
shared_ptr<AnalyticsManager> _manager;
// Misc configuration attributes
globalCA_t _settings;
string _configPath;
// Readings counter
uint64_t _readingCtr;
// Main management thread for the analytics controller
boost::thread _mainThread;
// IO service for the analyzers
boost::asio::io_service _io;
// Underlying thread pool
boost::thread_group _threads;
// Dummy task to keep thread pool alive
shared_ptr<boost::asio::io_service::work> _keepAliveWork;
//Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif //PROJECT_ANALYTICSCONTROLLER_H
......@@ -47,6 +47,8 @@
#include "abrt.h"
#include "dcdbdaemon.h"
#include "sensorcache.h"
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
......@@ -59,12 +61,17 @@ uint64_t msgCtr;
uint64_t pmsgCtr;
uint64_t readingCtr;
SensorCache mySensorCache;
AnalyticsController* analyticsController;
DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore;
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
logger_t lg;
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
return buffer;
}
/* Normal termination (SIGINT, CTRL+C) */
void sigHandler(int sig)
{
......@@ -244,11 +251,12 @@ void usage() {
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
cout << "Usage:" << endl;
cout << " collectagent [-d] [-s] [-x] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
cout << " collectagent [-d] [-s] [-x] [-a<string>] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
cout << " collectagent -h" << endl;
cout << endl;
cout << "Options:" << endl;
cout << " -a <string> Auto-publish pattern [default: none]" << endl;
cout << " -m<host> MQTT listen address [default: " << defaults.mqttListenAddress << "]" << endl;
cout << " -r<host> REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
cout << " -c<host> Cassandra host [default: " << defaults.cassandraSettings.address << "]" << endl;
......@@ -279,7 +287,7 @@ int main(int argc, char* const argv[]) {
}
// Defining options
const char* opts = "m:r:c:C:u:p:t:v:dDsxh";
const char* opts = "a:m:r:c:C:u:p:t:v:dDsxh";
// Same mechanism as in DCDBPusher - checking if help string is requested before loading config
char ret;
......@@ -314,6 +322,9 @@ int main(int argc, char* const argv[]) {
optind = 1;
while ((ret=getopt(argc, argv, opts))!=-1) {
switch(ret) {
case 'a':
settings.pluginSettings.sensorPattern = optarg;
break;
case 'm':
settings.mqttListenAddress = optarg;
break;
......@@ -359,7 +370,7 @@ int main(int argc, char* const argv[]) {
}
}
auto fileSink = setupFileLogger(settings.tempDir, std::string("collectagent"));
auto fileSink = setupFileLogger(settings.pluginSettings.tempdir, std::string("collectagent"));
//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);
......@@ -376,46 +387,6 @@ int main(int argc, char* const argv[]) {
signal(SIGABRT, abrtHandler);
signal(SIGSEGV, abrtHandler);
LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
//print global settings in either case
LOG(info) << "Global Settings:";
LOG(info) << " MQTT-listenAddress: " << settings.mqttListenAddress;
LOG(info) << " CacheInterval: " << settings.cacheInterval << " [s]";
LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]";
LOG(info) << " MessageThreads: " << settings.messageThreads;
LOG(info) << " MessageSlots: " << settings.messageSlots;
LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " Statistics: " << (settings.statistics ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << settings.tempDir;
LOG(info) << " Hierarchy: " << (settings.hierarchy!="" ? settings.hierarchy : "none");
LOG(info) << (validateConfig ? " Only validating config files." : " ValidateConfig: Disabled");
LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << settings.cassandraSettings.address;
LOG(info) << " TTL: " << settings.cassandraSettings.ttl;
LOG(info) << " NumThreadsIO: " << settings.cassandraSettings.numThreadsIo;
LOG(info) << " QueueSizeIO: " << settings.cassandraSettings.queueSizeIo;
LOG(info) << " CoreConnPerHost: " << settings.cassandraSettings.coreConnPerHost;
LOG(info) << " MaxConnPerHost: " << settings.cassandraSettings.maxConnPerHost;
LOG(info) << " MaxConcRequests: " << settings.cassandraSettings.maxConcRequests;
LOG(info) << " DebugLog: " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
LOG(info) << " Username: " << settings.cassandraSettings.username;
LOG(info) << " Password: " << settings.cassandraSettings.password;
#else
LOG(info) << " Username and password not printed.";
#endif
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << settings.restListenAddress;
LOG_VAR(vLogLevel) << "----- End Configuration -----";
if (validateConfig)
return 0;
// Daemonizing the collectagent
if(settings.daemonize)
dcdbdaemon();
......@@ -451,7 +422,8 @@ int main(int argc, char* const argv[]) {
}
// Setting the size of the sensor cache
mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
// Conversion from milliseconds to nanoseconds
mySensorCache.setMaxHistory(settings.pluginSettings.cacheInterval * 1000000);
//Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
......@@ -485,6 +457,59 @@ int main(int argc, char* const argv[]) {
mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
analyticsController->setCache(&mySensorCache);
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
QueryEngine::getInstance().setQueryCallback(sensorQueryCallback);
LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
//print global settings in either case
LOG(info) << "Global Settings:";
LOG(info) << " MQTT-listenAddress: " << settings.mqttListenAddress;
LOG(info) << " CacheInterval: " << int(settings.pluginSettings.cacheInterval/1000) << " [s]";
LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]";
LOG(info) << " MessageThreads: " << settings.messageThreads;
LOG(info) << " MessageSlots: " << settings.messageSlots;
LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " Statistics: " << (settings.statistics ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << settings.pluginSettings.tempdir;
LOG(info) << " Hierarchy: " << (settings.hierarchy!="" ? settings.hierarchy : "none");
LOG(info) << (validateConfig ? " Only validating config files." : " ValidateConfig: Disabled");
LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << settings.cassandraSettings.address;
LOG(info) << " TTL: " << settings.cassandraSettings.ttl;
LOG(info) << " NumThreadsIO: " << settings.cassandraSettings.numThreadsIo;
LOG(info) << " QueueSizeIO: " << settings.cassandraSettings.queueSizeIo;
LOG(info) << " CoreConnPerHost: " << settings.cassandraSettings.coreConnPerHost;
LOG(info) << " MaxConnPerHost: " << settings.cassandraSettings.maxConnPerHost;
LOG(info) << " MaxConcRequests: " << settings.cassandraSettings.maxConcRequests;
LOG(info) << " DebugLog: " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
LOG(info) << " Username: " << settings.cassandraSettings.username;
LOG(info) << " Password: " << settings.cassandraSettings.password;
#else
LOG(info) << " Username and password not printed.";
#endif
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << settings.restListenAddress;
LOG_VAR(vLogLevel) << "----- Analytics Configuration -----";
for(auto& p : analyticsController->getManager()->getPlugins()) {
LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
p.configurator->printConfig(vLogLevel);
}
LOG_VAR(vLogLevel) << "----- End Configuration -----";
if (validateConfig)
return EXIT_SUCCESS;
else
analyticsController->start();
/*
* Start the MQTT Message Server.
*/
......@@ -540,6 +565,7 @@ int main(int argc, char* const argv[]) {
float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
if (settings.statistics && keepRunning) {
LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
}
msgCtr = 0;
pmsgCtr = 0;
......@@ -547,7 +573,7 @@ int main(int argc, char* const argv[]) {
}
LOG(info) << "Stopping...";
analyticsController->stop();
ms.stop();
LOG(info) << "MQTT Server stopped...";
httpServer.stop();
......
global {
mqttListenAddress 127.0.0.1:1883
cleaningInterval 86400
mqttprefix /00112233445566778899AABB
threads 24
messageThreads 128
messageSlots 16