Commit 31cd6276 authored by Alessio Netti's avatar Alessio Netti
Browse files

Configuration support for CollectAgent

- The collectagent is now configurable, like dcdbpusher
- Available config options can be seen in the sample collectagent.conf file
- Supplying a path containing config files is mandatory, like dcdbpusher;
this may change in the future
- Please do a make all && make install from the root of the repo to update
scripts and directories that were modified to add config support
parent 6ac6d9ee
include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/cxxopts/src -DASIO_HEADER_ONLY -DBOOST_TEST_DYN_LINK
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/cxxopts/src -DASIO_HEADER_ONLY -DBOOST_TEST_DYN_LINK -DBOOST_LOG_DYN_LINK
OBJS = collectagent.o \
configuration.o \
sensorcache.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_log_setup -lboost_log -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
TARGET = collectagent
.PHONY : clean install
......@@ -25,5 +26,8 @@ clean:
rm -f $(TARGET)
rm -f $(OBJS)
install: $(TARGET)
install_conf: config/collectagent.conf
install -m 644 $^ $(DCDBDEPLOYPATH)/etc/
install: $(TARGET) install_conf
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
......@@ -39,6 +39,7 @@
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
......@@ -51,14 +52,6 @@
using namespace std;
#define LISTENHOST "localhost"
#define LISTENPORT "1883"
#define CASSANDRAHOST "127.0.0.1"
#define CASSANDRAPORT "9042"
#define RESTAPIHOST "0.0.0.0"
#define RESTAPIPORT "8080"
#define TTL "0"
int keepRunning;
bool statistics;
uint64_t msgCtr;
......@@ -232,25 +225,27 @@ int mqttCallback(SimpleMQTTMessage *msg)
* Print usage information
*/
void usage() {
globalCA_t& defaults = Configuration("").getGlobal();
/*
1 2 3 4 5 6 7 8
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
cout << "Usage:" << endl;
cout << " collectagent [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-d] [-s]" << endl;
cout << " collectagent [-m<host>] [-r<host>] [-c<host>] [-C<interval>] [-u<username>] [-p<password>] [-t<ttl>] [-d] [-s] <path/to/configfiles/>" << endl;
cout << " collectagent -h" << endl;
cout << endl;
cout << "Options:" << endl;
cout << " -m<host> MQTT listen address [default: " << LISTENHOST << ":" << LISTENPORT << "]" << endl;
cout << " -r<host> REST API listen address [default: " << RESTAPIHOST << ":" << RESTAPIPORT << "]" << endl;
cout << " -c<host> Cassandra host [default: " << CASSANDRAHOST << ":" << CASSANDRAPORT << "]" << endl;
cout << " -m<host> MQTT listen address [default: " << defaults.mqttListenAddress << "]" << endl;
cout << " -r<host> REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
cout << " -c<host> Cassandra host [default: " << defaults.cassandraSettings.address << "]" << endl;
cout << " -C<interval> Cache interval in [s] [default: " << defaults.cacheInterval << "]" << endl;
cout << " -u<username> Cassandra username [default: none]" << endl;
cout << " -p<password> Cassandra password [default: none]" << endl;
cout << " -t<ttl> Cassandra insert TTL [default: " << TTL << "]" << endl;
cout << " -t<ttl> Cassandra insert TTL [default: " << defaults.cassandraSettings.ttl << "]" << endl;
cout << endl;
cout << " -d Daemonize" << endl;
cout << " -s Print message statistics" << endl;
cout << " -s Print message stats" <<endl;
cout << " -h This help page" << endl;
cout << endl;
}
......@@ -258,46 +253,65 @@ void usage() {
int main(int argc, char* const argv[]) {
try{
/*
* Catch SIGINT signals to allow for proper server shutdowns.
*/
signal(SIGINT, sigHandler);
/*
* Catch critical signals to allow for backtraces
*/
signal(SIGABRT, abrtHandler);
signal(SIGSEGV, abrtHandler);
signal(SIGTERM, abrtHandler);
// Checking if path to config is supplied
if (argc <= 1) {
cout << "Please specify a path to the config-directory" << endl << endl;
usage();
exit(EXIT_FAILURE);
}
// Defining options
const char* opts = "m:r:c:C:u:p:t:dDsh";
// Same mechanism as in DCDBPusher - checking if help string is requested before loading config
char ret;
while ((ret = getopt(argc, argv, opts)) != -1) {
switch (ret)
{
case 'h':
usage();
exit(EXIT_FAILURE);
break;
default:
//do nothing (other options are read later on)
break;
}
}
Configuration config(argv[argc - 1]);
if( !config.readGlobal() ) {
cout << "Failed to read global configuration!" << endl;
exit(EXIT_FAILURE);
}
globalCA_t& settings = config.getGlobal();
/* Parse command line */
int ret;
std::string listenHost, cassandraHost, restApiHost, ttl;
std::string cassandraUser, cassandraPassword;
std::string listenHost, cassandraHost, restApiHost;
std::string listenPort, cassandraPort, restApiPort;
listenHost = LISTENHOST;
cassandraHost = CASSANDRAHOST;
restApiHost = RESTAPIHOST;
ttl = "0";
statistics = false;
while ((ret=getopt(argc, argv, "m:r:c:u:p:t:dDsh"))!=-1) {
optind = 1;
while ((ret=getopt(argc, argv, opts))!=-1) {
switch(ret) {
case 'm':
listenHost = optarg;
settings.mqttListenAddress = optarg;
break;
case 'r':
restApiHost = optarg;
settings.restListenAddress = optarg;
break;
case 'c':
cassandraHost = optarg;
settings.cassandraSettings.address = optarg;
break;
case 'C':
settings.cacheInterval = stoul(optarg);
break;
case 'u':
cassandraUser = optarg;
settings.cassandraSettings.username = optarg;
break;
case 'p': {
cassandraPassword = optarg;
settings.cassandraSettings.password = optarg;
// What does this do? Mask the password?
size_t pwdLen = strlen(optarg);
memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
if (pwdLen > 3) {
......@@ -306,14 +320,14 @@ int main(int argc, char* const argv[]) {
break;
}
case 't':
ttl = optarg;
settings.cassandraSettings.ttl = stoul(optarg);
break;
case 'd':
case 'D':
dcdbdaemon();
settings.daemonize = 1;
break;
case 's':
statistics = true;
settings.statistics = 1;
break;
case 'h':
default:
......@@ -322,9 +336,26 @@ int main(int argc, char* const argv[]) {
}
}
/*
* Catch SIGINT signals to allow for proper server shutdowns.
*/
signal(SIGINT, sigHandler);
/*
* Catch critical signals to allow for backtraces
*/
signal(SIGABRT, abrtHandler);
signal(SIGSEGV, abrtHandler);
signal(SIGTERM, abrtHandler);
// Daemonizing the collectagent
if(settings.daemonize)
dcdbdaemon();
/*
* Parse hostnames for port specifications
*/
listenHost = string(settings.mqttListenAddress);
size_t pos = listenHost.find(":");
if (pos != string::npos) {
listenPort = listenHost.substr(pos+1);
......@@ -332,6 +363,8 @@ int main(int argc, char* const argv[]) {
} else {
listenPort = LISTENPORT;
}
cassandraHost = string(settings.cassandraSettings.address);
pos = cassandraHost.find(":");
if (pos != string::npos) {
cassandraPort = cassandraHost.substr(pos+1);
......@@ -339,6 +372,8 @@ int main(int argc, char* const argv[]) {
} else {
cassandraPort = CASSANDRAPORT;
}
restApiHost = string(settings.restListenAddress);
pos = restApiHost.find(":");
if (pos != string::npos) {
restApiPort = restApiHost.substr(pos+1);
......@@ -347,11 +382,11 @@ int main(int argc, char* const argv[]) {
restApiPort = RESTAPIPORT;
}
// Setting the size of the sensor cache
mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
/*
* Allocate and initialize connection to Cassandra.
*/
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
//Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
if (!dcdbConn->connect()) {
std::cout << "Cannot connect to Cassandra!" << std::endl;
......@@ -372,20 +407,13 @@ int main(int argc, char* const argv[]) {
/*
* Set TTL for data store inserts if TTL > 0.
*/
uint64_t ttlInt;
std::istringstream ttlParser(ttl);
if (!(ttlParser >> ttlInt)) {
std::cout << "Invalid TTL!" << std::endl;
exit(EXIT_FAILURE);
}
if (ttlInt) {
mySensorDataStore->setTTL(ttlInt);
}
if (settings.cassandraSettings.ttl > 0)
mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
/*
* Start the MQTT Message Server.
*/
SimpleMQTTServer ms(listenHost, listenPort);
SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
ms.setMessageCallback(mqttCallback);
ms.start();
......@@ -423,7 +451,7 @@ int main(int argc, char* const argv[]) {
elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
if (statistics) {
if (settings.statistics) {
cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
}
msgCtr = 0;
......
global {
mqttListenAddress 127.0.0.1:1883
messageThreads 128
messageSlots 16
cacheInterval 120
daemonize false
statistics true
}
restAPI {
address 127.0.0.1:8080
}
cassandra {
address 127.0.0.1:9042
username
password
ttl 0
}
//
// Created by Netti, Alessio on 30.11.18.
//
#include "configuration.h"
using namespace std;
Configuration::Configuration(const std::string& cfgFilePath) :
_cfgFilePath(cfgFilePath) {
if (_cfgFilePath[_cfgFilePath.length()-1] != '/') {
_cfgFilePath.append("/");
}
//set default values for global variables
_global.daemonize = false;
_global.statistics = false;
_global.mqttListenAddress = string(LISTENHOST) + ":" + string(LISTENPORT);
_global.restListenAddress = string(RESTAPIHOST) + ":" + string(RESTAPIPORT);
_global.messageThreads = 128;
_global.messageSlots = 16;
_global.cacheInterval = 900;
_global.logLevelFile = boost::log::trivial::trace;
_global.logLevelCmd = boost::log::trivial::info;
_global.cassandraSettings.address = string(CASSANDRAHOST) + ":" + string(CASSANDRAPORT);
_global.cassandraSettings.username = "";
_global.cassandraSettings.password = "";
_global.cassandraSettings.ttl = 0;
}
Configuration::~Configuration() {}
bool Configuration::readGlobal() {
//open file
std::string globalConfig = _cfgFilePath;
globalConfig.append("collectagent.conf");
boost::property_tree::iptree cfg;
//parse to property_tree
try {
boost::property_tree::read_info(globalConfig, cfg);
} catch (boost::property_tree::info_parser_error& e) {
cout << "Error when reading collectagent.conf: " << e.what() << endl;
//LOG(error) << "Error when reading collectagent.conf: " << e.what();
return false;
}
//read global struct
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
if (boost::iequals(global.first, "mqttListenAddress")) {
_global.mqttListenAddress = global.second.data();
} else if (boost::iequals(global.first, "messageThreads")) {
_global.messageThreads = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageSlots")) {
_global.messageSlots = stoul(global.second.data());
} else if (boost::iequals(global.first, "cacheInterval")) {
_global.cacheInterval = stoul(global.second.data());
} else if (boost::iequals(global.first, "verbosity")) {
_global.logLevelFile = translateLogLevel(stoi(global.second.data()));
} else if (boost::iequals(global.first, "daemonize")) {
_global.daemonize = global.second.data() == "true";
} else if (boost::iequals(global.first, "statistics")) {
_global.statistics = global.second.data() == "true";
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting" << endl;
//LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
}
//read cassandra struct
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("cassandra")) {
if (boost::iequals(global.first, "address")) {
_global.cassandraSettings.address = global.second.data();
} else if (boost::iequals(global.first, "username")) {
_global.cassandraSettings.username = global.second.data();
} else if (boost::iequals(global.first, "password")) {
_global.cassandraSettings.password = global.second.data();
} else if (boost::iequals(global.first, "ttl")) {
_global.cassandraSettings.ttl = stoul(global.second.data());
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting" << endl;
//LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
}
//read restAPI struct
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("restAPI")) {
if (boost::iequals(global.first, "address")) {
_global.restListenAddress = global.second.data();
//} else if (boost::iequals(global.first, "certificate")) {
// _global.restAPISettings.certificate = global.second.data();
//} else if (boost::iequals(global.first, "privateKey")) {
// _global.restAPISettings.privateKey = global.second.data();
//} else if (boost::iequals(global.first, "dhFile")) {
// _global.restAPISettings.dhFile = global.second.data();
//} else if (boost::iequals(global.first, "authkey")) {
// //Avoid unnecessary "Value not recognized" message
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting" << endl;
//LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
}
return true;
}
boost::log::trivial::severity_level Configuration::translateLogLevel(int logLevel) {
switch (logLevel) {
case 0:
return boost::log::trivial::fatal;
break;
case 1:
return boost::log::trivial::error;
break;
case 2:
return boost::log::trivial::warning;
break;
case 3:
return boost::log::trivial::info;
break;
case 4:
return boost::log::trivial::debug;
break;
case 5:
return boost::log::trivial::trace;
break;
default:
return boost::log::trivial::info;
break;
}
}
globalCA_t& Configuration::getGlobal() {
return _global;
}
//
// Created by Netti, Alessio on 30.11.18.
//
#ifndef CONFIGURATION_H_
#define CONFIGURATION_H_
#include <boost/property_tree/ptree.hpp>
#include <string>
#include <unistd.h>
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
//TODO: move somewhere else
#include <boost/log/trivial.hpp>
#include <boost/log/sources/severity_logger.hpp>
// TODO: remove
#include <iostream>
#define LISTENHOST "localhost"
#define LISTENPORT "1883"
#define CASSANDRAHOST "127.0.0.1"
#define CASSANDRAPORT "9042"
#define RESTAPIHOST "0.0.0.0"
#define RESTAPIPORT "8080"
typedef struct {
std::string address;
std::string username;
std::string password;
uint32_t ttl;
} cassandra_t;
typedef struct {
bool daemonize;
bool statistics;
std::string mqttListenAddress;
std::string restListenAddress;
uint64_t messageThreads;
uint64_t messageSlots;
uint64_t cacheInterval;
cassandra_t cassandraSettings;
boost::log::trivial::severity_level logLevelFile;
boost::log::trivial::severity_level logLevelCmd;
} globalCA_t;
/**
* Class responsible of reading the collectagent global configuration.
*/
class Configuration {
public:
/**
* Create new Configuration. Sets global config file to read from to cfgFile.
* @param cfgFilePath Path to where all config-files are located
*/
Configuration(const std::string& cfgFilePath);
virtual ~Configuration();
/**
* Reads the global values from collectagent.conf (located at _cfgFilePath).
* Sets _global correspondingly.
*
* @return true on success, false otherwise
*/
bool readGlobal();
/**
* Translate numeric verbosity level to boost::log severity level
* @param logLevel The numeric verbosity level
* @return The boost::log severity level
*/
boost::log::trivial::severity_level translateLogLevel(int logLevel);
/**
* Return global settings
*/
globalCA_t& getGlobal();
private:
std::string _cfgFilePath;
globalCA_t _global;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif /* CONFIGURATION_H_ */
......@@ -15,9 +15,9 @@
namespace DCDB {
SensorCache::SensorCache() {
SensorCache::SensorCache(uint64_t maxHistory) {
// TODO Auto-generated constructor stub
this->_maxHistory = maxHistory;
}
SensorCache::~SensorCache() {
......@@ -31,7 +31,8 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid);
if (it != sensorCache.end()) {
if (it->second.size() && (it->second.front().timestamp+MAX_HISTORY_NS <= ts)) {
// If the sensor cache's size is reduced at runtime, the while loop makes sure to resize everything nicely
while (it->second.size() && (it->second.front().timestamp+_maxHistory <= ts)) {
it->second.pop_front();
}
it->second.push_back(s);
......
......@@ -13,8 +13,6 @@
#include <utility>
#include <dcdb/sensorid.h>
#define MAX_HISTORY_NS 60000000000 // Store max 60s of historic data
namespace DCDB {
typedef struct {
......@@ -26,7 +24,7 @@ typedef std::map<SensorId, cacheEntry_t> sensorCache_t;
class SensorCache {
public:
SensorCache();
SensorCache(uint64_t maxHistory=60000000000);
virtual ~SensorCache();
/**
......@@ -61,10 +59,26 @@ public:
*/
void dump();
/**
* @brief Set a new maximum cache length.
*
* @param maxHistory: new sensor cache length value.
*/
void setMaxHistory(uint64_t maxHistory) { this->_maxHistory = maxHistory; }
/**
* @brief Returns the current maximum sensor cache length
*
* @returns Current maximum sensor cache length
*/
uint64_t getMaxHistory() { return this->_maxHistory; }
private:
bool checkValid(cacheEntry_t &entry);
int64_t getAverage(cacheEntry_t &entry, uint64_t avg);
sensorCache_t sensorCache;
uint64_t _maxHistory;
};
......
......@@ -137,7 +137,7 @@ void SimpleMQTTServer::start()
* Start one accept thread per socket.
*/
for (unsigned int i=0; i<listenSockets.size(); i++)
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback));
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback, this->_maxThreads, this->_maxConnPerThread));
}