//================================================================================
// Name : collectagent.cpp
// Author : Axel Auweter
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================

/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

// NOTE(review): the header names of the following includes are missing in this
// copy of the file (the text inside <...> appears to have been lost). Restore
// them from upstream; a bare #include does not compile.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "version.h"
#include "CARestAPI.h"
#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"
#include "sensorcache.h"
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"

#define __STDC_FORMAT_MACROS
// NOTE(review): header name missing here as well (presumably the header that
// __STDC_FORMAT_MACROS is meant for) — restore from upstream.
#include

using namespace std;

// Main-loop run flag: set to 1 before the main loop, cleared by sigHandler.
// NOTE(review): a variable written from a signal handler should normally be
// `volatile sig_atomic_t`; check for extern declarations elsewhere before changing it.
int keepRunning;
// NOTE(review): not referenced in the visible code (main uses settings.statistics).
bool statistics;
// Statistics counters: incremented in mqttCallback, reported and reset by the main
// loop. NOTE(review): plain uint64_t; if mqttCallback runs on several MQTT server
// threads these updates race — the main loop itself says "not really thread safe".
uint64_t msgCtr;
uint64_t pmsgCtr;
uint64_t readingCtr;
// In-memory cache of recent sensor readings, also used by the REST API and analytics.
SensorCache mySensorCache;
AnalyticsController* analyticsController;
// libdcdb handles, allocated in main after the Cassandra connection is established.
DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore;
DCDB::JobDataStore *myJobDataStore;
DCDB::SensorConfig *mySensorConfig;
// Result of the last publishSensor call in mqttCallback (shared global).
DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;

/**
 * @brief Job query callback registered with the QueryEngine (see setJobQueryCallback in main).
 *
 * Fetches job metadata from the JobDataStore and appends it to @p buffer as
 * qeJobData records (existing contents of @p buffer are kept).
 *
 * @param jobId   Id of the job to fetch; only used when @p range is false.
 * @param startTs Start of the queried interval; only used when @p range is true.
 * @param endTs   End of the queried interval; only used when @p range is true.
 * @param buffer  Output vector the job records are appended to.
 * @param rel     If true, startTs/endTs are offsets subtracted from the current timestamp.
 * @param range   If true, query jobs via getJobsInIntervalRunning(start, end);
 *                otherwise fetch the single job @p jobId via getJobById.
 * @return false if the JobDataStore call does not return JD_OK, true otherwise.
 */
// NOTE(review): the template arguments of `vector& buffer` and `std::list tempList`
// are missing in this copy; from usage they are presumably vector<qeJobData> and
// std::list<JobData> — confirm against upstream / QueryEngine.h.
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector& buffer, const bool rel, const bool range) {
    std::list tempList;
    JobData tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        // Relative mode: offsets are subtracted from "now" (unsigned arithmetic, so an
        // offset larger than now would wrap around)
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
        if(err != JD_OK) return false;
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
        if(err != JD_OK) return false;
        tempList.push_back(tempData);
    }

    // Convert each libdcdb job record into the QueryEngine's representation
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
        buffer.push_back(tempQeData);
    }
    return true;
}

/**
 * @brief Sensor query callback registered with the QueryEngine (see setQueryCallback in main).
 *
 * Resolves @p name to an MQTT topic through the QueryEngine navigator (falling back
 * to @p name itself), then tries to serve the request from the sensor cache. If the
 * sensor is not cached, or the cache entry's getView() returns false, the readings
 * are queried from the storage backend instead and appended to @p buffer.
 *
 * @param name    Sensor name (or topic) to query.
 * @param startTs Start of the queried interval (absolute, or offset if @p rel).
 * @param endTs   End of the queried interval (absolute, or offset if @p rel).
 * @param buffer  Output vector the readings are appended to.
 * @param rel     If true, startTs/endTs are offsets subtracted from the current timestamp
 *                (only converted explicitly on the backend path; the cache path passes
 *                @p rel on to getView).
 * @return true if the readings were obtained from the cache or the backend returned at
 *         least one reading; false if the topic cannot be converted to a SensorId, the
 *         backend returned nothing, or the backend query threw a std::exception.
 */
