dcdbpusher.cpp 15.5 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbpusher.cpp
3
// Author      : Michael Ott (original), Micha Mueller
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Copyright   : Leibniz Supercomputing Centre
// Description : Main functions for the DCDB MQTT Pusher
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include <dcdbdaemon.h>
#include <functional>
29
#include <string>
30
#include <vector>
31

32
33
//Caution: include order matters! HttpsServer.h needs to be included first
#include "HttpsServer.h"
34
35
#include "Configuration.h"
#include "MQTTPusher.h"
36
#include "version.h"
37
38
39
40
41

#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

42
43
44
45
46
47
48
#include <boost/log/trivial.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/common_attributes.hpp>
#include <boost/log/support/date_time.hpp>

49
50
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
51
52
53
54
Configuration*					_configuration;
MQTTPusher*						_mqttPusher;
HttpsServer*					_httpsServer;
AnalyticsManager* 				_analyticsManager;
55
std::map<std::string, SBasePtr> _sensorMap;
Alessio Netti's avatar
Alessio Netti committed
56
QueryEngine&					_queryEngine = QueryEngine::getInstance();
57

58
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
59

60
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
Alessio Netti's avatar
Alessio Netti committed
61
62
63
	//Initializing the sensor map if necessary. Thread safe!
	if(_queryEngine.updated.load()) {
		if(!_queryEngine.updating.exchange(true)) {
Alessio Netti's avatar
Alessio Netti committed
64
			_sensorMap.clear();
65
			// Adding ordinary sensors to the map
Alessio Netti's avatar
Alessio Netti committed
66
67
68
			for (auto &p : _configuration->getPlugins())
				for (auto &g : p.configurator->getSensorGroups())
					for (auto &s : g->getSensors())
69
70
71
72
73
74
75
76
					    _sensorMap.insert(std::make_pair(s->getName(), s));
            // Adding data analytics sensors to the map
            for(auto& p : _analyticsManager->getPlugins())
                for(const auto& a : p.configurator->getAnalyzers())
                    if (a->getStreaming())
                        for (const auto &u : a->getUnits())
                            for (const auto &o: u->getBaseOutputs())
						        _sensorMap.insert(std::make_pair(o->getName(), o));
Alessio Netti's avatar
Alessio Netti committed
77
78
79
80
81
82
			_queryEngine.updated.store(false);
			_queryEngine.updating.store(false);
		} else {
			// Spinning while the sensormap is being built
			while( _queryEngine.updating.load() ) {}
		}
83
84
85
	}
	if(_sensorMap.count(name) > 0) {
		SBasePtr sensor = _sensorMap[name];
Alessio Netti's avatar
Alessio Netti committed
86
		if(!sensor->isInit())
87
			return NULL;
Alessio Netti's avatar
Alessio Netti committed
88
		else
89
			return sensor->getCache()->getView(startTs, endTs, buffer, rel);
90
	}
91
	return NULL;
92
93
}

Alessio Netti's avatar
Alessio Netti committed
94
void sigHandler(int sig) {
95
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
96
97
98
99
	if( sig == SIGINT )
		LOG(fatal) << "Received SIGINT";
	else if( sig == SIGTERM )
		LOG(fatal) << "Received SIGTERM";
100
101
	//Stop all sensors
	LOG(info) << "Stopping sensors...";
102
	for(auto& p : _configuration->getPlugins()) {
103
		LOG(info) << "Stop \"" << p.id << "\" plugin";
104
		for(const auto& g : p.configurator->getSensorGroups()) {
105
			g->stop();
106
		}
107
	}
108
109
110
	//Stop data analytics plugins and analyzers
	_analyticsManager->stop();

111
112
	//Stop io service by killing keepAliveWork
	keepAliveWork.reset();
113
114
115
116
117
118
119
120

	//Stop MQTTPusher
	LOG(info) << "Flushing MQTT queues...";
	_mqttPusher->stop();

	//Stop https server
	LOG(info) << "Stopping REST API Server...";
	_httpsServer->stop();
121
122
123
124
125
126
127
128
}

