dcdbpusher.cpp 14.6 KB
Newer Older
1 2
//================================================================================
// Name        : dcdbpusher.cpp
3
// Author      : Michael Ott (original), Micha Mueller
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
// Copyright   : Leibniz Supercomputing Centre
// Description : Main functions for the DCDB MQTT Pusher
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include <dcdbdaemon.h>
#include <functional>
29
#include <string>
30
#include <vector>
31

32 33
//Caution: include order matters! HttpsServer.h needs to be included first
#include "HttpsServer.h"
34 35
#include "Configuration.h"
#include "MQTTPusher.h"
36
#include "version.h"
37 38 39 40 41

#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

42 43 44 45 46 47 48
#include <boost/log/trivial.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/common_attributes.hpp>
#include <boost/log/support/date_time.hpp>

49 50
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
51 52 53 54
Configuration*					_configuration;
MQTTPusher*						_mqttPusher;
HttpsServer*					_httpsServer;
AnalyticsManager* 				_analyticsManager;
55
std::map<std::string, SBasePtr> _sensorMap;
Alessio Netti's avatar
Alessio Netti committed
56
QueryEngine&					_queryEngine = QueryEngine::getInstance();
57

58
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
59

60
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
Alessio Netti's avatar
Alessio Netti committed
61 62 63
	//Initializing the sensor map if necessary. Thread safe!
	if(_queryEngine.updated.load()) {
		if(!_queryEngine.updating.exchange(true)) {
Alessio Netti's avatar
Alessio Netti committed
64
			_sensorMap.clear();
Alessio Netti's avatar
Alessio Netti committed
65 66 67 68 69 70 71 72 73 74
			for (auto &p : _configuration->getPlugins())
				for (auto &g : p.configurator->getSensorGroups())
					for (auto &s : g->getSensors())
						_sensorMap.insert(std::make_pair(s->getName(), s));
			_queryEngine.updated.store(false);
			_queryEngine.updating.store(false);
		} else {
			// Spinning while the sensormap is being built
			while( _queryEngine.updating.load() ) {}
		}
75 76 77
	}
	if(_sensorMap.count(name) > 0) {
		SBasePtr sensor = _sensorMap[name];
Alessio Netti's avatar
Alessio Netti committed
78
		if(!sensor->isInit())
79
			return buffer;
Alessio Netti's avatar
Alessio Netti committed
80 81
		else
			return sensor->getCache()->getView(startTs, endTs, buffer, rel, true);
82 83 84 85
	}
	return buffer;
}

Alessio Netti's avatar
Alessio Netti committed
86
void sigHandler(int sig) {
87
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
88 89 90 91
	if( sig == SIGINT )
		LOG(fatal) << "Received SIGINT";
	else if( sig == SIGTERM )
		LOG(fatal) << "Received SIGTERM";
92 93
	//Stop all sensors
	LOG(info) << "Stopping sensors...";
94
	for(auto& p : _configuration->getPlugins()) {
95
		LOG(info) << "Stop \"" << p.id << "\" plugin";
96
		for(const auto& g : p.configurator->getSensorGroups()) {
97
			g->stop();
98
		}
99
	}
100 101 102
	//Stop data analytics plugins and analyzers
	_analyticsManager->stop();

103 104
	//Stop io service by killing keepAliveWork
	keepAliveWork.reset();
105 106 107 108 109 110 111 112

	//Stop MQTTPusher
	LOG(info) << "Flushing MQTT queues...";
	_mqttPusher->stop();

	//Stop https server
	LOG(info) << "Stopping REST API Server...";
	_httpsServer->stop();
113 114 115 116 117 118 119 120
}