// NOTE(review): template arguments of `std::vector& buffer` and `std::list results`
// are missing in this copy; from usage presumably std::vector<reading_t> and a list of
// libdcdb sensor readings (fields value / timeStamp) — confirm against upstream.
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector& buffer, const bool rel) {
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    DCDB::SensorId sid;
    // Creating a SID to perform the query
    if(!sid.mqttTopicConvert(topic))
        return false;
    // Cache path (note: count() followed by operator[] performs two map lookups)
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
        if (entry.getView(startTs, endTs, buffer, rel))
            return true;
    }
    // If we are here then the sensor was not found in the cache, or the cache could not
    // provide the requested view - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        // NOTE(review): 3600000000000 is 1 h if expressed in ns; its exact role in
        // DCDB::Sensor::query (tolerance/window) should be confirmed.
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
        if(results.empty())
            return false;
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            buffer.push_back(reading);
        }
    }
    catch(const std::exception& e) {
        // Backend errors are reported to the caller only as a false return value
        return false;
    }
    return true;
}

/* Normal termination (SIGINT / CTRL+C, or SIGTERM): log the signal and clear keepRunning so the main loop exits */
// NOTE(review): the template argument of severity_logger is missing in this copy.
// Also, logging from a signal handler is generally not async-signal-safe.
void sigHandler(int sig) {
    boost::log::sources::severity_logger lg;
    if( sig == SIGINT )
        LOG(fatal) << "Received SIGINT";
    else if( sig == SIGTERM )
        LOG(fatal) << "Received SIGTERM";
    keepRunning = 0;
}

/* Crash (installed for SIGABRT and SIGSEGV in main): delegate to abrt() with EXIT_FAILURE */
void abrtHandler(int sig) {
    abrt(EXIT_FAILURE, SIGNAL);
}

/**
 * @brief Message callback installed on the SimpleMQTTServer (see setMessageCallback in main).
 *
 * Counts every message (msgCtr) and every PUBLISH (pmsgCtr). For PUBLISH messages:
 *  - topics starting with DCDB_MAP are topic-to-name mapping messages: the payload is
 *    the sensor's public name, registered via SensorConfig::publishSensor for the topic
 *    remainder after the DCDB_MAP prefix;
 *  - otherwise the payload is either a single 64-bit value (timestamp supplied here) or
 *    a non-empty multiple of sizeof(mqttPayload).
 *
 * @param msg Received MQTT message.
 * @return 1 on an empty mapping message, a publishSensor error, or a malformed payload;
 *         0 otherwise (for the visible part of this function).
 */
