Commit 8b4b1738 authored by Alessio Netti's avatar Alessio Netti
Browse files

Configuration tweaks

- Parsing of host:port network addresses is handled by GlobalConfiguration
- -p dcdbpusher command line parameter (broker port) removed and integrated
into the -b argument
parent 0b344fc2
......@@ -443,8 +443,8 @@ void usage() {
cout << "Options:" << endl;
cout << " -a <string> Auto-publish pattern [default: none]" << endl;
cout << " -m<host> MQTT listen address [default: " << config.mqttListenAddress << "]" << endl;
cout << " -c<host> Cassandra host [default: " << config.cassandraSettings.address << "]" << endl;
cout << " -m<host> MQTT listen address [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
cout << " -c<host> Cassandra host [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
cout << " -u<username> Cassandra username [default: none]" << endl;
cout << " -p<password> Cassandra password [default: none]" << endl;
cout << " -t<ttl> Cassandra insert TTL [default: " << config.cassandraSettings.ttl << "]" << endl;
......@@ -503,11 +503,7 @@ int main(int argc, char* const argv[]) {
pluginSettings_t& pluginSettings = config.pluginSettings;
restAPISettings_t& restAPISettings = config.restAPISettings;
analyticsSettings_t& analyticsSettings = config.analyticsSettings;
/* Parse command line */
std::string listenHost, cassandraHost;
std::string listenPort, cassandraPort;
optind = 1;
while ((ret=getopt(argc, argv, opts))!=-1) {
switch(ret) {
......@@ -515,10 +511,14 @@ int main(int argc, char* const argv[]) {
pluginSettings.sensorPattern = optarg;
break;
case 'm':
settings.mqttListenAddress = optarg;
settings.mqttListenHost = parseNetworkHost(optarg);
settings.mqttListenPort = parseNetworkPort(optarg);
if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
break;
case 'c':
cassandraSettings.address = optarg;
cassandraSettings.host = parseNetworkHost(optarg);
cassandraSettings.port = parseNetworkPort(optarg);
if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
break;
case 'u':
cassandraSettings.username = optarg;
......@@ -576,40 +576,19 @@ int main(int argc, char* const argv[]) {
// Daemonizing the collectagent
if(settings.daemonize)
dcdbdaemon();
/*
* Parse hostnames for port specifications
*/
listenHost = string(settings.mqttListenAddress);
size_t pos = listenHost.find(":");
if (pos != string::npos) {
listenPort = listenHost.substr(pos+1);
listenHost.erase(pos);
} else {
listenPort = LISTENPORT;
}
cassandraHost = string(cassandraSettings.address);
pos = cassandraHost.find(":");
if (pos != string::npos) {
cassandraPort = cassandraHost.substr(pos+1);
cassandraHost.erase(pos);
} else {
cassandraPort = CASSANDRAPORT;
}
// Setting the size of the sensor cache
// Conversion from milliseconds to nanoseconds
mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
//Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraSettings.username, cassandraSettings.password);
dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()),
cassandraSettings.username, cassandraSettings.password);
dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
uint32_t params[3] = {cassandraSettings.coreConnPerHost, cassandraSettings.maxConnPerHost, cassandraSettings.maxConcRequests};
dcdbConn->setBackendParams(params);
if (!dcdbConn->connect()) {
LOG(fatal) << "Cannot connect to Cassandra!";
exit(EXIT_FAILURE);
......@@ -619,8 +598,7 @@ int main(int argc, char* const argv[]) {
* Legacy behavior: Initialize the DCDB schema in Cassandra.
*/
dcdbConn->initSchema();
/*
* Allocate the SensorDataStore.
*/
......@@ -645,7 +623,7 @@ int main(int argc, char* const argv[]) {
//print global settings in either case
LOG(info) << "Global Settings:";
LOG(info) << " MQTT-listenAddress: " << settings.mqttListenAddress;
LOG(info) << " MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
LOG(info) << " CacheInterval: " << int(pluginSettings.cacheInterval/1000) << " [s]";
LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]";
LOG(info) << " MessageThreads: " << settings.messageThreads;
......@@ -660,7 +638,7 @@ int main(int argc, char* const argv[]) {
LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << cassandraSettings.address;
LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port;
LOG(info) << " TTL: " << cassandraSettings.ttl;
LOG(info) << " NumThreadsIO: " << cassandraSettings.numThreadsIo;
LOG(info) << " QueueSizeIO: " << cassandraSettings.queueSizeIo;
......@@ -700,7 +678,7 @@ int main(int argc, char* const argv[]) {
/*
* Start the MQTT Message Server.
*/
SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
ms.setMessageCallback(mqttCallback);
ms.start();
......
......@@ -7,7 +7,9 @@
bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_type &global) {
// ----- READING ADDITIONAL GLOBAL SETTINGS -----
if (boost::iequals(global.first, "mqttListenAddress")) {
mqttListenAddress = global.second.data();
mqttListenHost = parseNetworkHost(global.second.data());
mqttListenPort = parseNetworkPort(global.second.data());
if(mqttListenPort=="") mqttListenPort = string(LISTENPORT);
} else if (boost::iequals(global.first, "cleaningInterval")) {
cleaningInterval = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageThreads")) {
......@@ -25,7 +27,9 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
if(cfg.find("cassandra") != cfg.not_found()) {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("cassandra")) {
if (boost::iequals(global.first, "address")) {
cassandraSettings.address = global.second.data();
cassandraSettings.host = parseNetworkHost(global.second.data());
cassandraSettings.port = parseNetworkPort(global.second.data());
if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
} else if (boost::iequals(global.first, "username")) {
cassandraSettings.username = global.second.data();
} else if (boost::iequals(global.first, "password")) {
......
......@@ -24,7 +24,8 @@ using namespace std;
class cassandra_t {
public:
cassandra_t() {}
std::string address = string(CASSANDRAHOST) + ":" + string(CASSANDRAPORT);
std::string host = string(CASSANDRAHOST);
std::string port = string(CASSANDRAPORT);
std::string username = "";
std::string password = "";
uint32_t ttl = 0;
......@@ -58,7 +59,8 @@ public:
virtual ~Configuration() {}
// Additional configuration parameters to be parsed within the global block
std::string mqttListenAddress = string(LISTENHOST) + ":" + string(LISTENPORT);
std::string mqttListenHost = string(LISTENHOST);
std::string mqttListenPort = string(LISTENPORT);
uint64_t cleaningInterval = 86400;
uint64_t messageThreads = 128;
uint64_t messageSlots = 16;
......
......@@ -40,9 +40,43 @@ public:
std::string hierarchy = "";
};
// Global bool conversion method
static bool to_bool(const std::string& s) { return s=="true" || s=="on"; }
/**
* @brief Parses a host:port string and returns the host
*
* @param str The input string to be parsed
* @return The host-related part of the string
*/
static std::string parseNetworkHost(std::string str) {
size_t pos = str.find(":");
if (pos != std::string::npos)
str.erase(pos);
return str;
}
/**
* @brief Parses a host:port string and returns the port
*
* @param str The input string to be parsed
* @return The port-related part of the string
*/
static std::string parseNetworkPort(std::string str) {
size_t pos = str.find(":");
if (pos != std::string::npos)
str = str.substr(pos+1);
else
str = "";
return str;
}
/**
* @brief Converts the input string to a boolean
*
* @param s The input string
* @return The corresponding boolean value
*/
static bool to_bool(const std::string& s) {
return s=="true" || s=="on";
}
/**
* This class contains the logic to parse and store configurations in the form of BOOST INFO trees. The
......@@ -73,7 +107,7 @@ public:
* @return True if successful, false otherwise
*/
bool readConfig();
// Global configuration members directly accessible from the class
bool validateConfig = false;
bool daemonize = false;
......
......@@ -91,4 +91,4 @@ bool GlobalConfiguration::readConfig() {
readAdditionalBlocks(cfg);
return true;
}
\ No newline at end of file
}
......@@ -27,12 +27,8 @@ Configuration::~Configuration() {
bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_type &global) {
// ----- READING ADDITIONAL GLOBAL SETTINGS -----
if (boost::iequals(global.first, "mqttBroker")) {
brokerHost = global.second.data();
size_t pos = brokerHost.find(":");
if (pos != string::npos) {
brokerPort = stoi(brokerHost.substr(pos+1));
brokerHost.erase(pos);
}
brokerHost = parseNetworkHost(global.second.data());
brokerPort = parseNetworkPort(global.second.data())=="" ? BROKERPORT : stoi(parseNetworkPort(global.second.data()));
} else if (boost::iequals(global.first, "qosLevel")) {
qosLevel = stoi(global.second.data());
if(qosLevel>2 || qosLevel<0)
......
......@@ -15,6 +15,9 @@
#include "includes/PluginDefinitions.h"
#include "mqttchecker.h"
#define BROKERPORT 1883
#define BROKERHOST "127.0.0.1"
/**
* Class responsible of reading the global configuration as well as loading and invoking required dynamic libraries.
*/
......@@ -58,8 +61,8 @@ public:
int qosLevel = 1;
unsigned int maxInflightMsgNum = 20;
unsigned int maxQueuedMsgNum = 0;
int brokerPort = 1883;
std::string brokerHost = "";
int brokerPort = BROKERPORT;
std::string brokerHost = BROKERHOST;
int maxMsgNum = 0;
protected:
......
......@@ -128,14 +128,13 @@ void printSyntax()
*/
_configuration = new Configuration("", "dcdbpusher.conf");
cout << "Usage:" << endl;
cout << " dcdbpusher [-d] [-x] [-a<string>] [-b<host>] [-p<port>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher [-d] [-x] [-a<string>] [-b<host>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher -h" << endl;
cout << endl;
cout << "Options:" << endl;
cout << " -a <string> Auto-publish pattern [default: none]" << endl;
cout << " -b <host> MQTT broker [default: none]" << endl;
cout << " -p <port> MQTT broker port [default: " << _configuration->brokerPort << "]" << endl;
cout << " -b <host> MQTT broker [default: " << _configuration->brokerHost << ":" << _configuration->brokerPort << "]" << endl;
cout << " -m <string> MQTT topic prefix [default: none]" << endl;
cout << " -w <path> Writable temp dir [default: .]" << endl;
cout << " -v <level> Set verbosity of output [default: " << _configuration->logLevelCmd << "]" << endl
......@@ -213,10 +212,8 @@ int main(int argc, char** argv) {
pluginSettings.sensorPattern = optarg;
break;
case 'b':
globalSettings.brokerHost = optarg;
break;
case 'p':
globalSettings.brokerPort = atoi(optarg);
globalSettings.brokerHost = parseNetworkHost(optarg);
globalSettings.brokerPort = parseNetworkPort(optarg)=="" ? BROKERPORT : stoi(parseNetworkPort(optarg));
break;
case 'm':
pluginSettings.mqttPrefix = optarg;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment