dcdbpusher.cpp 17.5 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbpusher.cpp
3
// Author      : Michael Ott (original), Micha Mueller
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Copyright   : Leibniz Supercomputing Centre
// Description : Main functions for the DCDB MQTT Pusher
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include <dcdbdaemon.h>
#include <functional>
29
#include <string>
30
#include <vector>
31

32
33
//Caution: include order matters! HttpsServer.h needs to be included first
#include "HttpsServer.h"
34
35
#include "Configuration.h"
#include "MQTTPusher.h"
36
#include "version.h"
37
38
39
40
41

#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

42
43
44
45
46
47
48
#include <boost/log/trivial.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/common_attributes.hpp>
#include <boost/log/support/date_time.hpp>

49
50
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
51
52
53
54
Configuration*					_configuration;
MQTTPusher*						_mqttPusher;
HttpsServer*					_httpsServer;
AnalyticsManager* 				_analyticsManager;
55
std::map<std::string, SBasePtr> _sensorMap;
Alessio Netti's avatar
Alessio Netti committed
56
QueryEngine&					_queryEngine = QueryEngine::getInstance();
57

58
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
59

60
61
62
63
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
	if(!buffer)
		buffer = new std::vector<reading_t>();
	buffer->clear();
Alessio Netti's avatar
Alessio Netti committed
64
65
66
	//Initializing the sensor map if necessary. Thread safe!
	if(_queryEngine.updated.load()) {
		if(!_queryEngine.updating.exchange(true)) {
Alessio Netti's avatar
Alessio Netti committed
67
			_sensorMap.clear();
Alessio Netti's avatar
Alessio Netti committed
68
69
70
71
72
73
74
75
76
77
			for (auto &p : _configuration->getPlugins())
				for (auto &g : p.configurator->getSensorGroups())
					for (auto &s : g->getSensors())
						_sensorMap.insert(std::make_pair(s->getName(), s));
			_queryEngine.updated.store(false);
			_queryEngine.updating.store(false);
		} else {
			// Spinning while the sensormap is being built
			while( _queryEngine.updating.load() ) {}
		}
78
79
80
81
82
83
	}

	if(_sensorMap.count(name) > 0) {
		SBasePtr sensor = _sensorMap[name];
		// Converting absolute timestamps to relative offsets for cache access

Alessio Netti's avatar
Alessio Netti committed
84
85
86
87
88
		uint64_t interval = ((uint64_t)sensor->getCacheInterval() / ((uint64_t)sensor->getCacheSize() - 1)) * 2 * 1000000;
		uint64_t now = getTimestamp();
		uint64_t startTsInt = rel ? startTs : now - startTs;
		uint64_t endTsInt = rel ? endTs : now - endTs;

89
90
91
		//TODO: replace these two lines of code with the calls from CacheEntry used in collectagent
		//Converting absolute timestamps to relative offsets for cache access
		//Getting the cache indexes to access sensor data
Alessio Netti's avatar
Alessio Netti committed
92
93
		int64_t startIdx = sensor->getCacheOffset(startTsInt);
		int64_t endIdx = sensor->getCacheOffset(endTsInt);
94
95
96
97
98

		//Managing invalid time offsets
		if( startIdx < 0 || endIdx < 0)
			return buffer;

Alessio Netti's avatar
Alessio Netti committed
99
100
101
102
103
		//TODO: better ways to manage data obsolescence?
		//Managing obsolete data
		if(now - startTsInt > sensor->getCache()[startIdx].timestamp + interval || now - endTsInt > sensor->getCache()[endIdx].timestamp + interval)
            return buffer;

104
105
106
107
108
109
110
111
112
113
		if( startIdx <= endIdx )
			buffer->insert(buffer->end(), sensor->getCache() + startIdx, sensor->getCache() + endIdx + 1);
		else {
			buffer->insert(buffer->end(), sensor->getCache() + startIdx, sensor->getCache() + sensor->getCacheSize());
			buffer->insert(buffer->end(), sensor->getCache(), sensor->getCache() + endIdx + 1);
		}
	}
	return buffer;
}