void printSyntax()
{
/*
                       1         2         3         4         5         6         7         8
             012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
121 122
	_configuration = new Configuration("");
	global_t& globalSettings = _configuration->getGlobal();
123
	cout << "Usage:" << endl;
Alessio Netti's avatar
Alessio Netti committed
124
	cout << "  dcdbpusher [-d] [-x] [-a<string>] [-b<host>] [-p<port>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
125 126 127 128
	cout << "  dcdbpusher -h" << endl;
	cout << endl;

	cout << "Options:" << endl;
129 130 131 132 133 134 135
	cout << "  -a <string>     Auto-publish pattern         [default: none]" << endl;
	cout << "  -b <host>       MQTT broker                  [default: none]" << endl;
	cout << "  -p <port>       MQTT broker port             [default: " << globalSettings.brokerPort << "]" << endl;
	cout << "  -m <string>     MQTT topic prefix            [default: none]" << endl;
	cout << "  -w <path>       Writable temp dir            [default: .]" << endl;
	cout << "  -v <level>      Set verbosity of output      [default: " << globalSettings.logLevelCmd << "]" << endl
		 <<	"                  Can be a number between 5 (all) and 0 (fatal)." << endl;
136 137
	cout << endl;
	cout << "  -d              Daemonize" << endl;
138
	cout << "  -x              Parse and print the config but do not actually start dcdbpusher" << endl;
139 140
	cout << "  -h              This help page" << endl;
	cout << endl;
141 142 143

	delete _configuration;
	_configuration = nullptr;
144 145 146
}

int main(int argc, char** argv) {
147
	cout << "dcdbpusher " << VERSION << endl << endl;
148 149 150
	boost::asio::io_service io;
	boost::thread_group threads;

151
	if (argc <= 1) {
152 153
		cout << "Please specify a path to the config-directory" << endl << endl;
		printSyntax();
154 155 156
		return 1;
	}

157
	//define allowed command-line options once
Alessio Netti's avatar
Alessio Netti committed
158
	const char opts[] = "a:b:p:m:v:w:dxh";
159

160 161 162 163 164 165 166 167 168 169 170 171 172 173
	//check if help flag specified
	char c;
	while ((c = getopt(argc, argv, opts)) != -1) {
		switch (c)
		{
			case 'h':
				printSyntax();
				return 1;
				break;
			default:
				//do nothing (other options are read later on)
				break;
		}
	}
174

175
	//init LOGGING
Alessio Netti's avatar
Alessio Netti committed
176
	initLogging();
177 178

	//set up logger to command line
Alessio Netti's avatar
Alessio Netti committed
179
	auto cmdSink = setupCmdLogger();
180 181 182

	//get logger instance
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
183
	//finished logging startup for the moment (file log added later)
184

185
	_configuration = new Configuration(argv[argc-1]);
186

187
	//Read global variables from config file
188
	if(!_configuration->readGlobal()) {
189
		LOG(fatal) << "Failed to read global configuration!";
190 191
		return 1;
	}
192 193 194 195 196
	global_t& globalSettings = _configuration->getGlobal();
	//plugin and restAPI settings are actually part of globalSettings
	//use the references as shortcut, so that we do not have to prefix with 'globalSettings.' all the time
	pluginSettings_t& pluginSettings = globalSettings.pluginSettings;
	restAPISettings_t& restAPISettings = globalSettings.restAPISettings;
197

198 199
	//reset getopt()
	optind = 1;
Alessio Netti's avatar
Alessio Netti committed
200
	//read in options (overwrite dcdbpusher.conf settings if necessary)
201
	while ((c = getopt(argc, argv, opts)) != -1) {
202 203
		switch (c)
		{
204 205 206
			case 'a':
				pluginSettings.sensorPattern = optarg;
				break;
207
			case 'b':
208
				globalSettings.brokerHost = optarg;
209 210
				break;
			case 'p':
211
				globalSettings.brokerPort = atoi(optarg);
212 213
				break;
			case 'm':
214
				pluginSettings.mqttPrefix = optarg;
215
				break;
216
			case 'v':
217
				globalSettings.logLevelCmd = _configuration->translateLogLevel(stoi(optarg));
218
				break;
219
			case 'd':
220 221
				globalSettings.daemonize = 1;
				break;
222 223 224
			case 'x':
			    globalSettings.validateConfig = true;
			    break;
225
			case 'w':
226 227 228
				pluginSettings.tempdir = optarg;
				if (pluginSettings.tempdir[pluginSettings.tempdir.length()-1] != '/') {
					pluginSettings.tempdir.append("/");
229 230
				}
				break;
231 232 233 234 235
			case 'h':
				printSyntax();
				return 1;
				break;
			default:
236 237
				if (c != '?') cerr << "Unknown parameter: " << c << endl;
				return 1;
238 239 240
		}
	}

241
	//we now should know where the writable tempdir is
242
	//set up logger to file
Alessio Netti's avatar
Alessio Netti committed
243
	auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("dcdbpusher"));
244

245 246 247
	//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
	fileSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelFile);
	cmdSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelCmd);
248

249 250
	LOG(info) << "Logging setup complete";

251
	//Read in rest of configuration. Also creates all sensors
252
	if(!_configuration->readPlugins()) {
253
		LOG(fatal) << "Failed to read configuration!";
254 255 256
		return 1;
	}

257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
	_analyticsManager = new AnalyticsManager();
	// Preparing the SensorNavigator
	bool failedTree = false;
	std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
	vector<std::string> names, topics;
	for(const auto& p : _configuration->getPlugins())
		for(const auto& g : p.configurator->getSensorGroups())
			for(const auto& s : g->getSensors()) {
				names.push_back(s->getName());
				topics.push_back(s->getMqtt());
			}
	try {
		navigator->buildTree(globalSettings.hierarchy, &names, &topics);
	} catch(const std::invalid_argument& e) {
		LOG(error) << e.what();
		LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
		failedTree = true;
	}

	if(!failedTree) {
		_queryEngine.setNavigator(navigator);
		_queryEngine.triggerUpdate();
		_queryEngine.setQueryCallback(sensorQueryCallback);

		if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
			LOG(fatal) << "Failed to load data analytics manager!";
			return 1;
		}
	}

287 288 289 290 291 292
    //print configuration to give some feedback
	//config of plugins is only printed if the config shall be validated or to debug level otherwise
    LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
    if (globalSettings.validateConfig) {
      vLogLevel = boost::log::trivial::info;
    }
293
    LOG_VAR(vLogLevel) << "-----  Configuration  -----";
294 295

	//print global settings in either case
296
	LOG(info) << "Global Settings:";
297 298 299 300 301 302 303 304 305 306 307
	LOG(info) << "    Broker:             " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
	LOG(info) << "    Threads:            " << globalSettings.threads;
	LOG(info) << "    Daemonize:          " << (globalSettings.daemonize ? "Enabled" : "Disabled");
	LOG(info) << "    MaxMsgNum:          " << globalSettings.maxMsgNum;
	LOG(info) << "    MaxInflightMsgNum:  " << globalSettings.maxInflightMsgNum;
	LOG(info) << "    MaxQueuedMsgNum:    " << globalSettings.maxQueuedMsgNum;
	LOG(info) << "    MQTT-QoS:           " << globalSettings.qosLevel;
	LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
	LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
	LOG(info) << "    Hierarchy:          " << globalSettings.hierarchy;
	LOG(info) << "    CacheInterval:      " << pluginSettings.cacheInterval / 1000 << " [s]";
308
	if(globalSettings.validateConfig) {
309
        LOG(info) << "    Only validating config files.";
310
    } else {
311
        LOG(info) << "    ValidateConfig:     Disabled";
312
    }
313 314

	LOG(info) << "RestAPI Settings:";
315
	LOG(info) << "    REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
316
#ifdef DEBUG
317 318 319
	LOG(info) << "    Certificate: " << restAPISettings.certificate;
	LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	LOG(info) << "    DH params from: " << restAPISettings.dhFile;
320
#else
321
	LOG(info) << "    Certificate, private key and DH-param file not printed.";
322
#endif
323

324
	LOG_VAR(vLogLevel) << "-----  Sampling Configuration  -----";
325
	for(auto& p : _configuration->getPlugins()) {
326
        LOG_VAR(vLogLevel) << "Sampling Plugin \"" << p.id << "\"";
327 328 329
        p.configurator->printConfig(vLogLevel);
    }

330 331 332 333 334 335 336
	LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
	for(auto& p : _analyticsManager->getPlugins()) {
		LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
		p.configurator->printConfig(vLogLevel);
	}

	LOG_VAR(vLogLevel) << "-----  End Configuration  -----";
337 338 339 340

	if (globalSettings.validateConfig) {
      return 0;
    }
341

342
	//MQTTPusher and Https server get their own threads
343
	_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
344
								 _configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
Alessio Netti's avatar
Alessio Netti committed
345
	_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
346
	_configuration->readAuthkeys(_httpsServer);
347

Micha Müller's avatar
Micha Müller committed
348
	//Init all sensors
349
	LOG(info) << "Init sensors...";
350
	for(auto& p : _configuration->getPlugins()) {
351
		LOG(info) << "Init \"" << p.id << "\" plugin";
352
		for(const auto& g : p.configurator->getSensorGroups()) {
353
			LOG(debug) << " -Group: " << g->getGroupName();
354
			g->init(io);
355
		}
356 357 358 359
	}

	//Start all sensors
	LOG(info) << "Starting sensors...";
360
	for(auto& p : _configuration->getPlugins()) {
361
		LOG(info) << "Start \"" << p.id << "\" plugin";
362
		for(const auto& g : p.configurator->getSensorGroups()) {
363
			g->start();
364
		}
365 366
	}

367
	if(!failedTree) {
Alessio Netti's avatar
Alessio Netti committed
368 369 370
		if(!_queryEngine.updated.is_lock_free())
			LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";

371 372 373 374 375 376 377
		LOG(info) << "Init analyzers...";
		_analyticsManager->init(io);

		LOG(info) << "Starting analyzers...";
		_analyticsManager->start();
	}

378 379
	LOG(info) << "Sensors started!";

380
	if (globalSettings.daemonize) {
381 382
		//boost.log does not support forking officially.
		//however, just don't touch the sinks after daemonizing and it should work nonetheless
383 384
		LOG(info) << "Detaching...";

385
		cmdSink->flush();
386 387 388 389
		boost::log::core::get()->remove_sink(cmdSink);
		cmdSink.reset();

		//daemonize
390
		dcdbdaemon();
391 392

		LOG(info) << "Now detached";
393 394
	}

395 396
	LOG(info) << "Creating threads...";

397 398 399
	//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
	keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);

Micha Müller's avatar
Micha Müller committed
400
	//Create pool of threads which handle the sensors
401
	for(size_t i = 0; i < globalSettings.threads; i++) {
402 403 404
		threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
	}

405 406
	boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher));
	boost::thread restThread(bind(&HttpsServer::run, _httpsServer));
407

408
	LOG(info) << "Threads created!";
409 410

	LOG(info) << "Registering signal handlers...";
Alessio Netti's avatar
Alessio Netti committed
411 412
	signal(SIGINT, sigHandler);	//Handle Strg+C
	signal(SIGTERM, sigHandler);	//Handle termination
413 414
	LOG(info) << "Signal handlers registered!";

415 416 417
	LOG(info) << "Setup complete!";

	LOG(trace) << "Running...";
418

Micha Müller's avatar
Micha Müller committed
419
	//Run until Strg+C
420 421
	threads.join_all();

422 423 424
	//will only continue if interrupted by SIGINT and threads were stopped

	mqttThread.join();
425
	LOG(info) << "MQTTPusher stopped";
426

427
	restThread.join();
428
	LOG(info) << "REST API Server stopped";
429

Micha Müller's avatar
Micha Müller committed
430
	LOG(info) << "Exiting...Goodbye!";
431 432
	return 0;
}