Commit 3f7206ed authored by Micha Mueller's avatar Micha Mueller
Browse files

Merge branch 'master' into slurmJobdata

parents f066148f 0abfe1f6
include ../config.mk include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/cxxopts/src -DASIO_HEADER_ONLY -DBOOST_TEST_DYN_LINK CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
OBJS = collectagent.o \ OBJS = collectagent.o \
configuration.o \
logging.o \
sensorcache.o \ sensorcache.o \
simplemqttserver.o \ simplemqttserver.o \
simplemqttserverthread.o \ simplemqttserverthread.o \
simplemqttservermessage.o simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_log_setup -lboost_log -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
TARGET = collectagent TARGET = collectagent
.PHONY : clean install .PHONY : clean install
...@@ -25,5 +27,8 @@ clean: ...@@ -25,5 +27,8 @@ clean:
rm -f $(TARGET) rm -f $(TARGET)
rm -f $(OBJS) rm -f $(OBJS)
install: $(TARGET) install_conf: config/collectagent.conf
install -m 644 $^ $(DCDBDEPLOYPATH)/etc/
install: $(TARGET) install_conf
install $(TARGET) $(DCDBDEPLOYPATH)/bin/ install $(TARGET) $(DCDBDEPLOYPATH)/bin/
...@@ -38,7 +38,9 @@ ...@@ -38,7 +38,9 @@
#include <dcdb/connection.h> #include <dcdb/connection.h>
#include <dcdb/sensordatastore.h> #include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h> #include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include "configuration.h"
#include "simplemqttserver.h" #include "simplemqttserver.h"
#include "messaging.h" #include "messaging.h"
#include "abrt.h" #include "abrt.h"
...@@ -51,23 +53,17 @@ ...@@ -51,23 +53,17 @@
using namespace std; using namespace std;
#define LISTENHOST "localhost"
#define LISTENPORT "1883"
#define CASSANDRAHOST "127.0.0.1"
#define CASSANDRAPORT "9042"
#define RESTAPIHOST "0.0.0.0"
#define RESTAPIPORT "8080"
#define TTL "0"
int keepRunning; int keepRunning;
bool statistics; bool statistics;
uint64_t msgCtr; uint64_t msgCtr;
uint64_t pmsgCtr; uint64_t pmsgCtr;
uint64_t readingCtr;
DCDB::Connection* dcdbConn; DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore; DCDB::SensorDataStore *mySensorDataStore;
DCDB::SensorConfig *mySensorConfig; DCDB::SensorConfig *mySensorConfig;
DCDB::SensorCache mySensorCache; DCDB::SensorCache mySensorCache;
DCDB::SCError err; DCDB::SCError err;
DCDBLog::logger_t lg;
/* Normal termination (SIGINT, CTRL+C) */ /* Normal termination (SIGINT, CTRL+C) */
void sigHandler(int sig) void sigHandler(int sig)
...@@ -96,7 +92,7 @@ struct httpHandler_t { ...@@ -96,7 +92,7 @@ struct httpHandler_t {
boost::network::uri::query_map(uri, queries); boost::network::uri::query_map(uri, queries);
int avg = atoi(queries.find("avg")->second.c_str()); int avg = atoi(queries.find("avg")->second.c_str());
uint64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg); int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
data << val << "\n"; data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl; //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
...@@ -143,9 +139,9 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -143,9 +139,9 @@ int mqttCallback(SimpleMQTTMessage *msg)
// We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
// sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
// We use strncmp as it is the most efficient way to do it // We use strncmp as it is the most efficient way to do it
if (strlen(topic) > DCDB_MAP_LEN && strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) { if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
if ((len = msg->getPayloadLength()) == 0) { if ((len = msg->getPayloadLength()) == 0) {
cout << "Empty topic-to-name mapping message received\n"; LOG(error) << "Empty topic-to-name mapping message received";
return 1; return 1;
} }
...@@ -155,13 +151,13 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -155,13 +151,13 @@ int mqttCallback(SimpleMQTTMessage *msg)
// PublishSensor does most of the error checking for us // PublishSensor does most of the error checking for us
switch (err) { switch (err) {
case DCDB::SC_INVALIDPATTERN: case DCDB::SC_INVALIDPATTERN:
std::cout << "Invalid sensor topic : " << msg->getTopic() << std::endl; LOG(error) << "Invalid sensor topic : " << msg->getTopic();
return 1; return 1;
case DCDB::SC_INVALIDPUBLICNAME: case DCDB::SC_INVALIDPUBLICNAME:
std::cout << "Invalid sensor public name: " << sensorName << std::endl; LOG(error) << "Invalid sensor public name: " << sensorName;
return 1; return 1;
case DCDB::SC_INVALIDSESSION: case DCDB::SC_INVALIDSESSION:
std::cout << "Cannot reach sensor data store." << std::endl; LOG(error) << "Cannot reach sensor data store.";
return 1; return 1;
default: default:
break; break;
...@@ -173,7 +169,7 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -173,7 +169,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
//In the 64 bit message case, the collect agent provides a timestamp //In the 64 bit message case, the collect agent provides a timestamp
if (len == sizeof(uint64_t)) { if (len == sizeof(uint64_t)) {
payload = &buf; payload = &buf;
payload->value = *((uint64_t *) msg->getPayload()); payload->value = *((int64_t *) msg->getPayload());
payload->timestamp = Messaging::calculateTimestamp(); payload->timestamp = Messaging::calculateTimestamp();
len = sizeof(uint64_t) * 2; len = sizeof(uint64_t) * 2;
} }
...@@ -183,7 +179,7 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -183,7 +179,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
} }
//...otherwise this message is malformed -> ignore... //...otherwise this message is malformed -> ignore...
else { else {
cout << "Message malformed\n"; LOG(error) << "Message malformed";
return 1; return 1;
} }
...@@ -207,11 +203,15 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -207,11 +203,15 @@ int mqttCallback(SimpleMQTTMessage *msg)
} }
cout << endl; cout << endl;
#endif #endif
std::list<DCDB::SensorDataStoreReading> readings;
for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) { for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
mySensorDataStore->insert(&sid, payload[i].timestamp, payload[i].value); DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
readings.push_back(r);
mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value); mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
} }
mySensorDataStore->insertBatch(readings);
readingCtr+= readings.size();
//mySensorCache.dump(); //mySensorCache.dump();
} }
#if 1 #if 1
...@@ -232,72 +232,99 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -232,72 +232,99 @@ int mqttCallback(SimpleMQTTMessage *msg)
* Print usage information * Print usage information
*/ */
void usage() { void usage() {
globalCA_t& defaults = Configuration("").getGlobal();
/* /*
1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8
012345678901234567890123456789012345678901234567890123456789012345678901234567890 012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/ */
cout << "Usage:" << endl; cout << "Usage:" << endl;
cout << " collectagent [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-d] [-s]" << endl; cout << " collectagent [-m<host>] [-r<host>] [-c<host>] [-C<interval>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] [-d] [-s] <path/to/configfiles/>" << endl;
cout << " collectagent -h" << endl; cout << " collectagent -h" << endl;
cout << endl; cout << endl;
cout << "Options:" << endl; cout << "Options:" << endl;
cout << " -m<host> MQTT listen address [default: " << LISTENHOST << ":" << LISTENPORT << "]" << endl; cout << " -m<host> MQTT listen address [default: " << defaults.mqttListenAddress << "]" << endl;
cout << " -r<host> REST API listen address [default: " << RESTAPIHOST << ":" << RESTAPIPORT << "]" << endl; cout << " -r<host> REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
cout << " -c<host> Cassandra host [default: " << CASSANDRAHOST << ":" << CASSANDRAPORT << "]" << endl; cout << " -c<host> Cassandra host [default: " << defaults.cassandraSettings.address << "]" << endl;
cout << " -C<interval> Cache interval in [s] [default: " << defaults.cacheInterval << "]" << endl;
cout << " -u<username> Cassandra username [default: none]" << endl; cout << " -u<username> Cassandra username [default: none]" << endl;
cout << " -p<password> Cassandra password [default: none]" << endl; cout << " -p<password> Cassandra password [default: none]" << endl;
cout << " -t<ttl> Cassandra insert TTL [default: " << TTL << "]" << endl; cout << " -t<ttl> Cassandra insert TTL [default: " << defaults.cassandraSettings.ttl << "]" << endl;
cout << " -v<level> Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
<< " Can be a number between 5 (all) and 0 (fatal)." << endl;
cout << endl; cout << endl;
cout << " -d Daemonize" << endl; cout << " -d Daemonize" << endl;
cout << " -s Print message statistics" << endl; cout << " -s Print message stats" <<endl;
cout << " -h This help page" << endl; cout << " -h This help page" << endl;
cout << endl; cout << endl;
} }
int main(int argc, char* const argv[]) { int main(int argc, char* const argv[]) {
cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
try{ try{
/*
* Catch SIGINT signals to allow for proper server shutdowns.
*/
signal(SIGINT, sigHandler);
/* // Checking if path to config is supplied
* Catch critical signals to allow for backtraces if (argc <= 1) {
*/ cout << "Please specify a path to the config-directory" << endl << endl;
signal(SIGABRT, abrtHandler); usage();
signal(SIGSEGV, abrtHandler); exit(EXIT_FAILURE);
signal(SIGTERM, abrtHandler); }
// Defining options
const char* opts = "m:r:c:C:u:p:t:v:dDsh";
// Same mechanism as in DCDBPusher - checking if help string is requested before loading config
char ret;
while ((ret = getopt(argc, argv, opts)) != -1) {
switch (ret)
{
case 'h':
usage();
exit(EXIT_FAILURE);
break;
default:
//do nothing (other options are read later on)
break;
}
}
DCDBLog::initLogging();
auto cmdSink = DCDBLog::setupCmdLogger();
Configuration config(argv[argc - 1]);
if( !config.readGlobal() ) {
LOG(fatal) << "Failed to read global configuration!";
exit(EXIT_FAILURE);
}
globalCA_t& settings = config.getGlobal();
/* Parse command line */ /* Parse command line */
int ret; std::string listenHost, cassandraHost, restApiHost;
std::string listenHost, cassandraHost, restApiHost, ttl;
std::string cassandraUser, cassandraPassword;
std::string listenPort, cassandraPort, restApiPort; std::string listenPort, cassandraPort, restApiPort;
listenHost = LISTENHOST; optind = 1;
cassandraHost = CASSANDRAHOST; while ((ret=getopt(argc, argv, opts))!=-1) {
restApiHost = RESTAPIHOST;
ttl = "0";
statistics = false;
while ((ret=getopt(argc, argv, "m:r:c:u:p:t:dDsh"))!=-1) {
switch(ret) { switch(ret) {
case 'm': case 'm':
listenHost = optarg; settings.mqttListenAddress = optarg;
break; break;
case 'r': case 'r':
restApiHost = optarg; settings.restListenAddress = optarg;
break; break;
case 'c': case 'c':
cassandraHost = optarg; settings.cassandraSettings.address = optarg;
break;
case 'C':
settings.cacheInterval = stoul(optarg);
break; break;
case 'u': case 'u':
cassandraUser = optarg; settings.cassandraSettings.username = optarg;
break; break;
case 'p': { case 'p': {
cassandraPassword = optarg; settings.cassandraSettings.password = optarg;
// What does this do? Mask the password?
size_t pwdLen = strlen(optarg); size_t pwdLen = strlen(optarg);
memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen); memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
if (pwdLen > 3) { if (pwdLen > 3) {
...@@ -306,14 +333,17 @@ int main(int argc, char* const argv[]) { ...@@ -306,14 +333,17 @@ int main(int argc, char* const argv[]) {
break; break;
} }
case 't': case 't':
ttl = optarg; settings.cassandraSettings.ttl = stoul(optarg);
break;
case 'v':
settings.logLevelCmd = DCDBLog::translateLogLevel(stoi(optarg));
break; break;
case 'd': case 'd':
case 'D': case 'D':
dcdbdaemon(); settings.daemonize = 1;
break; break;
case 's': case 's':
statistics = true; settings.statistics = 1;
break; break;
case 'h': case 'h':
default: default:
...@@ -322,9 +352,31 @@ int main(int argc, char* const argv[]) { ...@@ -322,9 +352,31 @@ int main(int argc, char* const argv[]) {
} }
} }
auto fileSink = DCDBLog::setupFileLogger(settings.tempDir, std::string("collectagent"));
//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);
/*
* Catch SIGINT signals to allow for proper server shutdowns.
*/
signal(SIGINT, sigHandler);
/*
* Catch critical signals to allow for backtraces
*/
signal(SIGABRT, abrtHandler);
signal(SIGSEGV, abrtHandler);
signal(SIGTERM, abrtHandler);
// Daemonizing the collectagent
if(settings.daemonize)
dcdbdaemon();
/* /*
* Parse hostnames for port specifications * Parse hostnames for port specifications
*/ */
listenHost = string(settings.mqttListenAddress);
size_t pos = listenHost.find(":"); size_t pos = listenHost.find(":");
if (pos != string::npos) { if (pos != string::npos) {
listenPort = listenHost.substr(pos+1); listenPort = listenHost.substr(pos+1);
...@@ -332,6 +384,8 @@ int main(int argc, char* const argv[]) { ...@@ -332,6 +384,8 @@ int main(int argc, char* const argv[]) {
} else { } else {
listenPort = LISTENPORT; listenPort = LISTENPORT;
} }
cassandraHost = string(settings.cassandraSettings.address);
pos = cassandraHost.find(":"); pos = cassandraHost.find(":");
if (pos != string::npos) { if (pos != string::npos) {
cassandraPort = cassandraHost.substr(pos+1); cassandraPort = cassandraHost.substr(pos+1);
...@@ -339,6 +393,8 @@ int main(int argc, char* const argv[]) { ...@@ -339,6 +393,8 @@ int main(int argc, char* const argv[]) {
} else { } else {
cassandraPort = CASSANDRAPORT; cassandraPort = CASSANDRAPORT;
} }
restApiHost = string(settings.restListenAddress);
pos = restApiHost.find(":"); pos = restApiHost.find(":");
if (pos != string::npos) { if (pos != string::npos) {
restApiPort = restApiHost.substr(pos+1); restApiPort = restApiHost.substr(pos+1);
...@@ -347,14 +403,14 @@ int main(int argc, char* const argv[]) { ...@@ -347,14 +403,14 @@ int main(int argc, char* const argv[]) {
restApiPort = RESTAPIPORT; restApiPort = RESTAPIPORT;
} }
// Setting the size of the sensor cache
mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
/* //Allocate and initialize connection to Cassandra.
* Allocate and initialize connection to Cassandra. dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
*/
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
if (!dcdbConn->connect()) { if (!dcdbConn->connect()) {
std::cout << "Cannot connect to Cassandra!" << std::endl; LOG(fatal) << "Cannot connect to Cassandra!";
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
...@@ -372,25 +428,18 @@ int main(int argc, char* const argv[]) { ...@@ -372,25 +428,18 @@ int main(int argc, char* const argv[]) {
/* /*
* Set TTL for data store inserts if TTL > 0. * Set TTL for data store inserts if TTL > 0.
*/ */
uint64_t ttlInt; if (settings.cassandraSettings.ttl > 0)
std::istringstream ttlParser(ttl); mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
if (!(ttlParser >> ttlInt)) {
std::cout << "Invalid TTL!" << std::endl;
exit(EXIT_FAILURE);
}
if (ttlInt) {
mySensorDataStore->setTTL(ttlInt);
}
/* /*
* Start the MQTT Message Server. * Start the MQTT Message Server.
*/ */
SimpleMQTTServer ms(listenHost, listenPort); SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
ms.setMessageCallback(mqttCallback); ms.setMessageCallback(mqttCallback);
ms.start(); ms.start();
cout << "MQTT Server running..." << std::endl; LOG(info) << "MQTT Server running...";
/* /*
* Start the HTTP Server for the REST API * Start the HTTP Server for the REST API
...@@ -404,7 +453,7 @@ int main(int argc, char* const argv[]) { ...@@ -404,7 +453,7 @@ int main(int argc, char* const argv[]) {
httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort)); httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
httpThread = std::thread([&httpServer] { httpServer.run(); }); httpThread = std::thread([&httpServer] { httpServer.run(); });
cout << "HTTP Server running..." << std::endl; LOG(info) << "HTTP Server running...";
/* /*
* Run (hopefully) forever... * Run (hopefully) forever...
...@@ -412,9 +461,11 @@ int main(int argc, char* const argv[]) { ...@@ -412,9 +461,11 @@ int main(int argc, char* const argv[]) {
keepRunning = 1; keepRunning = 1;
timeval start, end; timeval start, end;
double elapsed; double elapsed;
msgCtr = 0;
cout << "Collect Agent running..." << std::endl; pmsgCtr = 0;
readingCtr = 0;
LOG(info) << "Collect Agent running...";
while(keepRunning) { while(keepRunning) {
gettimeofday(&start, NULL); gettimeofday(&start, NULL);
sleep(60); sleep(60);
...@@ -423,28 +474,29 @@ int main(int argc, char* const argv[]) { ...@@ -423,28 +474,29 @@ int main(int argc, char* const argv[]) {
elapsed = (end.tv_sec - start.tv_sec) * 1000.0; elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
elapsed += (end.tv_usec - start.tv_usec) / 1000.0; elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0; float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
if (statistics) { if (settings.statistics) {
cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n"; LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
} }
msgCtr = 0; msgCtr = 0;
pmsgCtr = 0; pmsgCtr = 0;
readingCtr = 0;
} }
cout << "Stopping...\n"; LOG(info) << "Stopping...";
ms.stop(); ms.stop();
cout << "MQTT Server stopped..." << std::endl; LOG(info) << "MQTT Server stopped...";
httpServer.stop(); httpServer.stop();
httpThread.join(); httpThread.join();
cout << "HTTP Server stopped..." << std::endl; LOG(info) << "HTTP Server stopped...";
delete mySensorDataStore; delete mySensorDataStore;
delete mySensorConfig; delete mySensorConfig;
dcdbConn->disconnect(); dcdbConn->disconnect();
delete dcdbConn; delete dcdbConn;
cout << "Collect Agent closed. Bye bye..." << std::endl; LOG(info) << "Collect Agent closed. Bye bye...";
} }
catch (const exception& e) { catch (const exception& e) {
cout << "Exception: " << e.what() << "\n"; LOG(fatal) << "Exception: " << e.what();
abrt(EXIT_FAILURE, INTERR); abrt(EXIT_FAILURE, INTERR);
} }
......
global {
mqttListenAddress 127.0.0.1:1883