Alessio Netti's avatar
Alessio Netti committed
114
void sigHandler(int sig) {
115
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
116
117
118
119
	if( sig == SIGINT )
		LOG(fatal) << "Received SIGINT";
	else if( sig == SIGTERM )
		LOG(fatal) << "Received SIGTERM";
120
121
	//Stop all sensors
	LOG(info) << "Stopping sensors...";
122
	for(auto& p : _configuration->getPlugins()) {
123
		LOG(info) << "Stop \"" << p.id << "\" plugin";
124
		for(const auto& g : p.configurator->getSensorGroups()) {
125
			g->stop();
126
		}
127
	}
128
129
130
	//Stop data analytics plugins and analyzers
	_analyticsManager->stop();

131
132
	//Stop io service by killing keepAliveWork
	keepAliveWork.reset();
133
134
135
136
137
138
139
140

	//Stop MQTTPusher
	LOG(info) << "Flushing MQTT queues...";
	_mqttPusher->stop();

	//Stop https server
	LOG(info) << "Stopping REST API Server...";
	_httpsServer->stop();
141
142
143
144
145
146
147
148
}

void printSyntax()
{
/*
                       1         2         3         4         5         6         7         8
             012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
149
150
	_configuration = new Configuration("");
	global_t& globalSettings = _configuration->getGlobal();
151
	cout << "Usage:" << endl;
152
	cout << "  dcdbpusher [-d] [-x] [-a<string>] [-b<host>] [-p<port>] [-m<string>] [-t<number>] <path/to/configfiles/>" << endl;
153
154
155
156
	cout << "  dcdbpusher -h" << endl;
	cout << endl;

	cout << "Options:" << endl;
157
158
159
160
161
162
163
164
165
	cout << "  -a <string>     Auto-publish pattern         [default: none]" << endl;
	cout << "  -b <host>       MQTT broker                  [default: none]" << endl;
	cout << "  -p <port>       MQTT broker port             [default: " << globalSettings.brokerPort << "]" << endl;
	cout << "  -m <string>     MQTT topic prefix            [default: none]" << endl;
	cout << "  -t <number>     Thread count                 [default: " << globalSettings.threads << "]" << endl;
	cout << "  -w <path>       Writable temp dir            [default: .]" << endl;
	cout << "  -c <sec>        Cache interval in [s]        [default: " << globalSettings.pluginSettings.cacheInterval / 1000 << "]" << endl;
	cout << "  -v <level>      Set verbosity of output      [default: " << globalSettings.logLevelCmd << "]" << endl
		 <<	"                  Can be a number between 5 (all) and 0 (fatal)." << endl;
166
167
	cout << endl;
	cout << "  -d              Daemonize" << endl;
168
	cout << "  -x              Parse and print the config but do not actually start dcdbpusher" << endl;
169
170
	cout << "  -h              This help page" << endl;
	cout << endl;
171
172
173

	delete _configuration;
	_configuration = nullptr;
174
175
176
}

int main(int argc, char** argv) {
177
	cout << "dcdbpusher " << VERSION << endl << endl;
178
179
180
	boost::asio::io_service io;
	boost::thread_group threads;

181
	if (argc <= 1) {
182
183
		cout << "Please specify a path to the config-directory" << endl << endl;
		printSyntax();
184
185
186
		return 1;
	}

187
	//define allowed command-line options once
188
	const char opts[] = "a:b:p:m:t:v:w:c:dxh";
189

190
191
192
193
194
195
196
197
198
199
200
201
202
203
	//check if help flag specified
	char c;
	while ((c = getopt(argc, argv, opts)) != -1) {
		switch (c)
		{
			case 'h':
				printSyntax();
				return 1;
				break;
			default:
				//do nothing (other options are read later on)
				break;
		}
	}
204

205
206
207
208
209
	//init LOGGING
	boost::log::add_common_attributes();

	//set up logger to command line
	auto cmdSink = boost::log::add_console_log(
210
211
212
213
214
215
216
			std::cout,
			boost::log::keywords::format = (boost::log::expressions::stream //only print timestamp (without date), severity and message to terminal
					<< "[" << boost::log::expressions::format_date_time< boost::posix_time::ptime >("TimeStamp", "%H:%M:%S") << "]"
					<< " <" << boost::log::trivial::severity << ">"
					<< ": " << boost::log::expressions::smessage
			),
			boost::log::keywords::filter = (boost::log::trivial::severity >= boost::log::trivial::info)
217
218
219
220
	);

	//get logger instance
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
221
	//finished logging startup for the moment (file log added later)
222

223
	_configuration = new Configuration(argv[argc-1]);
224

225
	//Read global variables from config file
226
	if(!_configuration->readGlobal()) {
227
		LOG(fatal) << "Failed to read global configuration!";
228
229
		return 1;
	}
230
231
232
233
234
	global_t& globalSettings = _configuration->getGlobal();
	//plugin and restAPI settings are actually part of globalSettings
	//use the references as shortcut, so that we do not have to prefix with 'globalSettings.' all the time
	pluginSettings_t& pluginSettings = globalSettings.pluginSettings;
	restAPISettings_t& restAPISettings = globalSettings.restAPISettings;
235

236
237
238
239
	//reset getopt()
	optind = 1;
	//read in options (overwrite global.conf settings if necessary)
	while ((c = getopt(argc, argv, opts)) != -1) {
240
241
		switch (c)
		{
242
243
244
			case 'a':
				pluginSettings.sensorPattern = optarg;
				break;
245
			case 'b':
246
				globalSettings.brokerHost = optarg;
247
248
				break;
			case 'p':
249
				globalSettings.brokerPort = atoi(optarg);
250
251
				break;
			case 'm':
252
				pluginSettings.mqttPrefix = optarg;
253
254
				break;
			case 't':
255
				globalSettings.threads = stoul(optarg);
256
				break;
257
			case 'v':
258
				globalSettings.logLevelCmd = _configuration->translateLogLevel(stoi(optarg));
259
				break;
260
			case 'd':
261
262
				globalSettings.daemonize = 1;
				break;
263
264
265
			case 'x':
			    globalSettings.validateConfig = true;
			    break;
266
			case 'w':
267
268
269
				pluginSettings.tempdir = optarg;
				if (pluginSettings.tempdir[pluginSettings.tempdir.length()-1] != '/') {
					pluginSettings.tempdir.append("/");
270
271
				}
				break;
272
273
274
			case 'c':
				pluginSettings.cacheInterval = stoul(optarg) * 1000;
				break;
275
276
277
278
279
			case 'h':
				printSyntax();
				return 1;
				break;
			default:
280
281
				if (c != '?') cerr << "Unknown parameter: " << c << endl;
				return 1;
282
283
284
		}
	}

285
	//we now should know where the writable tempdir is
286
287
	//set up logger to file
	auto fileSink = boost::log::add_file_log(
288
289
290
291
292
293
294
295
296
297
298
299
300
			boost::log::keywords::file_name = pluginSettings.tempdir + "dcdb_%N.log",	//	number logfiles ascending
			boost::log::keywords::rotation_size = 10 * 1024 * 1024,	//rotate logfile every 10 MiB
			//boost::log::keywords::time_based_rotation = boost::log::sinks::file::rotation_at_time_point(0, 0, 0),	//Throws bad year-exception for no obvious reason
			boost::log::keywords::format = //	format:		LineID [Timestamp] ThreadID <severity>: Message
					(	boost::log::expressions::stream
							<< boost::log::expressions::attr< unsigned int >("LineID")
							<< " [" << boost::log::expressions::format_date_time< boost::posix_time::ptime >("TimeStamp", "%Y-%m-%d, %H:%M:%S") << "]"
							<< " " << boost::log::expressions::attr<boost::log::attributes::current_thread_id::value_type >("ThreadID")
							<< " <" << boost::log::trivial::severity << ">"
							<< ": " << boost::log::expressions::smessage
					),
			boost::log::keywords::filter = (boost::log::trivial::severity >= boost::log::trivial::trace),
			boost::log::keywords::auto_flush = true
301
302
	);

303
304
305
	//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
	fileSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelFile);
	cmdSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelCmd);
306

307
308
	LOG(info) << "Logging setup complete";

309
	//Read in rest of configuration. Also creates all sensors
310
	if(!_configuration->readPlugins()) {
311
		LOG(fatal) << "Failed to read configuration!";
312
313
314
		return 1;
	}

315
316
317
318
319
320
321
322
323
    //print configuration to give some feedback
	//config of plugins is only printed if the config shall be validated or to debug level otherwise
    LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
    if (globalSettings.validateConfig) {
      vLogLevel = boost::log::trivial::info;
    }
    LOG_VAR(vLogLevel) << "-----  Configuration:  -----";

	//print global settings in either case
324
	LOG(info) << "Global Settings:";
Alessio Netti's avatar
Logging    
Alessio Netti committed
325
326
	LOG(info) << "  Broker:             " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
	LOG(info) << "  Threads:            " << globalSettings.threads;
327
	if(globalSettings.daemonize) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
328
        LOG(info) << "  Daemonize:          Enabled";
329
	} else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
330
        LOG(info) << "  Daemonize:          Disabled";
331
	}
Alessio Netti's avatar
Logging    
Alessio Netti committed
332
333
334
335
336
337
338
339
	LOG(info) << "  MaxMsgNum:          " << globalSettings.maxMsgNum;
	LOG(info) << "  MaxInflightMsgNum:  " << globalSettings.maxInflightMsgNum;
	LOG(info) << "  MaxQueuedMsgNum:    " << globalSettings.maxQueuedMsgNum;
	LOG(info) << "  MQTT-QoS:           " << globalSettings.qosLevel;
	LOG(info) << "  MQTT-prefix:        " << pluginSettings.mqttPrefix;
	LOG(info) << "  Write-Dir:          " << pluginSettings.tempdir;
	LOG(info) << "  Hierarchy:          " << globalSettings.hierarchy;
	LOG(info) << "  CacheInterval:      " << pluginSettings.cacheInterval / 1000 << " [s]";
340
341
342
	if(globalSettings.validateConfig) {
        LOG(info) << "  Only validating config files.";
    } else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
343
        LOG(info) << "  validateConfig:     Disabled";
344
    }
345
346

	LOG(info) << "RestAPI Settings:";
347
	LOG(info) << "  REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
348
#ifdef DEBUG
349
350
351
	LOG(info) << "  Certificate: " << restAPISettings.certificate;
	LOG(info) << "  Private key file: " << restAPISettings.privateKey;
	LOG(info) << "  DH params from: " << restAPISettings.dhFile;
352
353
#else
	LOG(info) << "  Certificate, private key and DH-param file not printed.";
354
#endif
355
356
357
358
359
360
361
362
363
364
365

	for(auto& p : _configuration->getPlugins()) {
        LOG_VAR(vLogLevel) << "Plugin \"" << p.id << "\"";
        p.configurator->printConfig(vLogLevel);
    }

	LOG_VAR(vLogLevel) << "-----  End Config  -----";

	if (globalSettings.validateConfig) {
      return 0;
    }
366

367
	//MQTTPusher and Https server get their own threads
368
369
	_analyticsManager = new AnalyticsManager();
	_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
370
								 _configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
Alessio Netti's avatar
Alessio Netti committed
371
	_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
372
	_configuration->readAuthkeys(_httpsServer);
373

Micha Mueller's avatar
Micha Mueller committed
374
	//Init all sensors
375
	LOG(info) << "Init sensors...";
376
	for(auto& p : _configuration->getPlugins()) {
377
		LOG(info) << "Init \"" << p.id << "\" plugin";
378
		for(const auto& g : p.configurator->getSensorGroups()) {
379
			LOG(debug) << " -Group: " << g->getGroupName();
380
			g->init(io);
381
		}
382
383
384
385
	}

	//Start all sensors
	LOG(info) << "Starting sensors...";
386
	for(auto& p : _configuration->getPlugins()) {
387
		LOG(info) << "Start \"" << p.id << "\" plugin";
388
		for(const auto& g : p.configurator->getSensorGroups()) {
389
			g->start();
390
		}
391
392
	}

Alessio Netti's avatar
Alessio Netti committed
393
	// Preparing the SensorNavigator
394
395
396
397
398
399
400
401
402
403
404
405
	bool failedTree = false;
	std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
	vector<std::string> names, topics;
	for(const auto& p : _configuration->getPlugins())
		for(const auto& g : p.configurator->getSensorGroups())
			for(const auto& s : g->getSensors()) {
				names.push_back(s->getName());
				topics.push_back(s->getMqtt());
			}
	try {
		navigator->buildTree(globalSettings.hierarchy, &names, &topics);
	} catch(const std::invalid_argument& e) {
Alessio Netti's avatar
Alessio Netti committed
406
        LOG(error) << e.what();
407
408
409
410
411
		LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
		failedTree = true;
	}

	if(!failedTree) {
Alessio Netti's avatar
Alessio Netti committed
412
413
414
		_queryEngine.setNavigator(navigator);
		_queryEngine.triggerUpdate();
		_queryEngine.setQueryCallback(sensorQueryCallback);
415
416
417
418
419
420
421

		if(!_analyticsManager->load(argv[argc-1], "global.conf", pluginSettings) ||
		   !_analyticsManager->mqttCheck(_configuration->getPlugins())) {
			LOG(fatal) << "Failed to load data analytics manager!";
			return 1;
		}

Alessio Netti's avatar
Alessio Netti committed
422
423
424
		if(!_queryEngine.updated.is_lock_free())
			LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";

425
426
427
428
429
430
431
		LOG(info) << "Init analyzers...";
		_analyticsManager->init(io);

		LOG(info) << "Starting analyzers...";
		_analyticsManager->start();
	}

432
433
	LOG(info) << "Sensors started!";

434
	if (globalSettings.daemonize) {
435
436
		//boost.log does not support forking officially.
		//however, just don't touch the sinks after daemonizing and it should work nonetheless
437
438
		LOG(info) << "Detaching...";

439
		cmdSink->flush();
440
441
442
443
		boost::log::core::get()->remove_sink(cmdSink);
		cmdSink.reset();

		//daemonize
444
		dcdbdaemon();
445
446

		LOG(info) << "Now detached";
447
448
	}

449
450
	LOG(info) << "Creating threads...";

451
452
453
	//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
	keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);

Micha Mueller's avatar
Micha Mueller committed
454
	//Create pool of threads which handle the sensors
455
	for(size_t i = 0; i < globalSettings.threads; i++) {
456
457
458
		threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
	}

459
460
	boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher));
	boost::thread restThread(bind(&HttpsServer::run, _httpsServer));
461

462
	LOG(info) << "Threads created!";
463
464

	LOG(info) << "Registering signal handlers...";
Alessio Netti's avatar
Alessio Netti committed
465
466
	signal(SIGINT, sigHandler);	//Handle Strg+C
	signal(SIGTERM, sigHandler);	//Handle termination
467
468
	LOG(info) << "Signal handlers registered!";

469
470
471
	LOG(info) << "Setup complete!";

	LOG(trace) << "Running...";
472

Micha Mueller's avatar
Micha Mueller committed
473
	//Run until Strg+C
474
475
	threads.join_all();

476
477
478
	//will only continue if interrupted by SIGINT and threads were stopped

	mqttThread.join();
479
	LOG(info) << "MQTTPusher stopped";
480

481
	restThread.join();
482
	LOG(info) << "REST API Server stopped";
483

Micha Mueller's avatar
Micha Mueller committed
484
	LOG(info) << "Exiting...Goodbye!";
485
486
	return 0;
}