int mqttCallback(SimpleMQTTMessage *msg) {
    /*
     * Increment msgCtr (all messages) and pmsgCtr (PUBLISH messages) for statistics.
     */
    msgCtr++;
    if (msg->isPublish())
        pmsgCtr++;

    uint64_t len;
    /*
     * Decode the message and put into the database.
     */
    if (msg->isPublish()) {
        // NOTE(review): this pointer is only valid if getTopic() returns a reference to a
        // string owned by msg; if it returns by value the pointer dangles — confirm.
        const char *topic = msg->getTopic().c_str();
        // We check whether the topic starts with the DCDB_MAP keyword, indicating that the payload
        // contains the sensor's public name. In that case the keyword is skipped
        // (topic + DCDB_MAP_LEN) to obtain the actual sensor topic.
        // strncmp compares only the DCDB_MAP_LEN-byte prefix.
        if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
            if ((len = msg->getPayloadLength()) == 0) {
                LOG(error) << "Empty topic-to-name mapping message received";
                return 1;
            }

            string sensorName((char *) msg->getPayload(), len);
            err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

            // PublishSensor does most of the error checking for us
            switch (err) {
                case DCDB::SC_INVALIDPATTERN:
                    LOG(error) << "Invalid sensor topic : " << msg->getTopic();
                    return 1;
                case DCDB::SC_INVALIDPUBLICNAME:
                    LOG(error) << "Invalid sensor public name: " << sensorName;
                    return 1;
                case DCDB::SC_INVALIDSESSION:
                    LOG(error) << "Cannot reach sensor data store.";
                    return 1;
                default:
                    break;
            }
        } else {
            mqttPayload buf, *payload;

            len = msg->getPayloadLength();
            //In the 64 bit message case, the collect agent provides a timestamp
            if (len == sizeof(uint64_t)) {
                payload = &buf;
                // NOTE(review): reads the payload through an int64_t* cast; if the buffer is
                // not suitably aligned, memcpy would be the safe way to do this.
                payload->value = *((int64_t *) msg->getPayload());
                payload->timestamp = Messaging::calculateTimestamp();
                // Pretend the message carried value + timestamp (presumably equal to
                // sizeof(mqttPayload), i.e. one reading — confirm)
                len = sizeof(uint64_t) * 2;
            }
            //...otherwise it just retrieves it from the MQTT message payload.
            else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
                payload = (mqttPayload *) msg->getPayload();
            }
            //...otherwise this message is malformed -> ignore...
            else {
                LOG(error) << "Message malformed";
                return 1;
            }

            /*
             * Check if we can decode the message topic
             * into a valid SensorId. If successful, store
             * the record in the database.
             */
            DCDB::SensorId sid;
            if (sid.mqttTopicConvert(msg->getTopic())) {
                // NOTE(review): this section is damaged in this copy. The loop header below
                // ("i=0; igetTopic()") is not well-formed, and the code that stores the decoded
                // readings (promised by the comment above) is not present; content appears to
                // have been lost between "i" and "getTopic()". Restore from upstream.
#if 0
                cout << "Topic decode successful:" << endl
                     << " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                     << " DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                     << " device_id: " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                     << " sensor_number: " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

                cout << "Payload (" << len/sizeof(mqttPayload) << " messages):"<< endl;
                for (uint64_t i=0; igetTopic() << "\n";
                return 1;
            }
#endif
        }
    }
    return 0;
}

/*
 * Print usage information to stdout. Default values are read from a Configuration
 * object constructed for "collectagent.conf".
 */
