Commit fd8c374a authored by Micha Mueller's avatar Micha Mueller
Browse files

Merge branch 'master' into CommonRestServer

parents 4c67ce63 8810a787
......@@ -5,7 +5,7 @@ MAKEFILENAME := $(lastword $(MAKEFILE_LIST))
SUB_DIRS = lib collectagent dcdbpusher analytics tools scripts
SOURCEFORGE_MROR = netcologne
CASSANDRA_VERSION = 2.2.10
CASSANDRA_VERSION = 3.0.18
MOSQUITTO_VERSION = 1.5.5
BOOST_VERSION = 1.70.0
OPENSSL_VERSION = 1.0.2l
......@@ -29,7 +29,7 @@ DISTFILES = apache-cassandra-$(CASSANDRA_VERSION).tar.gz;http://archive.apache.o
freeipmi-$(FREEIPMI_VERSION).tar.gz;http://ftp.gnu.org/gnu/freeipmi/freeipmi-$(FREEIPMI_VERSION).tar.gz \
net-snmp-$(NET-SNMP_VERSION).tar.gz;https://sourceforge.net/projects/net-snmp/files/net-snmp/$(NET-SNMP_VERSION)/net-snmp-$(NET-SNMP_VERSION).tar.gz/download
DISTFILES_HASHES = apache-cassandra-2.2.10.tar.gz|4c58cb7c6753ce26f7c4d650502feece;\
DISTFILES_HASHES = apache-cassandra-3.0.18.tar.gz|94dbdaa58b366166c53f881b8e266bc8;\
mosquitto-1.5.5.tar.gz|a17dffc6f63b2a4ab2eb5c51139e60e9;\
boost_1_70_0.tar.gz|fea771fe8176828fabf9c09242ee8c26;\
openssl-1.0.2l.tar.gz|f85123cd390e864dfbe517e7616e6566;\
......@@ -247,7 +247,7 @@ $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.installed: $(DCDBDEPSPATH)
$(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.built: $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.patched
@echo ""
@echo "Building FreeIPMI library..."
cd $(@D) && ./configure --prefix=$(DCDBDEPLOYPATH) --without-argp
cd $(@D) && ./configure --prefix=$(DCDBDEPLOYPATH) --without-argp --without-encryption
cd $(@D) && make -j $(MAKETHREADS) && touch $(@)
$(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.installed: $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.built | $(DCDBDEPLOYPATH)
......
......@@ -2,7 +2,7 @@ include ../config.mk
include ../common.mk
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/common/include -I$(DCDBDEPLOYPATH)/include -DVERSION=\"$(VERSION)\"
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = aggregator
......
......@@ -39,7 +39,7 @@ Additional parameters specific to this framework are the following:
| Value | Explanation |
|:----- |:----------- |
| global | Wrapper structure for the global values.
| analytics | Wrapper structure for the data analytics-specific values.
| hierarchy | Space-separated sequence of regular expressions used to infer the local (DCDBPusher) or global (DCDBCollectAgent) sensor hierarchy. This parameter should be wrapped in quotes to ensure proper parsing. See the Sensor Tree [section](#sensorTree) for more details.
| analyzerPlugins | Block containing the specification of all data analytics plugin to be instantiated.
| plugin _name_ | The plugin name is used to build the corresponding lib-name (e.g. average --> libdcdbanalyzer_average.1.0)
......@@ -587,4 +587,4 @@ The configuration parameters specific to the _Aggregator_ plugin are the followi
Generating a DCDBAnalytics plugin requires implementing a _Analyzer_ and _Configurator_ class which contain all logic
tied to the specific plugin. Such classes should be derived from _AnalyzerTemplate_ and _AnalyzerConfiguratorTemplate_
respectively, which contain all plugin-agnostic configuration and runtime features. Please refer to the documentation
of the _Average_ plugin for an overview of how a basic plugin can be implemented.
of the _Aggregator_ plugin for an overview of how a basic plugin can be implemented.
......@@ -10,7 +10,7 @@
#include "QueryEngine.h"
#include "AnalyzerInterface.h"
#include "pluginsettings.h"
#include "globalconfiguration.h"
/**
* Interface to configurators for data analyzer plugins
......
......@@ -85,7 +85,11 @@ public:
virtual ~AnalyzerConfiguratorTemplate() {
for (auto ta : _templateAnalyzers)
delete ta.second;
for (auto ts : _templateSensors)
delete ts.second;
_templateAnalyzers.clear();
_templateSensors.clear();
_templateProtoInputs.clear();
_analyzerInterfaces.clear();
_analyzers.clear();
}
......@@ -112,8 +116,8 @@ public:
*/
void printConfig(LOG_LEVEL ll) final {
LOG_VAR(ll) << " General: ";
LOG_VAR(ll) << " MQTT-Prefix: " << (_mqttPrefix != "" ? _mqttPrefix : "DEFAULT");
LOG_VAR(ll) << " Sensor Pattern: " << (_sensorPattern != "" ? _sensorPattern : "DEFAULT");
LOG_VAR(ll) << " MQTT-Prefix: " << (_mqttPrefix != "" ? _mqttPrefix : std::string("DEFAULT"));
LOG_VAR(ll) << " Sensor Pattern: " << (_sensorPattern != "" ? _sensorPattern : std::string("DEFAULT"));
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
//prints plugin specific configurator attributes and entities if present
......@@ -154,18 +158,22 @@ public:
if (!val.second.empty()) {
Analyzer* an = new Analyzer(val.second.data());
an->setTemplate(true);
if (readAnalyzer(*an, val.second)) {
auto ret = _templateAnalyzers.insert(std::pair<std::string, Analyzer*>(val.second.data(), an));
if(!ret.second) {
LOG(warning) << "Template " << _analyzerName << " " << val.second.data() << " already exists! Omitting...";
delete an;
}
} else {
if (!readAnalyzer(*an, val.second)) {
LOG(warning) << "Template " << _analyzerName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
delete an;
}
}
// Here we read and instantiate analyzers
// Sensor templates are read
} else if (boost::iequals(val.first, "template_" + _baseName)) {
LOG(debug) << "Template " << _baseName << " \"" << val.second.data() << "\"";
if (!val.second.empty()) {
SBase* base = new SBase(val.second.data());
if (!readSensorBase(*base, val.second, true)) {
LOG(warning) << "Template " << _baseName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
delete base;
}
}
// Here we read and instantiate analyzers
} else if (boost::iequals(val.first, _analyzerName)) {
LOG(debug) << _analyzerName << " \"" << val.second.data() << "\"";
if (!val.second.empty()) {
......@@ -211,14 +219,18 @@ public:
for(auto a : _analyzers)
a->wait();
// First of all, delete all template analyzers
// First of all, delete all template analyzers and sensors
for (auto ta : _templateAnalyzers)
delete ta.second;
for (auto ts : _templateSensors)
delete ts.second;
// Clear all analyzers
_analyzerInterfaces.clear();
_analyzers.clear();
_templateAnalyzers.clear();
_templateSensors.clear();
_templateProtoInputs.clear();
}
/**
......@@ -294,7 +306,7 @@ protected:
* @param ll Severity level to log with
*/
virtual void printConfiguratorConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " No other plugin-specific general parameters defined";
LOG_VAR(ll) << " No other plugin-specific general parameters defined";
}
/**
......@@ -331,9 +343,12 @@ protected:
an = *(it->second);
an.setName(config.data());
an.setTemplate(false);
// Analyzers instantiated from templates DO NOT share the same units and sensors. This would lead to
// too much naming ambiguity and is generally just not needed
// Analyzers instantiated from templates DO NOT share the same units and output sensors.
// This would lead to too much naming ambiguity and is generally just not needed
an.clearUnits();
// The input sensors defined in the template are on the other hand preserved; this is meant as a
// workaround to shorten certain configurations
protoInputs = _templateProtoInputs[def.get().data()];
} else {
LOG(warning) << "Template " << _analyzerName << "\"" << def.get().data() << "\" not found! Using standard values.";
}
......@@ -362,7 +377,7 @@ protected:
if (boost::iequals(valInner.first, _baseName)) {
LOG(debug) << " I/O " << _baseName << " " << valInner.second.data();
SBase sensor = SBase(valInner.second.data());
if (readSensorBase(sensor, valInner.second)) {
if (readSensorBase(sensor, valInner.second, false)) {
shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
val.first==INPUT_BLOCK ? protoInputs.push_back(sensorPtr) : protoOutputs.push_back(sensorPtr);
} else {
......@@ -424,6 +439,14 @@ protected:
}
}
delete units;
} else {
// If the analyzer is a template, we add it to the related map
auto ret = _templateAnalyzers.insert(std::pair<std::string, Analyzer*>(an.getName(), &an));
if(!ret.second) {
LOG(warning) << "Template " << _analyzerName << " " << an.getName() << " already exists! Omitting...";
return false;
}
_templateProtoInputs.insert(std::pair<std::string, std::vector<shared_ptr<SBase>>>(an.getName(), protoInputs));
}
return true;
}
......@@ -439,9 +462,23 @@ protected:
* @param config A boost property (sub-)tree containing the sensor values
* @return True if successful, false otherwise
*/
bool readSensorBase(SBase& sBase, CFG_VAL config) {
bool readSensorBase(SBase& sBase, CFG_VAL config, bool isTemplate=false) {
sBase.setCacheInterval(_cacheInterval);
if (!isTemplate) {
// Copying parameters from the template (if defined)
boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
if(def) {
LOG(debug) << " Using \"" << def.get().data() << "\" as default.";
auto it = _templateSensors.find(def.get().data());
if(it != _templateSensors.end()) {
sBase = *(it->second);
sBase.setName(config.data());
} else {
LOG(warning) << "Template " << _baseName << "\" " << def.get().data() << "\" not found! Using standard values.";
}
}
}
// Reading other sensor parameters
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "mqttsuffix")) {
sBase.setMqtt(val.second.data());
......@@ -450,12 +487,21 @@ protected:
} else if (boost::iequals(val.first, "delta")) {
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "sink")) {
sBase.setSinkPath( val.second.data() );
sBase.setSinkPath(val.second.data());
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling( std::stoul(val.second.data()) );
sBase.setSubsampling(std::stoul(val.second.data()));
}
}
sensorBase(sBase, config);
if(isTemplate) {
auto ret = _templateSensors.insert(std::pair<std::string, SBase*>(sBase.getName(), &sBase));
if(!ret.second) {
LOG(warning) << "Template " << _baseName << " " << sBase.getName() << " already exists! Omitting...";
return false;
}
}
return true;
}
......@@ -553,6 +599,10 @@ protected:
std::vector<A_Ptr> _analyzers;
// Map of the template analyzers that were defined in the config file - used for easy retrieval and instantiation
std::map<std::string, Analyzer*> _templateAnalyzers;
// Map of the template sensors that were defined
std::map<std::string, SBase*> _templateSensors;
// Map of the protoinputs belonging to template analyzers
std::map<std::string, std::vector<shared_ptr<SBase>>> _templateProtoInputs;
};
#endif //PROJECT_ANALYZERCONFIGURATORTEMPLATE_H
......@@ -108,7 +108,7 @@ public:
* @param ll Logging level at which the configuration is printed
*/
virtual void printConfig(LOG_LEVEL ll) override {
LOG_VAR(ll) << " MQTT part: " << (_mqttPart != "" ? _mqttPart : "DEFAULT");
LOG_VAR(ll) << " MQTT part: " << (_mqttPart != "" ? _mqttPart : std::string("DEFAULT"));
LOG_VAR(ll) << " Sync readings: " << (_sync ? "enabled" : "disabled");
LOG_VAR(ll) << " Streaming mode: " << (_streaming ? "enabled" : "disabled");
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
......@@ -395,8 +395,8 @@ protected:
if (_duplicate && _unitID >= 0)
compute(_unitID);
else
for (int i = 0; i < _units.size(); i++)
compute(i);
for (unsigned i = 0; i < _units.size(); i++)
compute((int)i);
} catch(const exception& e) {
LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!";
}
......
......@@ -91,6 +91,14 @@ public:
* "buffer" vector allows to re-use memory over successive readings. Note that in order to use
* this method, a callback must have been set through the setQueryCallback method. If not, this
* method will throw an exception.
*
* The "rel" argument governs how the search is performed in local sensor caches: if set to true,
* startTs and endTs indicate relative offsets against the most recent reading, and the returned
* vector is a view of the cache whose range is computed statically in O(1), and therefore the
* underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
* false, startTs and endTs are interpreted as absolute timestamps, and the cache view is
* determined by performing binary search with O(log(n)) complexity, thus resulting in a accurate
* time range. This parameter does not affect the query method when using the Cassandra datastore.
*
* @param name Name of the sensor to be queried
* @param startTs Start timestamp (in nanoseconds) of the time range for the query
......
......@@ -5,6 +5,7 @@ CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-function -Wno-unused-local-typed
OBJS = ../common/src/logging.o \
../analytics/AnalyticsManager.o \
../common/src/sensornavigator.o \
../common/src/globalconfiguration.o \
analyticscontroller.o \
sensorcache.o \
collectagent.o \
......@@ -12,7 +13,9 @@ OBJS = ../common/src/logging.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -ldl -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_log_setup -lboost_log -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -ldl -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_log_setup -lboost_log -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
TARGET = collectagent
.PHONY : clean install
......
......@@ -14,14 +14,22 @@ void AnalyticsController::stop() {
_keepRunning = false;
LOG(info) << "Stopping sensors...";
_manager->stop();
_manager->clear();
LOG(info) << "Stopping data analytics management thread...";
_mainThread.join();
LOG(info) << "Stopping worker threads...";
_keepAliveWork.reset();
_threads.join_all();
_initialized = false;
}
bool AnalyticsController::initialize(globalCA_t& settings, const string& configPath) {
restResponse_t AnalyticsController::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method) {
if(_initialized)
throw runtime_error("Cannot forward REST command, AnalyticsController is not initialized!");
return _manager->REST(pathStrs, queries, method, _io);
}
bool AnalyticsController::initialize(Configuration& settings, const string& configPath) {
_settings = settings;
_configPath = configPath;
_navigator = make_shared<SensorNavigator>();
......@@ -42,7 +50,8 @@ bool AnalyticsController::initialize(globalCA_t& settings, const string& configP
// Building the sensor navigator
try {
_navigator->buildTree(_settings.hierarchy, &names, &topics);
_navigator->setFilter(_settings.analyticsSettings.filter);
_navigator->buildTree(_settings.analyticsSettings.hierarchy, &names, &topics);
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree!";
......@@ -130,6 +139,7 @@ void AnalyticsController::run() {
}
bool AnalyticsController::publishSensors() {
// Performing auto-publish (if required) for the sensors instantiated by the data analytics framework
if(_settings.pluginSettings.sensorPattern=="")
return false;
......
......@@ -19,10 +19,24 @@
using namespace std;
/**
* Class implementing a wrapper around the AnalyticsManager
*
* This class provides a wrapper around many features required for the instantiation of the Analytics Manager - namely
* the instantiation of a SensorNavigator, the creation of a dedicated thread pool, and the insertion of generated
* sensor values into the Cassandra datastore. Most of these features were already available in DCDBPusher to handle
* regular sensors, but they had to be ported over to the CollectAgent.
*/
class AnalyticsController {
public:
/**
* @brief Class constructor
*
* @param dcdbCfg SensorConfig object to be used to retrieve sensor meta-data from Cassandra
* @param dcdbStore SensorDataStore object to be used to insert sensor readings into Cassandra
*/
AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) {
_dcdbCfg = dcdbCfg;
_dcdbStore = dcdbStore;
......@@ -35,21 +49,112 @@ public:
_initialized = false;
}
/**
* @brief Class destructor
*/
~AnalyticsController() {}
/**
* @brief Starts the internal thread of the controller
*
* Initialization must have been performed already at this point.
*/
void start();
void stop();
// Initializes the sensor navigator and data analytics manager
bool initialize(globalCA_t& settings, const string& configPath);
/**
* @brief Stops the internal management thread
*
* This will also stop and join all threads in the BOOST ASIO pool.
*/
void stop();
/**
* @brief Initializes the data analytics infrastructure
*
* This method will build a Sensor Navigator by fecthing sensor names from the Cassandra datastore,
* and then create an AnalyticsManager object, which will take care of instantiating and
* preparing plugins.
*
* @param settings Settings class containing user-specified configuration parameters
* @param configPath Path to the configuration files for the data analytics framework
* @return True if successful, false otherwise
*/
bool initialize(Configuration& settings, const string& configPath);
/**
* @brief Sets the cache to be used for sensors
*
* This method allows to set the SensorCache object to be used to store sensor readings produced
* by the data analytics framework.
* @param cache The SensorCache object to be used as cache
*/
void setCache(SensorCache* cache) { _sensorCache = cache; }
bool isStopped() { return _halted; }
/**
* @brief Returns the status of the internal thread
*
* @return True if the controller is currently stopped, false otherwise
*/
bool isHalted() { return _halted; }
/**
* @brief Triggers a temporary halt of the internal management thread
*
* @param wait If set to true, the method returns only when the thread has stopped
*/
void halt(bool wait=false) {
_doHalt = true;
if(wait) while (!_halted) { sleep(1); }
}
/**
* @brief Resumes the internal management thread
*/
void resume() { _doHalt = false; }
/**
* @brief Returns the internal AnalyticsManager object
*
* @return A shared pointer to the internal AnalyticsManager object
*/
shared_ptr<AnalyticsManager> getManager() { return _manager; }
/**
* @brief Returns the internal SensorNavigator object
*
* @return A shared pointer to the internal SensorNavigator object
*/
shared_ptr<SensorNavigator> getNavigator() { return _navigator; }
/**
* @brief Returns the SensorCache object used to store readings
*
* @return A pointer to a SensorCache object
*/
SensorCache* getCache() { return _sensorCache; }
/**
* @brief Returns an insert counter for data analytics readings
*
* This counter keeps track of how many inserts were performed into the Cassandra datastore for
* data analytics-related operations since the last call to this method.
*
* @return An insert counter to the Cassandra datastore
*/
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
/**
* @brief Supply a REST command to the manager
*
* This method simply forwards the request to the internal AnalyticsManager.
*
* @param pathStrs resource path to be accessed
* @param queries vector of queries
* @param method Either GET or PUT
* @return Response as a <data, response> pair
*/
restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method);
private:
// Method implementing the main loop of the internal thread
......@@ -73,7 +178,7 @@ private:
// Internal data analytics manager object
shared_ptr<AnalyticsManager> _manager;
// Misc configuration attributes
globalCA_t _settings;
Configuration _settings;
string _configPath;
// Readings counter
uint64_t _readingCtr;
......
This diff is collapsed.
......@@ -4,146 +4,53 @@
#include "configuration.h"
using namespace std;
Configuration::Configuration(const std::string& cfgFilePath) :
_cfgFilePath(cfgFilePath) {
if (_cfgFilePath[_cfgFilePath.length()-1] != '/') {
_cfgFilePath.append("/");
}
//set default values for global variables
_global.daemonize = false;
_global.statistics = false;
_global.mqttListenAddress = string(LISTENHOST) + ":" + string(LISTENPORT);
_global.restListenAddress = string(RESTAPIHOST) + ":" + string(RESTAPIPORT);
_global.messageThreads = 128;
_global.messageSlots = 16;
_global.threads = 24;
_global.cleaningInterval = 86400;
_global.logLevelFile = boost::log::trivial::trace;
_global.logLevelCmd = boost::log::trivial::info;
_global.pluginSettings.cacheInterval = 900000;
_global.pluginSettings.tempdir = "./";
_global.pluginSettings.sensorPattern = "";
_global.pluginSettings.mqttPrefix = "";
_global.cassandraSettings.address = string(CASSANDRAHOST) + ":" + string(CASSANDRAPORT);
_global.cassandraSettings.username = "";
_global.cassandraSettings.password = "";
_global.cassandraSettings.ttl = 0;
_global.cassandraSettings.numThreadsIo = 1;
_global.cassandraSettings.queueSizeIo = 4096;
_global.cassandraSettings.coreConnPerHost = 1;
_global.cassandraSettings.maxConnPerHost = 2;
_global.cassandraSettings.maxConcRequests = 100;
_global.cassandraSettings.debugLog = false;
}
Configuration::~Configuration() {}
bool Configuration::readGlobal() {
//open file
std::string globalConfig = _cfgFilePath;
globalConfig.append("collectagent.conf");
boost::property_tree::iptree cfg;
//parse to property_tree
try {
boost::property_tree::read_info(globalConfig, cfg);
} catch (boost::property_tree::info_parser_error& e) {
LOG(error) << "Error when reading collectagent.conf: " << e.what();
bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_type &global) {
// ----- READING ADDITIONAL GLOBAL SETTINGS -----
if (boost::iequals(global.first, "mqttListenAddress")) {
mqttListenHost = parseNetworkHost(global.second.data());
mqttListenPort = parseNetworkPort(global.second.data());
if(mqttListenPort=="") mqttListenPort = string(LISTENPORT);
} else if (boost::iequals(global.first, "cleaningInterval")) {
cleaningInterval = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageThreads")) {
messageThreads = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageSlots")) {
messageSlots = stoul(global.second.data());
} else {
return false;
}
return true;
}
//read global struct
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
if (boost::iequals(global.first, "mqttListenAddress")) {
_global.mqttListenAddress = global.second.data();
} else if (boost::iequals(global.first, "hierarchy")) {
_global.hierarchy = global.second.data();
} else if (boost::iequals(global.first, "cleaningInterval")) {
_global.cleaningInterval = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageThreads")) {
_global.messageThreads = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageSlots")) {
_global.messageSlots = stoul(global.second.data());
} else if (boost::iequals(global.first, "cacheInterval")) {
_global.pluginSettings.cacheInterval = stoul(global.second.data()) * 1000;
} else if (boost::iequals(global.first, "verbosity")) {
_global.logLevelFile = translateLogLevel(stoi(global.second.data()));
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else if (boost::iequals(global.first, "tempDir")) {
_global.pluginSettings.tempdir = global.second.data();
if (_global.pluginSettings.tempdir[_global.pluginSettings.tempdir.length()-1] != '/') {
_global.pluginSettings.tempdir.append("/");
void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
// ----- READING CASSANDRA SETTINGS -----
if(cfg.find("cassandra") != cfg.not_found()) {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("cassandra")) {
if (boost::iequals(global.first, "address")) {
cassandraSettings.host = parseNetworkHost(global.second.data());
cassandraSettings.port = parseNetworkPort(global.second.data());
if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
} else if (boost::iequals(global.first, "username")) {