//================================================================================ // Name : collectagent.cpp // Author : Axel Auweter // Copyright : Leibniz Supercomputing Centre // Description : Main code of the CollectAgent //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2016 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //================================================================================ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "version.h" #include "configuration.h" #include "simplemqttserver.h" #include "messaging.h" #include "abrt.h" #include "dcdbdaemon.h" #include "sensorcache.h" #define __STDC_FORMAT_MACROS #include using namespace std; int keepRunning; bool statistics; uint64_t msgCtr; uint64_t pmsgCtr; uint64_t readingCtr; DCDB::Connection* dcdbConn; DCDB::SensorDataStore *mySensorDataStore; DCDB::SensorConfig *mySensorConfig; DCDB::SensorCache mySensorCache; DCDB::SCError err; logger_t lg; /* Normal termination (SIGINT, CTRL+C) */ void sigHandler(int sig) { boost::log::sources::severity_logger lg; if( sig == SIGINT ) LOG(fatal) << "Received SIGINT"; else if( sig == SIGTERM ) LOG(fatal) << "Received SIGTERM"; keepRunning = 0; } /* Crash */ void abrtHandler(int sig) { abrt(EXIT_FAILURE, SIGNAL); } struct httpHandler_t; typedef boost::network::http::server httpServer_t; struct httpHandler_t { void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) { httpServer_t::string_type ip = source(request); std::ostringstream data; static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } }; //try getting the latest value try { boost::network::uri::uri uri("http://localhost"+request.destination); std::map queries; boost::network::uri::query_map(uri, queries); int avg = atoi(queries.find("avg")->second.c_str()); int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg); data << val << "\n"; //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl; connection->set_status(httpServer_t::connection::ok); connection->set_headers(boost::make_iterator_range(headers, headers + 2)); connection->write(data.str()); } catch (const std::invalid_argument& e) { connection->set_status(httpServer_t::connection::not_found); connection->set_headers(boost::make_iterator_range(headers, headers + 2)); connection->write("Error: Sensor id not found.\n"); } catch (const std::out_of_range &e) { connection->set_status(httpServer_t::connection::no_content); connection->set_headers(boost::make_iterator_range(headers, headers + 2)); connection->write("Error: Sensor unavailable.\n"); } catch (const std::exception& e) { connection->set_status(httpServer_t::connection::internal_server_error); connection->set_headers(boost::make_iterator_range(headers, headers + 2)); connection->write("Server error.\n"); } } }; int mqttCallback(SimpleMQTTMessage *msg) { /* * Increment the msgCtr/vmsgCtr for statistics. */ msgCtr++; if (msg->isPublish()) pmsgCtr++; uint64_t len; /* * Decode the message and put into the database. */ if (msg->isPublish()) { const char *topic = msg->getTopic().c_str(); // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix // We use strncmp as it is the most efficient way to do it if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) { if ((len = msg->getPayloadLength()) == 0) { LOG(error) << "Empty topic-to-name mapping message received"; return 1; } string sensorName((char *) msg->getPayload(), len); err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN); // PublishSensor does most of the error checking for us switch (err) { case DCDB::SC_INVALIDPATTERN: LOG(error) << "Invalid sensor topic : " << msg->getTopic(); return 1; case DCDB::SC_INVALIDPUBLICNAME: LOG(error) << "Invalid sensor public name: " << sensorName; return 1; case DCDB::SC_INVALIDSESSION: LOG(error) << "Cannot reach sensor data store."; return 1; default: break; } } else { mqttPayload buf, *payload; len = msg->getPayloadLength(); //In the 64 bit message case, the collect agent provides a timestamp if (len == sizeof(uint64_t)) { payload = &buf; payload->value = *((int64_t *) msg->getPayload()); payload->timestamp = Messaging::calculateTimestamp(); len = sizeof(uint64_t) * 2; } //...otherwise it just retrieves it from the MQTT message payload. else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) { payload = (mqttPayload *) msg->getPayload(); } //...otherwise this message is malformed -> ignore... else { LOG(error) << "Message malformed"; return 1; } /* * Check if we can decode the message topic * into a valid SensorId. If successful, store * the record in the database. */ DCDB::SensorId sid; if (sid.mqttTopicConvert(msg->getTopic())) { #if 0 cout << "Topic decode successful:" << endl << " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl << " DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl << " device_id: " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl << " sensor_number: " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec; cout << "Payload (" << len/sizeof(mqttPayload) << " messages):"<< endl; for (uint64_t i=0; igetTopic() << "\n"; return 1; } #endif } } return 0; } /* * Print usage information */ void usage() { Configuration config(""); globalCA_t& defaults = config.getGlobal(); /* 1 2 3 4 5 6 7 8 012345678901234567890123456789012345678901234567890123456789012345678901234567890 */ cout << "Usage:" << endl; cout << " collectagent [-m] [-r] [-c] [-C] [-u] [-p] [-t] [-v] [-d] [-s] " << endl; cout << " collectagent -h" << endl; cout << endl; cout << "Options:" << endl; cout << " -m MQTT listen address [default: " << defaults.mqttListenAddress << "]" << endl; cout << " -r REST API listen address [default: " << defaults.restListenAddress << "]" << endl; cout << " -c Cassandra host [default: " << defaults.cassandraSettings.address << "]" << endl; cout << " -C Cache interval in [s] [default: " << defaults.cacheInterval << "]" << endl; cout << " -u Cassandra username [default: none]" << endl; cout << " -p Cassandra password [default: none]" << endl; cout << " -t Cassandra insert TTL [default: " << defaults.cassandraSettings.ttl << "]" << endl; cout << " -v Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl << " Can be a number between 5 (all) and 0 (fatal)." << endl; cout << endl; cout << " -d Daemonize" << endl; cout << " -s Print message stats" <= 3) ? 3 : pwdLen); if (pwdLen > 3) { memset(optarg+3, 0, pwdLen-3); } break; } case 't': settings.cassandraSettings.ttl = stoul(optarg); break; case 'v': settings.logLevelCmd = translateLogLevel(stoi(optarg)); break; case 'd': case 'D': settings.daemonize = 1; break; case 's': settings.statistics = 1; break; case 'h': default: usage(); exit(EXIT_FAILURE); } } auto fileSink = setupFileLogger(settings.tempDir, std::string("collectagent")); //severity level may be overwritten (per option or config-file) --> set it according to globalSettings fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile); cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd); /* * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns. */ signal(SIGINT, sigHandler); signal(SIGTERM, sigHandler); /* * Catch critical signals to allow for backtraces */ signal(SIGABRT, abrtHandler); signal(SIGSEGV, abrtHandler); // Daemonizing the collectagent if(settings.daemonize) dcdbdaemon(); /* * Parse hostnames for port specifications */ listenHost = string(settings.mqttListenAddress); size_t pos = listenHost.find(":"); if (pos != string::npos) { listenPort = listenHost.substr(pos+1); listenHost.erase(pos); } else { listenPort = LISTENPORT; } cassandraHost = string(settings.cassandraSettings.address); pos = cassandraHost.find(":"); if (pos != string::npos) { cassandraPort = cassandraHost.substr(pos+1); cassandraHost.erase(pos); } else { cassandraPort = CASSANDRAPORT; } restApiHost = string(settings.restListenAddress); pos = restApiHost.find(":"); if (pos != string::npos) { restApiPort = restApiHost.substr(pos+1); restApiHost.erase(pos); } else { restApiPort = RESTAPIPORT; } // Setting the size of the sensor cache mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000); //Allocate and initialize connection to Cassandra. dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password); dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo); dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo); uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests}; dcdbConn->setBackendParams(params); if (!dcdbConn->connect()) { LOG(fatal) << "Cannot connect to Cassandra!"; exit(EXIT_FAILURE); } /* * Legacy behavior: Initialize the DCDB schema in Cassandra. */ dcdbConn->initSchema(); /* * Allocate the SensorDataStore. */ mySensorDataStore = new DCDB::SensorDataStore(dcdbConn); mySensorConfig = new DCDB::SensorConfig(dcdbConn); /* * Set TTL for data store inserts if TTL > 0. */ if (settings.cassandraSettings.ttl > 0) mySensorDataStore->setTTL(settings.cassandraSettings.ttl); mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog); /* * Start the MQTT Message Server. */ SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots); ms.setMessageCallback(mqttCallback); ms.start(); LOG(info) << "MQTT Server running..."; /* * Start the HTTP Server for the REST API */ std::thread httpThread; httpHandler_t httpHandler; httpServer_t::options httpOptions(httpHandler); httpOptions.reuse_address(true); httpOptions.thread_pool(std::make_shared()); httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort)); httpThread = std::thread([&httpServer] { httpServer.run(); }); LOG(info) << "HTTP Server running..."; /* * Run (hopefully) forever... */ keepRunning = 1; timeval start, end; double elapsed; msgCtr = 0; pmsgCtr = 0; readingCtr = 0; gettimeofday(&start, NULL); uint64_t lastCleanup = start.tv_sec; LOG(info) << "Collect Agent running..."; while(keepRunning) { gettimeofday(&start, NULL); if(start.tv_sec - lastCleanup > settings.cleaningInterval) { uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000); lastCleanup = start.tv_sec; if(purged > 0) LOG(info) << "Cache: purged " << purged << " obsolete entries"; } sleep(60); /* not really thread safe but will do the job */ gettimeofday(&end, NULL); elapsed = (end.tv_sec - start.tv_sec) * 1000.0; elapsed += (end.tv_usec - start.tv_usec) / 1000.0; float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0; if (settings.statistics && keepRunning) { LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)"; } msgCtr = 0; pmsgCtr = 0; readingCtr = 0; } LOG(info) << "Stopping..."; ms.stop(); LOG(info) << "MQTT Server stopped..."; httpServer.stop(); httpThread.join(); LOG(info) << "HTTP Server stopped..."; delete mySensorDataStore; delete mySensorConfig; dcdbConn->disconnect(); delete dcdbConn; LOG(info) << "Collect Agent closed. Bye bye..."; } catch (const exception& e) { LOG(fatal) << "Exception: " << e.what(); abrt(EXIT_FAILURE, INTERR); } return EXIT_SUCCESS; }