void usage() {
    Configuration config("", "collectagent.conf");
    /*
               1         2         3         4         5         6         7         8
     012345678901234567890123456789012345678901234567890123456789012345678901234567890
     */
    cout << "Usage:" << endl;
    cout << " collectagent [-d] [-s] [-x] [-a] [-m] [-c] [-u] [-p] [-t] [-v] " << endl;
    cout << " collectagent -h" << endl;
    cout << endl;

    cout << "Options:" << endl;
    cout << " -m MQTT listen address [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
    cout << " -c Cassandra host [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
    cout << " -u Cassandra username [default: none]" << endl;
    cout << " -p Cassandra password [default: none]" << endl;
    cout << " -t Cassandra insert TTL [default: " << config.cassandraSettings.ttl << "]" << endl;
    cout << " -v Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
         << " Can be a number between 5 (all) and 0 (fatal)." << endl;
    cout << endl;
    cout << " -d Daemonize" << endl;
    // NOTE(review): content appears to be missing from here on in this copy: the rest of
    // usage(), the beginning of main() (including the try block, option parsing and the
    // declarations of settings/pluginSettings/cassandraSettings/analyticsSettings/
    // restAPISettings/config/cmdSink), and the start of the '-p' handler, which overwrites
    // the password in argv ("pwdLen"). Restore from upstream; this does not compile as-is.
    cout << " -s Print message stats" <= 3) ? 3 : pwdLen);
                    if (pwdLen > 3) {
                        memset(optarg+3, 0, pwdLen-3);
                    }
                    break;
                }
                case 't':
                    cassandraSettings.ttl = stoul(optarg);
                    break;
                case 'v':
                    settings.logLevelCmd = stoi(optarg);
                    break;
                case 'd':
                case 'D':
                    settings.daemonize = 1;
                    break;
                case 's':
                    settings.statistics = 1;
                    break;
                case 'x':
                    settings.validateConfig = true;
                    break;
                case 'h':
                default:
                    usage();
                    exit(EXIT_FAILURE);
            }
        }

        //set up logger to file (only if a file log level was configured)
        if (settings.logLevelFile >= 0) {
            auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
            fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
        }

        //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
        if (settings.logLevelCmd >= 0) {
            cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
        }

        /*
         * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
         */
        signal(SIGINT, sigHandler);
        signal(SIGTERM, sigHandler);

        /*
         * Catch critical signals to allow for backtraces
         */
        signal(SIGABRT, abrtHandler);
        signal(SIGSEGV, abrtHandler);

        // Daemonizing the collectagent
        if(settings.daemonize)
            dcdbdaemon();

        // Setting the size of the sensor cache
        // Conversion from milliseconds to nanoseconds
        mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);

        //Allocate and initialize connection to Cassandra.
        dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), cassandraSettings.username, cassandraSettings.password);
        dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
        dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
        // Single backend parameter: number of core connections per host
        uint32_t params[1] = {cassandraSettings.coreConnPerHost};
        dcdbConn->setBackendParams(params);

        if (!dcdbConn->connect()) {
            LOG(fatal) << "Cannot connect to Cassandra!";
            exit(EXIT_FAILURE);
        }

        /*
         * Legacy behavior: Initialize the DCDB schema in Cassandra.
         */
        dcdbConn->initSchema();

        /*
         * Allocate the SensorDataStore, SensorConfig and JobDataStore on the shared connection.
         */
        mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
        mySensorConfig = new DCDB::SensorConfig(dcdbConn);
        myJobDataStore = new DCDB::JobDataStore(dcdbConn);

        /*
         * Set TTL for data store inserts if TTL > 0.
         */
        if (cassandraSettings.ttl > 0)
            mySensorDataStore->setTTL(cassandraSettings.ttl);
        mySensorDataStore->setDebugLog(cassandraSettings.debugLog);

        // Wire up the analytics framework: it shares the sensor cache, and the QueryEngine
        // answers sensor/job queries through the two callbacks defined above.
        analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
        analyticsController->setCache(&mySensorCache);
        queryEngine.setFilter(analyticsSettings.filter);
        queryEngine.setJobFilter(analyticsSettings.jobFilter);
        queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
        queryEngine.setQueryCallback(sensorQueryCallback);
        queryEngine.setJobQueryCallback(jobQueryCallback);
        // The last command-line argument is passed to the analytics controller
        // (presumably the configuration path — confirm).
        // NOTE(review): this early return (like the validateConfig one below) does not
        // free the objects allocated above.
        if(!analyticsController->initialize(settings, argv[argc - 1]))
            return EXIT_FAILURE;

        // In validate-only mode the configuration dump is logged at info level, otherwise at debug.
        LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
        LOG_VAR(vLogLevel) << "----- Configuration -----";

        //print global settings in either case
        LOG(info) << "Global Settings:";
        LOG(info) << " MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
        LOG(info) << " CacheInterval: " << int(pluginSettings.cacheInterval/1000) << " [s]";
        LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]";
        LOG(info) << " MessageThreads: " << settings.messageThreads;
        LOG(info) << " MessageSlots: " << settings.messageSlots;
        LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled");
        LOG(info) << " Statistics: " << (settings.statistics ? "Enabled" : "Disabled");
        LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
        LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
        LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
        LOG(info) << (settings.validateConfig ? " Only validating config files." : " ValidateConfig: Disabled");

        LOG(info) << "Analytics Settings:";
        LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
        LOG(info) << " Filter: " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");

        LOG(info) << "Cassandra Driver Settings:";
        LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port;
        LOG(info) << " TTL: " << cassandraSettings.ttl;
        LOG(info) << " NumThreadsIO: " << cassandraSettings.numThreadsIo;
        LOG(info) << " QueueSizeIO: " << cassandraSettings.queueSizeIo;
        LOG(info) << " CoreConnPerHost: " << cassandraSettings.coreConnPerHost;
        LOG(info) << " DebugLog: " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
        // Credentials are only logged in SimpleMQTTVerbose builds
