//================================================================================
// Name        : dcdbpusher.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main function for the DCDB MQTT Pusher.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

/**
 * @defgroup pusher Pusher
 *
 * Pusher is the DCDB building block responsible for gathering data from
 * hard-/software and pushing the values to the collect agent. The data
 * collection capabilities of pusher stem from its plugins.
 */

/**
 * @file dcdbpusher.cpp
 *
 * @brief Main function for the DCDB MQTT Pusher.
 *
 * @ingroup pusher
 */

#include <dcdbdaemon.h>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

#include "Configuration.h"
#include "MQTTPusher.h"
#include "PluginManager.h"
#include "RestAPI.h"
#include "version.h"

#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

#include <boost/log/trivial.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/common_attributes.hpp>
#include <boost/log/support/date_time.hpp>

65
66
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
67
68
Configuration*					_configuration;
MQTTPusher*						_mqttPusher;
69
PluginManager*                  _pluginManager;
70
RestAPI*	    				_httpsServer;
Alessio Netti's avatar
Alessio Netti committed
71
AnalyticsManager* 				_analyticsManager;
72
std::map<std::string, SBasePtr> _sensorMap;
Alessio Netti's avatar
Alessio Netti committed
73
QueryEngine&					_queryEngine = QueryEngine::getInstance();
74

75
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
76

77
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
Alessio Netti's avatar
Alessio Netti committed
78
79
80
	//Initializing the sensor map if necessary. Thread safe!
	if(_queryEngine.updated.load()) {
		if(!_queryEngine.updating.exchange(true)) {
Alessio Netti's avatar
Alessio Netti committed
81
			_sensorMap.clear();
82
			// Adding ordinary sensors to the map
83
			for (auto &p : _pluginManager->getPlugins())
Alessio Netti's avatar
Alessio Netti committed
84
85
				for (auto &g : p.configurator->getSensorGroups())
					for (auto &s : g->getSensors())
86
87
					    _sensorMap.insert(std::make_pair(s->getName(), s));
            // Adding data analytics sensors to the map
Alessio Netti's avatar
Alessio Netti committed
88
            for(auto& p : _analyticsManager->getPlugins()) {
89
90
                for(const auto& a : p.configurator->getAnalyzers())
                    if (a->getStreaming())
Alessio Netti's avatar
Alessio Netti committed
91
92
93
94
						for (const auto &u : a->getUnits())
							for (const auto &o: u->getBaseOutputs())
								_sensorMap.insert(std::make_pair(o->getName(), o));
			}
Alessio Netti's avatar
Alessio Netti committed
95
96
97
98
99
100
			_queryEngine.updated.store(false);
			_queryEngine.updating.store(false);
		} else {
			// Spinning while the sensormap is being built
			while( _queryEngine.updating.load() ) {}
		}
101
102
103
	}
	if(_sensorMap.count(name) > 0) {
		SBasePtr sensor = _sensorMap[name];
Alessio Netti's avatar
Alessio Netti committed
104
		if(!sensor->isInit())
105
			return NULL;
Alessio Netti's avatar
Alessio Netti committed
106
		else
107
			return sensor->getCache()->getView(startTs, endTs, buffer, rel);
108
	}
109
	return NULL;
110
111
}

Alessio Netti's avatar
Alessio Netti committed
112
void sigHandler(int sig) {
113
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
114
115
116
117
	if( sig == SIGINT )
		LOG(fatal) << "Received SIGINT";
	else if( sig == SIGTERM )
		LOG(fatal) << "Received SIGTERM";
118

119
120
	//Stop all sensors
	LOG(info) << "Stopping sensors...";
121
122
	_pluginManager->stopPlugin();

123
124
125
	//Stop data analytics plugins and analyzers
	_analyticsManager->stop();

126
127
	//Stop io service by killing keepAliveWork
	keepAliveWork.reset();
128
129
130
131
132
133
134
135

	//Stop MQTTPusher
	LOG(info) << "Flushing MQTT queues...";
	_mqttPusher->stop();

	//Stop https server
	LOG(info) << "Stopping REST API Server...";
	_httpsServer->stop();
136
137
138
139
140
141
142
143
}

