//================================================================================ // Name : collectagent.cpp // Author : Axel Auweter // Copyright : Leibniz Supercomputing Centre // Description : Main code of the CollectAgent //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2016 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //================================================================================ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "version.h" #include "configuration.h" #include "simplemqttserver.h" #include "messaging.h" #include "abrt.h" #include "dcdbdaemon.h" #include "sensorcache.h" #include "analyticscontroller.h" #include "../analytics/includes/QueryEngine.h" #define __STDC_FORMAT_MACROS #include using namespace std; int keepRunning; bool statistics; uint64_t msgCtr; uint64_t pmsgCtr; uint64_t readingCtr; SensorCache mySensorCache; AnalyticsController* analyticsController; DCDB::Connection* dcdbConn; DCDB::SensorDataStore *mySensorDataStore; DCDB::SensorConfig *mySensorConfig; DCDB::SCError err; QueryEngine& queryEngine = QueryEngine::getInstance(); logger_t lg; std::vector* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector* buffer, const bool rel) { std::string topic; // Getting the topic of the queried sensor from the Navigator try { topic = queryEngine.getNavigator()->getNodeTopic(name); } catch(const std::domain_error& e) { return NULL; } std::vector *output = NULL; DCDB::SensorId sid; // Creating a SID to perform the query sid.mqttTopicConvert(topic); if(mySensorCache.getSensorMap().count(sid) > 0) { CacheEntry &entry = mySensorCache.getSensorMap()[sid]; output = entry.getView(startTs, endTs, buffer, rel); if (output->size() > 0) return output; } // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra try { DCDB::PublicSensor publicSensor; publicSensor.name = name; publicSensor.pattern = topic; std::list results; DCDB::Sensor sensor(dcdbConn, publicSensor); uint64_t now = getTimestamp(); //Converting relative timestamps to absolute uint64_t startTsInt = rel ? now - startTs : startTs; uint64_t endTsInt = rel ? now - endTs : endTs; DCDB::TimeStamp start(startTsInt), end(endTsInt); sensor.query(results, start, end, DCDB::AGGREGATE_NONE); // Dealing with allocations that may have been performed by the cache search if(output) output->clear(); else if(buffer) { buffer->clear(); output = buffer; } else output = new std::vector(); reading_t reading; //TODO: fix when result contains only partial time range of the query for (const auto &r : results) { reading.value = r.value; reading.timestamp = r.timeStamp.getRaw(); output->push_back(reading); } } catch(const std::exception& e) { if(!buffer && output) delete output; return NULL; } return output; } /* Normal termination (SIGINT, CTRL+C) */ void sigHandler(int sig) { boost::log::sources::severity_logger lg; if( sig == SIGINT ) LOG(fatal) << "Received SIGINT"; else if( sig == SIGTERM ) LOG(fatal) << "Received SIGTERM"; keepRunning = 0; } /* Crash */ void abrtHandler(int sig) { abrt(EXIT_FAILURE, SIGNAL); } //TODO: trim common code with dcdbpusher struct httpHandler_t; typedef boost::network::http::server httpServer_t; struct httpHandler_t { void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) { httpServer_t::string_type ip = source(request); static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } }; boost::network::uri::uri uri("https://" + request.destination); httpServer_t::string_type method = request.method; httpServer_t::string_type path = uri.path(); httpServer_t::string_type query = uri.query(); std::string response = ""; std::ostringstream data; std::string auth_value = ""; std::vector pathStrs; std::vector> queries; //first check if request is supported at all if (method != "GET" && method != "PUT") { LOGH(warning) << "Unsupported " << method << " request was made"; connection->set_status(httpServer_t::connection::not_supported); goto error; } LOGH(info) << method << " request of " << request.destination << " was made"; //do some string processing //split path into its hierarchical parts if (path.size() >= 2) { if (path[0] == '/') path.erase(0,1); if (path[path.size() -1] == '/') path.erase(path.size() -1); boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off); } //split query part into the individual queries (key-value pairs) { //do not remove the enclosing brackets //need to encapsulate this code block to keep queryStrs local std::vector queryStrs; boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on); for(auto& key : queryStrs) { size_t pos = key.find("="); if (pos != std::string::npos) { std::string value; value = key.substr(pos+1); key.erase(pos); queries.push_back(std::make_pair(key, value)); } } } //finished string processing //bool json = false; //for (auto& p : queries) { // authkey is required in every case //if (p.first == "authkey") { // auth_value = p.second; //} else //if (p.first == "json") // if (stoi(p.second) > 0) // json = true; //} if (pathStrs.size() < 1) { LOGH(warning) << "Received malformed request: No first path part"; connection->set_status(httpServer_t::connection::bad_request); goto error; } //select code depending on request if (method == "GET") { if (pathStrs[0] == "help") { response = "collectagent RESTful API cheatsheet:\n" " -GET: /help This help message\n" " /analytics/help\n" " /[sensor]/avg?interval=[timeInSec]\n" " Average of last sensor readings from the last\n" " [interval] seconds or of all cached readings\n" " if no interval is given\n" "\n"; //"All resources have to be prepended by host:port and need at\n" //"least the query ?authkey=[token] at the end. Multiple queries\n" //"need to be separated by semicolons(';')\n"; response += analyticsController->getManager()->restCheatSheet; } else if (pathStrs[0] == "analytics") { try { restResponse_t reply = analyticsController->REST(pathStrs, queries, method); data << reply.data; response = reply.response; } catch(const std::invalid_argument &e) { LOGH(warning) << e.what(); connection->set_status(httpServer_t::connection::bad_request); goto error; } catch(const std::domain_error &e) { response = e.what(); connection->set_status(httpServer_t::connection::not_found); } catch(const std::exception &e) { LOGH(warning) << e.what(); connection->set_status(httpServer_t::connection::internal_server_error); goto error; } } else { if (pathStrs.size() < 2) { LOGH(warning) << "Received malformed request: No second path part"; connection->set_status(httpServer_t::connection::bad_request); goto error; } if (pathStrs[1] != "avg") { LOGH(warning) << "Unknown action " << pathStrs[1] << " requested"; connection->set_status(httpServer_t::connection::not_supported); goto error; } uint64_t time = 0; for (auto &p : queries) if (p.first == "interval") time = std::stoul(p.second); //try getting the latest value try { //TODO: switch from SID input to sensor name input int64_t val = mySensorCache.getSensor(pathStrs[0], (uint64_t) time * 1000000000); connection->set_status(httpServer_t::connection::ok); response = "collectagent::" + pathStrs[0] + " Average of last " + std::to_string(time) + " seconds is " + std::to_string(val); //data << val << "\n"; //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl; } catch (const std::invalid_argument &e) { connection->set_status(httpServer_t::connection::not_found); response = "Error: Sensor id not found.\n"; } catch (const std::out_of_range &e) { connection->set_status(httpServer_t::connection::no_content); response = "Error: Sensor unavailable.\n"; } catch (const std::exception &e) { connection->set_status(httpServer_t::connection::internal_server_error); LOGH(warning) << "Internal server error.\n"; goto error; } } } else if(method == "PUT") { if( pathStrs.back() == "reload" ) analyticsController->halt(true); try { restResponse_t reply = analyticsController->REST(pathStrs, queries, method); data << reply.data; response = reply.response; } catch(const std::invalid_argument &e) { LOGH(warning) << e.what(); connection->set_status(httpServer_t::connection::bad_request); goto error; } catch(const std::domain_error &e) { response = e.what(); connection->set_status(httpServer_t::connection::not_found); } catch(const std::exception &e) { response = e.what(); connection->set_status(httpServer_t::connection::internal_server_error); } // Continue MQTTPusher when a reload was performed if( pathStrs.back() == "reload" ) analyticsController->resume(); } LOGH(info) << "Responding: " << response; data << response << std::endl; //Error management section error: connection->set_headers(boost::make_iterator_range(headers, headers + 2)); connection->write(data.str()); } }; int mqttCallback(SimpleMQTTMessage *msg) { /* * Increment the msgCtr/vmsgCtr for statistics. */ msgCtr++; if (msg->isPublish()) pmsgCtr++; uint64_t len; /* * Decode the message and put into the database. */ if (msg->isPublish()) { const char *topic = msg->getTopic().c_str(); // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix // We use strncmp as it is the most efficient way to do it if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) { if ((len = msg->getPayloadLength()) == 0) { LOG(error) << "Empty topic-to-name mapping message received"; return 1; } string sensorName((char *) msg->getPayload(), len); err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN); // PublishSensor does most of the error checking for us switch (err) { case DCDB::SC_INVALIDPATTERN: LOG(error) << "Invalid sensor topic : " << msg->getTopic(); return 1; case DCDB::SC_INVALIDPUBLICNAME: LOG(error) << "Invalid sensor public name: " << sensorName; return 1; case DCDB::SC_INVALIDSESSION: LOG(error) << "Cannot reach sensor data store."; return 1; default: break; } } else { mqttPayload buf, *payload; len = msg->getPayloadLength(); //In the 64 bit message case, the collect agent provides a timestamp if (len == sizeof(uint64_t)) { payload = &buf; payload->value = *((int64_t *) msg->getPayload()); payload->timestamp = Messaging::calculateTimestamp(); len = sizeof(uint64_t) * 2; } //...otherwise it just retrieves it from the MQTT message payload. else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) { payload = (mqttPayload *) msg->getPayload(); } //...otherwise this message is malformed -> ignore... else { LOG(error) << "Message malformed"; return 1; } /* * Check if we can decode the message topic * into a valid SensorId. If successful, store * the record in the database. */ DCDB::SensorId sid; if (sid.mqttTopicConvert(msg->getTopic())) { #if 0 cout << "Topic decode successful:" << endl << " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl << " DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl << " device_id: " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl << " sensor_number: " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec; cout << "Payload (" << len/sizeof(mqttPayload) << " messages):"<< endl; for (uint64_t i=0; igetTopic() << "\n"; return 1; } #endif } } return 0; } /* * Print usage information */ void usage() { Configuration config("", "collectagent.conf"); /* 1 2 3 4 5 6 7 8 012345678901234567890123456789012345678901234567890123456789012345678901234567890 */ cout << "Usage:" << endl; cout << " collectagent [-d] [-s] [-x] [-a] [-m] [-c] [-u] [-p] [-t] [-v] " << endl; cout << " collectagent -h" << endl; cout << endl; cout << "Options:" << endl; cout << " -a Auto-publish pattern [default: none]" << endl; cout << " -m MQTT listen address [default: " << config.mqttListenAddress << "]" << endl; cout << " -c Cassandra host [default: " << config.cassandraSettings.address << "]" << endl; cout << " -u Cassandra username [default: none]" << endl; cout << " -p Cassandra password [default: none]" << endl; cout << " -t Cassandra insert TTL [default: " << config.cassandraSettings.ttl << "]" << endl; cout << " -v Set verbosity of output [default: " << config.logLevelCmd << "]" << endl << " Can be a number between 5 (all) and 0 (fatal)." << endl; cout << endl; cout << " -d Daemonize" << endl; cout << " -s Print message stats" <= 3) ? 3 : pwdLen); if (pwdLen > 3) { memset(optarg+3, 0, pwdLen-3); } break; } case 't': cassandraSettings.ttl = stoul(optarg); break; case 'v': settings.logLevelCmd = translateLogLevel(stoi(optarg)); break; case 'd': case 'D': settings.daemonize = 1; break; case 's': settings.statistics = 1; break; case 'x': settings.validateConfig = true; break; case 'h': default: usage(); exit(EXIT_FAILURE); } } auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent")); //severity level may be overwritten (per option or config-file) --> set it according to globalSettings fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile); cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd); /* * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns. */ signal(SIGINT, sigHandler); signal(SIGTERM, sigHandler); /* * Catch critical signals to allow for backtraces */ signal(SIGABRT, abrtHandler); signal(SIGSEGV, abrtHandler); // Daemonizing the collectagent if(settings.daemonize) dcdbdaemon(); /* * Parse hostnames for port specifications */ listenHost = string(settings.mqttListenAddress); size_t pos = listenHost.find(":"); if (pos != string::npos) { listenPort = listenHost.substr(pos+1); listenHost.erase(pos); } else { listenPort = LISTENPORT; } cassandraHost = string(cassandraSettings.address); pos = cassandraHost.find(":"); if (pos != string::npos) { cassandraPort = cassandraHost.substr(pos+1); cassandraHost.erase(pos); } else { cassandraPort = CASSANDRAPORT; } // Setting the size of the sensor cache // Conversion from milliseconds to nanoseconds mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000); //Allocate and initialize connection to Cassandra. dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraSettings.username, cassandraSettings.password); dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo); dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo); uint32_t params[3] = {cassandraSettings.coreConnPerHost, cassandraSettings.maxConnPerHost, cassandraSettings.maxConcRequests}; dcdbConn->setBackendParams(params); if (!dcdbConn->connect()) { LOG(fatal) << "Cannot connect to Cassandra!"; exit(EXIT_FAILURE); } /* * Legacy behavior: Initialize the DCDB schema in Cassandra. */ dcdbConn->initSchema(); /* * Allocate the SensorDataStore. */ mySensorDataStore = new DCDB::SensorDataStore(dcdbConn); mySensorConfig = new DCDB::SensorConfig(dcdbConn); /* * Set TTL for data store inserts if TTL > 0. */ if (cassandraSettings.ttl > 0) mySensorDataStore->setTTL(cassandraSettings.ttl); mySensorDataStore->setDebugLog(cassandraSettings.debugLog); analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore); analyticsController->setCache(&mySensorCache); if(!analyticsController->initialize(settings, argv[argc - 1])) return EXIT_FAILURE; queryEngine.setQueryCallback(sensorQueryCallback); LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug; LOG_VAR(vLogLevel) << "----- Configuration -----"; //print global settings in either case LOG(info) << "Global Settings:"; LOG(info) << " MQTT-listenAddress: " << settings.mqttListenAddress; LOG(info) << " CacheInterval: " << int(pluginSettings.cacheInterval/1000) << " [s]"; LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]"; LOG(info) << " MessageThreads: " << settings.messageThreads; LOG(info) << " MessageSlots: " << settings.messageSlots; LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled"); LOG(info) << " Statistics: " << (settings.statistics ? "Enabled" : "Disabled"); LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix; LOG(info) << " Write-Dir: " << pluginSettings.tempdir; LOG(info) << (settings.validateConfig ? " Only validating config files." : " ValidateConfig: Disabled"); LOG(info) << "Analytics Settings:"; LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none"); LOG(info) << "Cassandra Driver Settings:"; LOG(info) << " Address: " << cassandraSettings.address; LOG(info) << " TTL: " << cassandraSettings.ttl; LOG(info) << " NumThreadsIO: " << cassandraSettings.numThreadsIo; LOG(info) << " QueueSizeIO: " << cassandraSettings.queueSizeIo; LOG(info) << " CoreConnPerHost: " << cassandraSettings.coreConnPerHost; LOG(info) << " MaxConnPerHost: " << cassandraSettings.maxConnPerHost; LOG(info) << " MaxConcRequests: " << cassandraSettings.maxConcRequests; LOG(info) << " DebugLog: " << (cassandraSettings.debugLog ? "Enabled" : "Disabled"); #ifdef SimpleMQTTVerbose LOG(info) << " Username: " << cassandraSettings.username; LOG(info) << " Password: " << cassandraSettings.password; #else LOG(info) << " Username and password not printed."; #endif LOG(info) << "RestAPI Settings:"; LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort; #ifdef SimpleMQTTVerbose LOG(info) << " Certificate: " << restAPISettings.certificate; LOG(info) << " Private key file: " << restAPISettings.privateKey; LOG(info) << " DH params from: " << restAPISettings.dhFile; #else LOG(info) << " Certificate, private key and DH-param file not printed."; #endif LOG_VAR(vLogLevel) << "----- Analytics Configuration -----"; for(auto& p : analyticsController->getManager()->getPlugins()) { LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\""; p.configurator->printConfig(vLogLevel); } LOG_VAR(vLogLevel) << "----- End Configuration -----"; if (settings.validateConfig) return EXIT_SUCCESS; else analyticsController->start(); /* * Start the MQTT Message Server. */ SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots); ms.setMessageCallback(mqttCallback); ms.start(); LOG(info) << "MQTT Server running..."; /* * Start the HTTP Server for the REST API */ std::thread httpThread; httpHandler_t httpHandler; httpServer_t::options httpOptions(httpHandler); httpOptions.reuse_address(true); httpOptions.thread_pool(std::make_shared()); httpServer_t httpServer(httpOptions.address(restAPISettings.restHost).port(restAPISettings.restPort)); httpThread = std::thread([&httpServer] { httpServer.run(); }); LOG(info) << "HTTP Server running..."; /* * Run (hopefully) forever... */ keepRunning = 1; timeval start, end; double elapsed; msgCtr = 0; pmsgCtr = 0; readingCtr = 0; gettimeofday(&start, NULL); uint64_t lastCleanup = start.tv_sec; LOG(info) << "Collect Agent running..."; while(keepRunning) { gettimeofday(&start, NULL); if(start.tv_sec - lastCleanup > settings.cleaningInterval) { uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000); lastCleanup = start.tv_sec; if(purged > 0) LOG(info) << "Cache: purged " << purged << " obsolete entries"; } sleep(60); /* not really thread safe but will do the job */ gettimeofday(&end, NULL); elapsed = (end.tv_sec - start.tv_sec) * 1000.0; elapsed += (end.tv_usec - start.tv_usec) / 1000.0; float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0; if (settings.statistics && keepRunning) { LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)"; LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s "; } msgCtr = 0; pmsgCtr = 0; readingCtr = 0; } LOG(info) << "Stopping..."; analyticsController->stop(); ms.stop(); LOG(info) << "MQTT Server stopped..."; httpServer.stop(); httpThread.join(); LOG(info) << "HTTP Server stopped..."; delete mySensorDataStore; delete mySensorConfig; dcdbConn->disconnect(); delete dcdbConn; LOG(info) << "Collect Agent closed. Bye bye..."; } catch (const exception& e) { LOG(fatal) << "Exception: " << e.what(); abrt(EXIT_FAILURE, INTERR); } return EXIT_SUCCESS; }