#ifdef SimpleMQTTVerbose
        LOG(info) << " Username: " << cassandraSettings.username;
        LOG(info) << " Password: " << cassandraSettings.password;
#else
        LOG(info) << " Username and password not printed.";
#endif

        if (restAPISettings.enabled) {
            LOG(info) << "RestAPI Settings:";
            LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
            LOG(info) << " Certificate: " << restAPISettings.certificate;
            LOG(info) << " Private key file: " << restAPISettings.privateKey;
            LOG(info) << " DH params from: " << restAPISettings.dhFile;
        }

        LOG_VAR(vLogLevel) << "----- Analytics Configuration -----";
        for(auto& p : analyticsController->getManager()->getPlugins()) {
            LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
            p.configurator->printConfig(vLogLevel);
        }
        LOG_VAR(vLogLevel) << "----- End Configuration -----";

        // Validate-only mode (-x) stops after printing the configuration
        if (settings.validateConfig)
            return EXIT_SUCCESS;
        else
            analyticsController->start();

        /*
         * Start the MQTT Message Server.
         */
        SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);

        ms.setMessageCallback(mqttCallback);
        ms.start();

        LOG(info) << "MQTT Server running...";

        /*
         * Start the HTTP Server for the REST API
         */
        CARestAPI* httpsServer = nullptr;
        if (restAPISettings.enabled) {
            httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
            config.readRestAPIUsers(httpsServer);
            httpsServer->start();
            LOG(info) << "HTTP Server running...";
        }

        /*
         * Run (hopefully) forever... until sigHandler clears keepRunning.
         */
        keepRunning = 1;
        timeval start, end;
        double elapsed;
        msgCtr = 0;
        pmsgCtr = 0;
        readingCtr = 0;

        gettimeofday(&start, NULL);
        uint64_t lastCleanup = start.tv_sec;

        LOG(info) << "Collect Agent running...";
        while(keepRunning) {
            gettimeofday(&start, NULL);
            // Purge obsolete cache entries once cleaningInterval seconds have passed
            // (checked once per loop iteration, i.e. roughly every 60 s).
            // NOTE(review): if cleaningInterval is a 32-bit int, the product below overflows
            // for values >= 3 — confirm its type.
            if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
                uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
                lastCleanup = start.tv_sec;
                if(purged > 0)
                    LOG(info) << "Cache: purged " << purged << " obsolete entries";
            }

            sleep(60);
            /* not really thread safe but will do the job */
            gettimeofday(&end, NULL);
            // elapsed is in milliseconds; rates below are scaled by 1000 to get per-second values
            elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
            elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
            float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
            if (settings.statistics && keepRunning) {
                LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
                LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
            }
            msgCtr = 0;
            pmsgCtr = 0;
            readingCtr = 0;
        }

        // Orderly shutdown: analytics, MQTT server, REST API, then the libdcdb objects and connection
        LOG(info) << "Stopping...";
        analyticsController->stop();
        ms.stop();
        LOG(info) << "MQTT Server stopped...";
        if (restAPISettings.enabled) {
            httpsServer->stop();
            delete httpsServer;
            LOG(info) << "HTTP Server stopped...";
        }
        delete mySensorDataStore;
        delete myJobDataStore;
        delete mySensorConfig;
        dcdbConn->disconnect();
        delete dcdbConn;
        LOG(info) << "Collect Agent closed. Bye bye...";
    }
    // Any std::exception escaping the setup or the main loop is logged and handed to abrt()
    catch (const exception& e) {
        LOG(fatal) << "Exception: " << e.what();
        abrt(EXIT_FAILURE, INTERR);
    }

    return EXIT_SUCCESS;
}