void printSyntax()
{
/*
                       1         2         3         4         5         6         7         8
             012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
Alessio Netti's avatar
Alessio Netti committed
144
	_configuration = new Configuration("", "dcdbpusher.conf");
145
	cout << "Usage:" << endl;
Alessio Netti's avatar
Alessio Netti committed
146
	cout << "  dcdbpusher [-d] [-x] [-a<string>] [-b<host>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
147
148
149
150
	cout << "  dcdbpusher -h" << endl;
	cout << endl;

	cout << "Options:" << endl;
151
	cout << "  -a <string>     Auto-publish pattern         [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
152
	cout << "  -b <host>       MQTT broker                  [default: " << _configuration->brokerHost << ":" << _configuration->brokerPort << "]" << endl;
153
154
	cout << "  -m <string>     MQTT topic prefix            [default: none]" << endl;
	cout << "  -w <path>       Writable temp dir            [default: .]" << endl;
Alessio Netti's avatar
Alessio Netti committed
155
	cout << "  -v <level>      Set verbosity of output      [default: " << _configuration->logLevelCmd << "]" << endl
156
		 <<	"                  Can be a number between 5 (all) and 0 (fatal)." << endl;
157
158
	cout << endl;
	cout << "  -d              Daemonize" << endl;
159
	cout << "  -x              Parse and print the config but do not actually start dcdbpusher" << endl;
160
161
	cout << "  -h              This help page" << endl;
	cout << endl;
162
163
164

	delete _configuration;
	_configuration = nullptr;
165
166
167
}

int main(int argc, char** argv) {
168
	cout << "dcdbpusher " << VERSION << endl << endl;
169
170
171
	boost::asio::io_service io;
	boost::thread_group threads;

172
	if (argc <= 1) {
173
174
		cout << "Please specify a path to the config-directory" << endl << endl;
		printSyntax();
175
176
177
		return 1;
	}

178
	//define allowed command-line options once
Alessio Netti's avatar
Alessio Netti committed
179
	const char opts[] = "a:b:p:m:v:w:dxh";
180

181
182
183
184
185
186
187
188
189
190
191
192
193
	//check if help flag specified
	char c;
	while ((c = getopt(argc, argv, opts)) != -1) {
		switch (c)
		{
			case 'h':
				printSyntax();
				return 1;
			default:
				//do nothing (other options are read later on)
				break;
		}
	}
194

195
	//init LOGGING
Alessio Netti's avatar
Alessio Netti committed
196
	initLogging();
197
198

	//set up logger to command line
Alessio Netti's avatar
Alessio Netti committed
199
	auto cmdSink = setupCmdLogger();
200
201
202

	//get logger instance
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
203
	//finished logging startup for the moment (file log added later)
204

Alessio Netti's avatar
Alessio Netti committed
205
	_configuration = new Configuration(argv[argc-1], "dcdbpusher.conf");
206

207
	//Read global variables from config file
Alessio Netti's avatar
Alessio Netti committed
208
	if(!_configuration->readConfig()) {
209
		LOG(fatal) << "Failed to read global configuration!";
210
211
		return 1;
	}
Alessio Netti's avatar
Alessio Netti committed
212
	
213
214
	//plugin and restAPI settings are actually part of globalSettings
	//use the references as shortcut, so that we do not have to prefix with 'globalSettings.' all the time
Alessio Netti's avatar
Alessio Netti committed
215
216
    Configuration& globalSettings = *_configuration;
	pluginSettings_t& pluginSettings = _configuration->pluginSettings;
217
	serverSettings_t& restAPISettings = _configuration->restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
218
	analyticsSettings_t& analyticsSettings = _configuration->analyticsSettings;
219

220
221
	//reset getopt()
	optind = 1;
Alessio Netti's avatar
Alessio Netti committed
222
	//read in options (overwrite dcdbpusher.conf settings if necessary)
223
	while ((c = getopt(argc, argv, opts)) != -1) {
224
225
		switch (c)
		{
226
227
228
			case 'a':
				pluginSettings.sensorPattern = optarg;
				break;
229
			case 'b':
Alessio Netti's avatar
Alessio Netti committed
230
231
                globalSettings.brokerHost = parseNetworkHost(optarg);
                globalSettings.brokerPort = parseNetworkPort(optarg)=="" ? BROKERPORT : stoi(parseNetworkPort(optarg));
232
233
				break;
			case 'm':
234
				pluginSettings.mqttPrefix = optarg;
235
				break;
236
			case 'v':
237
				globalSettings.logLevelCmd = stoi(optarg);
238
				break;
239
			case 'd':
240
241
				globalSettings.daemonize = 1;
				break;
242
243
244
			case 'x':
			    globalSettings.validateConfig = true;
			    break;
245
			case 'w':
246
247
248
				pluginSettings.tempdir = optarg;
				if (pluginSettings.tempdir[pluginSettings.tempdir.length()-1] != '/') {
					pluginSettings.tempdir.append("/");
249
250
				}
				break;
251
252
253
254
			case 'h':
				printSyntax();
				return 1;
			default:
255
256
				if (c != '?') cerr << "Unknown parameter: " << c << endl;
				return 1;
257
258
259
		}
	}

260
	//we now should know where the writable tempdir is
261
	//set up logger to file
262
263
264
265
266
	if (globalSettings.logLevelFile >= 0) {
		auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("dcdbpusher"));
		fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(globalSettings.logLevelFile));
	}
	
267
	//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
268
269
270
	if (globalSettings.logLevelCmd >= 0) {
		cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(globalSettings.logLevelCmd));
	}
271

272
273
	LOG(info) << "Logging setup complete";

274
	_pluginManager = new PluginManager(pluginSettings);
275
	//Read in rest of configuration. Also creates all sensors
276
	if(!_configuration->readPlugins(*_pluginManager)) {
277
		LOG(fatal) << "Failed to read configuration!";
278
279
280
		return 1;
	}

281
282
	_analyticsManager = new AnalyticsManager();
	// Preparing the SensorNavigator
283
284
285
	if(_analyticsManager->probe(argv[argc-1], "dcdbpusher.conf")) {
		std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
		vector <std::string> names, topics;
286
		for (const auto &p : _pluginManager->getPlugins())
287
288
289
290
291
292
			for (const auto &g : p.configurator->getSensorGroups())
				for (const auto &s : g->getSensors()) {
					names.push_back(s->getName());
					topics.push_back(s->getMqtt());
				}
		try {
293
			navigator->setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
294
			navigator->buildTree(analyticsSettings.hierarchy, &names, &topics);
295
296
297
298
299
			LOG(info) << "Built a sensor hierarchy tree of size " << navigator->getTreeSize() << " and depth " << navigator->getTreeDepth() << ".";
			_queryEngine.setNavigator(navigator);
			_queryEngine.triggerUpdate();
		} catch (const std::invalid_argument &e) {
			LOG(error) << e.what();
300
			LOG(error) << "Failed to build sensor hierarchy tree!";
301
302
303
			return 1;
		}
	}
304
305
306
307
308
309
	
    _queryEngine.setQueryCallback(sensorQueryCallback);
    if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
        LOG(fatal) << "Failed to load data analytics manager!";
        return 1;
    }
310

311
312
313
314
315
316
    //print configuration to give some feedback
	//config of plugins is only printed if the config shall be validated or to debug level otherwise
    LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
    if (globalSettings.validateConfig) {
      vLogLevel = boost::log::trivial::info;
    }
317
    LOG_VAR(vLogLevel) << "-----  Configuration  -----";
318
319

	//print global settings in either case
320
	LOG(info) << "Global Settings:";
321
322
323
324
325
326
327
328
329
330
	LOG(info) << "    Broker:             " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
	LOG(info) << "    Threads:            " << globalSettings.threads;
	LOG(info) << "    Daemonize:          " << (globalSettings.daemonize ? "Enabled" : "Disabled");
	LOG(info) << "    MaxMsgNum:          " << globalSettings.maxMsgNum;
	LOG(info) << "    MaxInflightMsgNum:  " << globalSettings.maxInflightMsgNum;
	LOG(info) << "    MaxQueuedMsgNum:    " << globalSettings.maxQueuedMsgNum;
	LOG(info) << "    MQTT-QoS:           " << globalSettings.qosLevel;
	LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
	LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
	LOG(info) << "    CacheInterval:      " << pluginSettings.cacheInterval / 1000 << " [s]";
331
	if(globalSettings.validateConfig) {
332
        LOG(info) << "    Only validating config files.";
333
    } else {
334
        LOG(info) << "    ValidateConfig:     Disabled";
335
    }
Alessio Netti's avatar
Alessio Netti committed
336
337
	LOG(info) << "Analytics Settings:";
	LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
338
    LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
339
	LOG(info) << "RestAPI Settings:";
340
	LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
341
#ifdef DEBUG
342
343
344
	LOG(info) << "    Certificate: " << restAPISettings.certificate;
	LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	LOG(info) << "    DH params from: " << restAPISettings.dhFile;
345
#else
346
	LOG(info) << "    Certificate, private key and DH-param file not printed.";
347
#endif
348

349
	LOG_VAR(vLogLevel) << "-----  Sampling Configuration  -----";
350
	for(auto& p : _pluginManager->getPlugins()) {
351
        LOG_VAR(vLogLevel) << "Sampling Plugin \"" << p.id << "\"";
352
353
354
        p.configurator->printConfig(vLogLevel);
    }

355
356
357
358
359
360
	LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
	for(auto& p : _analyticsManager->getPlugins()) {
		LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
		p.configurator->printConfig(vLogLevel);
	}

361
362
	LOG_VAR(vLogLevel) << "-----  End Configuration  -----";
	
363
	//MQTTPusher and Https server get their own threads
364
	_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
365
366
								 _pluginManager->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
	_httpsServer = new RestAPI(restAPISettings, _pluginManager, _mqttPusher, _analyticsManager, io);
367
	_configuration->readRestAPIUsers(_httpsServer);
368
	
369
370
371
    if (globalSettings.validateConfig) {
      return 0;
    }
372

Micha Mueller's avatar
Micha Mueller committed
373
	//Init all sensors
374
	LOG(info) << "Init sensors...";
375
	_pluginManager->initPlugin(io);
376
377
378

	//Start all sensors
	LOG(info) << "Starting sensors...";
379
	_pluginManager->startPlugin();
380
381
382
	
    if(!_queryEngine.updated.is_lock_free())
        LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
383

384
385
    LOG(info) << "Init analyzers...";
    _analyticsManager->init(io);
386

387
388
    LOG(info) << "Starting analyzers...";
    _analyticsManager->start();
389

390
391
	LOG(info) << "Sensors started!";

392
	if (globalSettings.daemonize) {
393
394
		//boost.log does not support forking officially.
		//however, just don't touch the sinks after daemonizing and it should work nonetheless
395
396
		LOG(info) << "Detaching...";

397
		cmdSink->flush();
398
399
400
401
		boost::log::core::get()->remove_sink(cmdSink);
		cmdSink.reset();

		//daemonize
402
		dcdbdaemon();
403
404

		LOG(info) << "Now detached";
405
406
	}

407
408
	LOG(info) << "Creating threads...";

409
410
411
	//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
	keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);

Micha Mueller's avatar
Micha Mueller committed
412
	//Create pool of threads which handle the sensors
413
	for(size_t i = 0; i < globalSettings.threads; i++) {
414
415
416
		threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
	}

417
	boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher));
418

419
	LOG(info) << "Threads created!";
420

421
422
423
	LOG(info) << "Starting RestAPI Https Server...";
	_httpsServer->start();

424
	LOG(info) << "Registering signal handlers...";
Alessio Netti's avatar
Alessio Netti committed
425
426
	signal(SIGINT, sigHandler);	//Handle Strg+C
	signal(SIGTERM, sigHandler);	//Handle termination
427
428
	LOG(info) << "Signal handlers registered!";

429
430
431
	LOG(info) << "Cleaning up...";
	delete _configuration;

432
433
434
	LOG(info) << "Setup complete!";

	LOG(trace) << "Running...";
435

Micha Mueller's avatar
Micha Mueller committed
436
	//Run until Strg+C
437
438
	threads.join_all();

439
440
441
	//will only continue if interrupted by SIGINT and threads were stopped

	mqttThread.join();
442
	LOG(info) << "MQTTPusher stopped";
443

444
	LOG(info) << "Tearing down objects...";
445
	_sensorMap.clear();
446
447
448
449
450
	delete _httpsServer;
	delete _mqttPusher;
	delete _analyticsManager;
	delete _pluginManager;

Micha Mueller's avatar
Micha Mueller committed
451
	LOG(info) << "Exiting...Goodbye!";
452
453
	return 0;
}