void printSyntax()
{
/*
                       1         2         3         4         5         6         7         8
             012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
Alessio Netti's avatar
Alessio Netti committed
129
	_configuration = new Configuration("", "dcdbpusher.conf");
130
	cout << "Usage:" << endl;
131
	cout << "  dcdbpusher [-d] [-x] [-a] [-b<host>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
132
133
134
135
	cout << "  dcdbpusher -h" << endl;
	cout << endl;

	cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
136
	cout << "  -b <host>       MQTT broker                  [default: " << _configuration->brokerHost << ":" << _configuration->brokerPort << "]" << endl;
137
138
	cout << "  -m <string>     MQTT topic prefix            [default: none]" << endl;
	cout << "  -w <path>       Writable temp dir            [default: .]" << endl;
Alessio Netti's avatar
Alessio Netti committed
139
	cout << "  -v <level>      Set verbosity of output      [default: " << _configuration->logLevelCmd << "]" << endl
140
		 <<	"                  Can be a number between 5 (all) and 0 (fatal)." << endl;
141
142
	cout << endl;
	cout << "  -d              Daemonize" << endl;
143
	cout << "  -x              Parse and print the config but do not actually start dcdbpusher" << endl;
144
	cout << "  -a			   Enable sensor auto-publish" << endl;
145
146
	cout << "  -h              This help page" << endl;
	cout << endl;
147
148
149

	delete _configuration;
	_configuration = nullptr;
150
151
152
}

int main(int argc, char** argv) {
153
	cout << "dcdbpusher " << VERSION << endl << endl;
154
155
156
	boost::asio::io_service io;
	boost::thread_group threads;

157
	if (argc <= 1) {
158
159
		cout << "Please specify a path to the config-directory" << endl << endl;
		printSyntax();
160
161
162
		return 1;
	}

163
	//define allowed command-line options once
164
	const char opts[] = "b:p:m:v:w:dxah";
165

166
167
168
169
170
171
172
173
174
175
176
177
178
	//check if help flag specified
	char c;
	while ((c = getopt(argc, argv, opts)) != -1) {
		switch (c)
		{
			case 'h':
				printSyntax();
				return 1;
			default:
				//do nothing (other options are read later on)
				break;
		}
	}
179

180
	//init LOGGING
Alessio Netti's avatar
Alessio Netti committed
181
	initLogging();
182
183

	//set up logger to command line
Alessio Netti's avatar
Alessio Netti committed
184
	auto cmdSink = setupCmdLogger();
185
186
187

	//get logger instance
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
188
	//finished logging startup for the moment (file log added later)
189

Alessio Netti's avatar
Alessio Netti committed
190
	_configuration = new Configuration(argv[argc-1], "dcdbpusher.conf");
191

192
	//Read global variables from config file
Alessio Netti's avatar
Alessio Netti committed
193
	if(!_configuration->readConfig()) {
194
		LOG(fatal) << "Failed to read global configuration!";
195
196
		return 1;
	}
Alessio Netti's avatar
Alessio Netti committed
197
	
198
199
	//plugin and restAPI settings are actually part of globalSettings
	//use the references as shortcut, so that we do not have to prefix with 'globalSettings.' all the time
Alessio Netti's avatar
Alessio Netti committed
200
201
202
203
    Configuration& globalSettings = *_configuration;
	pluginSettings_t& pluginSettings = _configuration->pluginSettings;
	restAPISettings_t& restAPISettings = _configuration->restAPISettings;
	analyticsSettings_t& analyticsSettings = _configuration->analyticsSettings;
204

205
206
	//reset getopt()
	optind = 1;
Alessio Netti's avatar
Alessio Netti committed
207
	//read in options (overwrite dcdbpusher.conf settings if necessary)
208
	while ((c = getopt(argc, argv, opts)) != -1) {
209
210
		switch (c)
		{
211
			case 'a':
212
				pluginSettings.autoPublish = true;
213
				break;
214
			case 'b':
Alessio Netti's avatar
Alessio Netti committed
215
216
                globalSettings.brokerHost = parseNetworkHost(optarg);
                globalSettings.brokerPort = parseNetworkPort(optarg)=="" ? BROKERPORT : stoi(parseNetworkPort(optarg));
217
218
				break;
			case 'm':
219
				pluginSettings.mqttPrefix = optarg;
220
				break;
221
			case 'v':
Alessio Netti's avatar
Alessio Netti committed
222
				globalSettings.logLevelCmd = translateLogLevel(stoi(optarg));
223
				break;
224
			case 'd':
225
226
				globalSettings.daemonize = 1;
				break;
227
228
229
			case 'x':
			    globalSettings.validateConfig = true;
			    break;
230
			case 'w':
231
232
233
				pluginSettings.tempdir = optarg;
				if (pluginSettings.tempdir[pluginSettings.tempdir.length()-1] != '/') {
					pluginSettings.tempdir.append("/");
234
235
				}
				break;
236
237
238
239
			case 'h':
				printSyntax();
				return 1;
			default:
240
241
				if (c != '?') cerr << "Unknown parameter: " << c << endl;
				return 1;
242
243
244
		}
	}

245
	//we now should know where the writable tempdir is
246
	//set up logger to file
Alessio Netti's avatar
Alessio Netti committed
247
	auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("dcdbpusher"));
248

249
250
251
	//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
	fileSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelFile);
	cmdSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelCmd);
252

253
254
	LOG(info) << "Logging setup complete";

255
	//Read in rest of configuration. Also creates all sensors
256
	if(!_configuration->readPlugins()) {
257
		LOG(fatal) << "Failed to read configuration!";
258
259
260
		return 1;
	}

261
262
	_analyticsManager = new AnalyticsManager();
	// Preparing the SensorNavigator
263
264
	if(_analyticsManager->probe(argv[argc-1], "dcdbpusher.conf")) {
		std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
Alessio Netti's avatar
Alessio Netti committed
265
		vector <std::string> topics;
266
267
		for (const auto &p : _configuration->getPlugins())
			for (const auto &g : p.configurator->getSensorGroups())
Alessio Netti's avatar
Alessio Netti committed
268
				for (const auto &s : g->getSensors())
269
270
					topics.push_back(s->getMqtt());
		try {
271
			navigator->setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
272
273
			navigator->buildTree(analyticsSettings.hierarchy, &topics);
			topics.clear();
274
275
276
277
278
			LOG(info) << "Built a sensor hierarchy tree of size " << navigator->getTreeSize() << " and depth " << navigator->getTreeDepth() << ".";
			_queryEngine.setNavigator(navigator);
			_queryEngine.triggerUpdate();
		} catch (const std::invalid_argument &e) {
			LOG(error) << e.what();
279
			LOG(error) << "Failed to build sensor hierarchy tree!";
280
281
282
			return 1;
		}
	}
283
284
285
286
287
288
	
    _queryEngine.setQueryCallback(sensorQueryCallback);
    if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
        LOG(fatal) << "Failed to load data analytics manager!";
        return 1;
    }
289

290
291
292
293
294
295
    //print configuration to give some feedback
	//config of plugins is only printed if the config shall be validated or to debug level otherwise
    LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
    if (globalSettings.validateConfig) {
      vLogLevel = boost::log::trivial::info;
    }
296
    LOG_VAR(vLogLevel) << "-----  Configuration  -----";
297
298

	//print global settings in either case
299
	LOG(info) << "Global Settings:";
300
301
302
303
304
305
306
307
	LOG(info) << "    Broker:             " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
	LOG(info) << "    Threads:            " << globalSettings.threads;
	LOG(info) << "    Daemonize:          " << (globalSettings.daemonize ? "Enabled" : "Disabled");
	LOG(info) << "    MaxMsgNum:          " << globalSettings.maxMsgNum;
	LOG(info) << "    MaxInflightMsgNum:  " << globalSettings.maxInflightMsgNum;
	LOG(info) << "    MaxQueuedMsgNum:    " << globalSettings.maxQueuedMsgNum;
	LOG(info) << "    MQTT-QoS:           " << globalSettings.qosLevel;
	LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
308
	LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
309
310
	LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
	LOG(info) << "    CacheInterval:      " << pluginSettings.cacheInterval / 1000 << " [s]";
311
	if(globalSettings.validateConfig) {
312
        LOG(info) << "    Only validating config files.";
313
    } else {
314
        LOG(info) << "    ValidateConfig:     Disabled";
315
    }
Alessio Netti's avatar
Alessio Netti committed
316
317
	LOG(info) << "Analytics Settings:";
	LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
318
    LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
319
	LOG(info) << "RestAPI Settings:";
320
	LOG(info) << "    REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
321
#ifdef DEBUG
322
323
324
	LOG(info) << "    Certificate: " << restAPISettings.certificate;
	LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	LOG(info) << "    DH params from: " << restAPISettings.dhFile;
325
#else
326
	LOG(info) << "    Certificate, private key and DH-param file not printed.";
327
#endif
328

329
	LOG_VAR(vLogLevel) << "-----  Sampling Configuration  -----";
330
	for(auto& p : _configuration->getPlugins()) {
331
        LOG_VAR(vLogLevel) << "Sampling Plugin \"" << p.id << "\"";
332
333
334
        p.configurator->printConfig(vLogLevel);
    }

335
336
337
338
339
340
341
	LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
	for(auto& p : _analyticsManager->getPlugins()) {
		LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
		p.configurator->printConfig(vLogLevel);
	}

	LOG_VAR(vLogLevel) << "-----  End Configuration  -----";
342
343
344
345

	if (globalSettings.validateConfig) {
      return 0;
    }
346

347
	//MQTTPusher and Https server get their own threads
348
	_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.autoPublish, globalSettings.qosLevel,
349
								 _configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
Alessio Netti's avatar
Alessio Netti committed
350
	_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
351
	_configuration->readAuthkeys(_httpsServer);
352

Micha Mueller's avatar
Micha Mueller committed
353
	//Init all sensors
354
	LOG(info) << "Init sensors...";
355
	for(auto& p : _configuration->getPlugins()) {
356
		LOG(info) << "Init \"" << p.id << "\" plugin";
357
		for(const auto& g : p.configurator->getSensorGroups()) {
358
			LOG(debug) << " -Group: " << g->getGroupName();
359
			g->init(io);
360
		}
361
362
363
364
	}

	//Start all sensors
	LOG(info) << "Starting sensors...";
365
	for(auto& p : _configuration->getPlugins()) {
366
		LOG(info) << "Start \"" << p.id << "\" plugin";
367
		for(const auto& g : p.configurator->getSensorGroups()) {
368
			g->start();
369
		}
370
	}
371
372
373
	
    if(!_queryEngine.updated.is_lock_free())
        LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
374

375
376
    LOG(info) << "Init analyzers...";
    _analyticsManager->init(io);
377

378
379
    LOG(info) << "Starting analyzers...";
    _analyticsManager->start();
380

381
382
	LOG(info) << "Sensors started!";

383
	if (globalSettings.daemonize) {
384
385
		//boost.log does not support forking officially.
		//however, just don't touch the sinks after daemonizing and it should work nonetheless
386
387
		LOG(info) << "Detaching...";

388
		cmdSink->flush();
389
390
391
392
		boost::log::core::get()->remove_sink(cmdSink);
		cmdSink.reset();

		//daemonize
393
		dcdbdaemon();
394
395

		LOG(info) << "Now detached";
396
397
	}

398
399
	LOG(info) << "Creating threads...";

400
401
402
	//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
	keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);

Micha Mueller's avatar
Micha Mueller committed
403
	//Create pool of threads which handle the sensors
404
	for(size_t i = 0; i < globalSettings.threads; i++) {
405
406
407
		threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
	}

408
409
	boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher));
	boost::thread restThread(bind(&HttpsServer::run, _httpsServer));
410

411
	LOG(info) << "Threads created!";
412
413

	LOG(info) << "Registering signal handlers...";
Alessio Netti's avatar
Alessio Netti committed
414
415
	signal(SIGINT, sigHandler);	//Handle Strg+C
	signal(SIGTERM, sigHandler);	//Handle termination
416
417
	LOG(info) << "Signal handlers registered!";

418
419
420
	LOG(info) << "Setup complete!";

	LOG(trace) << "Running...";
421

Micha Mueller's avatar
Micha Mueller committed
422
	//Run until Strg+C
423
424
	threads.join_all();

425
426
427
	//will only continue if interrupted by SIGINT and threads were stopped

	mqttThread.join();
428
	LOG(info) << "MQTTPusher stopped";
429

430
	restThread.join();
431
	LOG(info) << "REST API Server stopped";
432

Micha Mueller's avatar
Micha Mueller committed
433
	LOG(info) << "Exiting...Goodbye!";
434
435
	return 0;
}