//================================================================================ // Name : dcdbpusher.cpp // Author : Michael Ott (original), Micha Mueller // Copyright : Leibniz Supercomputing Centre // Description : Main functions for the DCDB MQTT Pusher //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2016 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //================================================================================ #include #include #include #include //Caution: include order matters! HttpsServer.h needs to be included first #include "HttpsServer.h" #include "Configuration.h" #include "MQTTPusher.h" #include "version.h" #include #include #include #include #include #include #include #include #include using namespace std; Configuration* _configuration; MQTTPusher* _mqttPusher; HttpsServer* _httpsServer; AnalyticsManager* _analyticsManager; std::map _sensorMap; QueryEngine& _queryEngine = QueryEngine::getInstance(); boost::shared_ptr keepAliveWork; std::vector* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector* buffer, const bool rel) { //Initializing the sensor map if necessary. Thread safe! if(_queryEngine.updated.load()) { if(!_queryEngine.updating.exchange(true)) { _sensorMap.clear(); for (auto &p : _configuration->getPlugins()) for (auto &g : p.configurator->getSensorGroups()) for (auto &s : g->getSensors()) _sensorMap.insert(std::make_pair(s->getName(), s)); _queryEngine.updated.store(false); _queryEngine.updating.store(false); } else { // Spinning while the sensormap is being built while( _queryEngine.updating.load() ) {} } } if(_sensorMap.count(name) > 0) { SBasePtr sensor = _sensorMap[name]; if(!sensor->isInit()) return buffer; else return sensor->getCache()->getView(startTs, endTs, buffer, rel, true); } return buffer; } void sigHandler(int sig) { boost::log::sources::severity_logger lg; if( sig == SIGINT ) LOG(fatal) << "Received SIGINT"; else if( sig == SIGTERM ) LOG(fatal) << "Received SIGTERM"; //Stop all sensors LOG(info) << "Stopping sensors..."; for(auto& p : _configuration->getPlugins()) { LOG(info) << "Stop \"" << p.id << "\" plugin"; for(const auto& g : p.configurator->getSensorGroups()) { g->stop(); } } //Stop data analytics plugins and analyzers _analyticsManager->stop(); //Stop io service by killing keepAliveWork keepAliveWork.reset(); //Stop MQTTPusher LOG(info) << "Flushing MQTT queues..."; _mqttPusher->stop(); //Stop https server LOG(info) << "Stopping REST API Server..."; _httpsServer->stop(); } void printSyntax() { /* 1 2 3 4 5 6 7 8 012345678901234567890123456789012345678901234567890123456789012345678901234567890 */ _configuration = new Configuration(""); global_t& globalSettings = _configuration->getGlobal(); cout << "Usage:" << endl; cout << " dcdbpusher [-d] [-x] [-a] [-b] [-p] [-m] [-w] [-v] " << endl; cout << " dcdbpusher -h" << endl; cout << endl; cout << "Options:" << endl; cout << " -a Auto-publish pattern [default: none]" << endl; cout << " -b MQTT broker [default: none]" << endl; cout << " -p MQTT broker port [default: " << globalSettings.brokerPort << "]" << endl; cout << " -m MQTT topic prefix [default: none]" << endl; cout << " -w Writable temp dir [default: .]" << endl; cout << " -v Set verbosity of output [default: " << globalSettings.logLevelCmd << "]" << endl << " Can be a number between 5 (all) and 0 (fatal)." << endl; cout << endl; cout << " -d Daemonize" << endl; cout << " -x Parse and print the config but do not actually start dcdbpusher" << endl; cout << " -h This help page" << endl; cout << endl; delete _configuration; _configuration = nullptr; } int main(int argc, char** argv) { cout << "dcdbpusher " << VERSION << endl << endl; boost::asio::io_service io; boost::thread_group threads; if (argc <= 1) { cout << "Please specify a path to the config-directory" << endl << endl; printSyntax(); return 1; } //define allowed command-line options once const char opts[] = "a:b:p:m:v:w:dxh"; //check if help flag specified char c; while ((c = getopt(argc, argv, opts)) != -1) { switch (c) { case 'h': printSyntax(); return 1; break; default: //do nothing (other options are read later on) break; } } //init LOGGING initLogging(); //set up logger to command line auto cmdSink = setupCmdLogger(); //get logger instance boost::log::sources::severity_logger lg; //finished logging startup for the moment (file log added later) _configuration = new Configuration(argv[argc-1]); //Read global variables from config file if(!_configuration->readGlobal()) { LOG(fatal) << "Failed to read global configuration!"; return 1; } global_t& globalSettings = _configuration->getGlobal(); //plugin and restAPI settings are actually part of globalSettings //use the references as shortcut, so that we do not have to prefix with 'globalSettings.' all the time pluginSettings_t& pluginSettings = globalSettings.pluginSettings; restAPISettings_t& restAPISettings = globalSettings.restAPISettings; //reset getopt() optind = 1; //read in options (overwrite dcdbpusher.conf settings if necessary) while ((c = getopt(argc, argv, opts)) != -1) { switch (c) { case 'a': pluginSettings.sensorPattern = optarg; break; case 'b': globalSettings.brokerHost = optarg; break; case 'p': globalSettings.brokerPort = atoi(optarg); break; case 'm': pluginSettings.mqttPrefix = optarg; break; case 'v': globalSettings.logLevelCmd = _configuration->translateLogLevel(stoi(optarg)); break; case 'd': globalSettings.daemonize = 1; break; case 'x': globalSettings.validateConfig = true; break; case 'w': pluginSettings.tempdir = optarg; if (pluginSettings.tempdir[pluginSettings.tempdir.length()-1] != '/') { pluginSettings.tempdir.append("/"); } break; case 'h': printSyntax(); return 1; break; default: if (c != '?') cerr << "Unknown parameter: " << c << endl; return 1; } } //we now should know where the writable tempdir is //set up logger to file auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("dcdbpusher")); //severity level may be overwritten (per option or config-file) --> set it according to globalSettings fileSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelFile); cmdSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelCmd); LOG(info) << "Logging setup complete"; //Read in rest of configuration. Also creates all sensors if(!_configuration->readPlugins()) { LOG(fatal) << "Failed to read configuration!"; return 1; } _analyticsManager = new AnalyticsManager(); // Preparing the SensorNavigator bool failedTree = false; std::shared_ptr navigator = std::make_shared(); vector names, topics; for(const auto& p : _configuration->getPlugins()) for(const auto& g : p.configurator->getSensorGroups()) for(const auto& s : g->getSensors()) { names.push_back(s->getName()); topics.push_back(s->getMqtt()); } try { navigator->buildTree(globalSettings.hierarchy, &names, &topics); } catch(const std::invalid_argument& e) { LOG(error) << e.what(); LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!"; failedTree = true; } if(!failedTree) { _queryEngine.setNavigator(navigator); _queryEngine.triggerUpdate(); _queryEngine.setQueryCallback(sensorQueryCallback); if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) { LOG(fatal) << "Failed to load data analytics manager!"; return 1; } } //print configuration to give some feedback //config of plugins is only printed if the config shall be validated or to debug level otherwise LOG_LEVEL vLogLevel = LOG_LEVEL::debug; if (globalSettings.validateConfig) { vLogLevel = boost::log::trivial::info; } LOG_VAR(vLogLevel) << "----- Configuration -----"; //print global settings in either case LOG(info) << "Global Settings:"; LOG(info) << " Broker: " << globalSettings.brokerHost << ":" << globalSettings.brokerPort; LOG(info) << " Threads: " << globalSettings.threads; LOG(info) << " Daemonize: " << (globalSettings.daemonize ? "Enabled" : "Disabled"); LOG(info) << " MaxMsgNum: " << globalSettings.maxMsgNum; LOG(info) << " MaxInflightMsgNum: " << globalSettings.maxInflightMsgNum; LOG(info) << " MaxQueuedMsgNum: " << globalSettings.maxQueuedMsgNum; LOG(info) << " MQTT-QoS: " << globalSettings.qosLevel; LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix; LOG(info) << " Write-Dir: " << pluginSettings.tempdir; LOG(info) << " Hierarchy: " << globalSettings.hierarchy; LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]"; if(globalSettings.validateConfig) { LOG(info) << " Only validating config files."; } else { LOG(info) << " ValidateConfig: Disabled"; } LOG(info) << "RestAPI Settings:"; LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort; #ifdef DEBUG LOG(info) << " Certificate: " << restAPISettings.certificate; LOG(info) << " Private key file: " << restAPISettings.privateKey; LOG(info) << " DH params from: " << restAPISettings.dhFile; #else LOG(info) << " Certificate, private key and DH-param file not printed."; #endif LOG_VAR(vLogLevel) << "----- Sampling Configuration -----"; for(auto& p : _configuration->getPlugins()) { LOG_VAR(vLogLevel) << "Sampling Plugin \"" << p.id << "\""; p.configurator->printConfig(vLogLevel); } LOG_VAR(vLogLevel) << "----- Analytics Configuration -----"; for(auto& p : _analyticsManager->getPlugins()) { LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\""; p.configurator->printConfig(vLogLevel); } LOG_VAR(vLogLevel) << "----- End Configuration -----"; if (globalSettings.validateConfig) { return 0; } //MQTTPusher and Https server get their own threads _mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel, _configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum); _httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io); _configuration->readAuthkeys(_httpsServer); //Init all sensors LOG(info) << "Init sensors..."; for(auto& p : _configuration->getPlugins()) { LOG(info) << "Init \"" << p.id << "\" plugin"; for(const auto& g : p.configurator->getSensorGroups()) { LOG(debug) << " -Group: " << g->getGroupName(); g->init(io); } } //Start all sensors LOG(info) << "Starting sensors..."; for(auto& p : _configuration->getPlugins()) { LOG(info) << "Start \"" << p.id << "\" plugin"; for(const auto& g : p.configurator->getSensorGroups()) { g->start(); } } if(!failedTree) { if(!_queryEngine.updated.is_lock_free()) LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded."; LOG(info) << "Init analyzers..."; _analyticsManager->init(io); LOG(info) << "Starting analyzers..."; _analyticsManager->start(); } LOG(info) << "Sensors started!"; if (globalSettings.daemonize) { //boost.log does not support forking officially. //however, just don't touch the sinks after daemonizing and it should work nonetheless LOG(info) << "Detaching..."; cmdSink->flush(); boost::log::core::get()->remove_sink(cmdSink); cmdSink.reset(); //daemonize dcdbdaemon(); LOG(info) << "Now detached"; } LOG(info) << "Creating threads..."; //dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API) keepAliveWork = boost::make_shared(io); //Create pool of threads which handle the sensors for(size_t i = 0; i < globalSettings.threads; i++) { threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io)); } boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher)); boost::thread restThread(bind(&HttpsServer::run, _httpsServer)); LOG(info) << "Threads created!"; LOG(info) << "Registering signal handlers..."; signal(SIGINT, sigHandler); //Handle Strg+C signal(SIGTERM, sigHandler); //Handle termination LOG(info) << "Signal handlers registered!"; LOG(info) << "Setup complete!"; LOG(trace) << "Running..."; //Run until Strg+C threads.join_all(); //will only continue if interrupted by SIGINT and threads were stopped mqttThread.join(); LOG(info) << "MQTTPusher stopped"; restThread.join(); LOG(info) << "REST API Server stopped"; LOG(info) << "Exiting...Goodbye!"; return 0; }