//================================================================================
// Name        : dcdbpusher.cpp
// Author      : Michael Ott (original), Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main functions for the DCDB MQTT Pusher
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include <dcdbdaemon.h>
#include <exception>
#include <functional>
#include <string>

//Caution: include order matters! HttpsServer.h needs to be included first
#include "HttpsServer.h"
#include "Sensor.h"
#include "Configuration.h"
#include "MQTTPusher.h"

#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

#include <boost/log/trivial.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/common_attributes.hpp>
#include <boost/log/support/date_time.hpp>

48
49
50
using namespace std;

volatile int keepRunning;
51
pluginVector_t plugins;
52
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
53

Micha Mueller's avatar
Micha Mueller committed
54
void sigintHandler(int sig) {
55
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
56
	LOG(fatal) << "Received SIGINT";
57
58
	//Stop all sensors
	LOG(info) << "Stopping sensors...";
59
60
61
62
63
	for(auto& p : plugins) {
		LOG(info) << "Stop \"" << p.id << "\" plugin";
		for(auto s : p.configurator->getSensors()) {
			s->stopPolling();
		}
64
	}
65
66
	//Stop io service by killing keepAliveWork
	keepAliveWork.reset();
67
68
}

Micha Mueller's avatar
Micha Mueller committed
69
70
71
72
73
74
75
void sigtermHandler(int sig) {
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
	LOG(fatal) << "Received SIGTERM. Terminating";
	signal(SIGTERM, SIG_DFL);
	kill(getpid(), SIGTERM);
}

76
77
78
79
80
81
82
/*
 * Prints the command line help (usage line plus option list) to stdout.
 * The listed options mirror the getopt() option string used in main()
 * ("b:p:m:t:v:w:dh"): -v and -w take an argument and are part of the
 * usage line as well.
 */
void printSyntax()
{
/*
                       1         2         3         4         5         6         7         8
             012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
	std::cout <<
		"Usage:\n"
		"  dcdbpusher [-d] [-b<host>] [-p<port>] [-m<string>] [-t<number>] [-v<level>] [-w<path>] <path/to/configfiles/>\n"
		"  dcdbpusher -h\n"
		"\n"
		"Options:\n"
		"  -b<host>        MQTT broker\n"
		"  -p<port>        MQTT broker port\n"
		"  -m<string>      MQTT topic prefix\n"
		"  -t<number>      Thread count\n"
		"  -w<path>        Writable directory for temporary files\n"
		"\n"
		"  -d              Daemonize\n"
		"  -v<level>       Set verbosity of output.\n"
		"                  Can be a number between 5 (all output) and 0 (only fatal messages).\n"
		"                  Default level is 3 (info)\n"
		"  -h              This help page\n"
		"\n"
		<< std::flush;
}

102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 * Mirrors a severity level onto its counterpart at the other end of the
 * boost::log::trivial scale:
 *   trace   <=> fatal
 *   debug   <=> error
 *   info    <=> warning
 * Any value that is none of these six levels maps to info.
 */
boost::log::trivial::severity_level invertLogLevel(boost::log::trivial::severity_level logLevel) {
	namespace sev = boost::log::trivial;

	switch (logLevel) {
		case sev::trace:   return sev::fatal;
		case sev::debug:   return sev::error;
		case sev::info:    return sev::warning;
		case sev::warning: return sev::info;
		case sev::error:   return sev::debug;
		case sev::fatal:   return sev::trace;
		default:           break;
	}

	//unknown value: fall back to info
	return sev::info;
}

134
135
int main(int argc, char** argv) {
	if (argc <= 1) {
136
137
		cout << "Please specify a path to the config-directory" << endl << endl;
		printSyntax();
138
139
140
		return 1;
	}

141
	//define allowed command-line options once
142
	const char opts[] = "b:p:m:t:v:w:dh";
143

144
145
146
147
148
149
150
151
152
153
154
155
156
157
	//check if help flag specified
	char c;
	while ((c = getopt(argc, argv, opts)) != -1) {
		switch (c)
		{
			case 'h':
				printSyntax();
				return 1;
				break;
			default:
				//do nothing (other options are read later on)
				break;
		}
	}
158

159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
	//init LOGGING
	boost::log::add_common_attributes();

	//set up logger to command line
	auto cmdSink = boost::log::add_console_log(
		std::cout,
		boost::log::keywords::format = (boost::log::expressions::stream //only print timestamp (without date), severity and message to terminal
				<< "[" << boost::log::expressions::format_date_time< boost::posix_time::ptime >("TimeStamp", "%H:%M:%S") << "]"
				<< " <" << boost::log::trivial::severity << ">"
				<< ": " << boost::log::expressions::smessage
		),
		boost::log::keywords::filter = (boost::log::trivial::severity >= boost::log::trivial::info)
	);

	//get logger instance
	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
175
	//finished logging startup for the moment (file log added later)
176

177
178
179
180
181
	boost::asio::io_service io;
	boost::thread_group threads;
	Configuration cfg(argv[argc-1]);
	global_t globalSettings;
	keepRunning = 1;
Micha Mueller's avatar
Micha Mueller committed
182
183
	signal(SIGINT, sigintHandler);	//Handle Strg+C
	signal(SIGTERM, sigtermHandler);	//Handle termination
184

185
186
	//Read global variables from config file
	if(!cfg.readGlobal()) {
187
		LOG(fatal) << "Failed to read global configuration!";
188
189
190
		return 1;
	}
	globalSettings = cfg.getGlobal();
191

192
193
194
195
	//reset getopt()
	optind = 1;
	//read in options (overwrite global.conf settings if necessary)
	while ((c = getopt(argc, argv, opts)) != -1) {
196
197
		switch (c)
		{
198
			case 'b':
199
				globalSettings.brokerHost = optarg;
200
201
				break;
			case 'p':
202
				globalSettings.brokerPort = atoi(optarg);
203
204
				break;
			case 'm':
205
				globalSettings.mqttPrefix = optarg;
206
207
				break;
			case 't':
208
				globalSettings.threads = stoul(optarg);
209
				break;
210
211
212
			case 'v':
				globalSettings.logLevelCmd = static_cast<boost::log::trivial::severity_level>(stoi(optarg));
				break;
213
			case 'd':
214
215
				globalSettings.daemonize = 1;
				break;
216
217
218
219
220
221
			case 'w':
				globalSettings.tempdir = optarg;
				if (globalSettings.tempdir[globalSettings.tempdir.length()-1] != '/') {
					globalSettings.tempdir.append("/");
				}
				break;
222
223
224
225
226
			case 'h':
				printSyntax();
				return 1;
				break;
			default:
227
228
				if (c != '?') cerr << "Unknown parameter: " << c << endl;
				return 1;
229
230
		}
	}
231
232
	//propagate overwritten values back to cfg
	cfg.setGlobal(globalSettings);
233

234
	//we now should know where the writable tempdir is
235
236
	//set up logger to file
	auto fileSink = boost::log::add_file_log(
237
238
239
240
		boost::log::keywords::file_name = globalSettings.tempdir + "dcdb_%N.log",	//	number logfiles ascending
		boost::log::keywords::rotation_size = 10 * 1024 * 1024,	//rotate logfile every 10 MiB
		//boost::log::keywords::time_based_rotation = boost::log::sinks::file::rotation_at_time_point(0, 0, 0),	//Throws bad year-exception for no obvious reason
		boost::log::keywords::format = //	format:		LineID [Timestamp] ThreadID <severity>: Message
241
242
243
244
245
246
247
248
249
250
251
		(	boost::log::expressions::stream
				<< boost::log::expressions::attr< unsigned int >("LineID")
				<< " [" << boost::log::expressions::format_date_time< boost::posix_time::ptime >("TimeStamp", "%Y-%m-%d, %H:%M:%S") << "]"
				<< " " << boost::log::expressions::attr<boost::log::attributes::current_thread_id::value_type >("ThreadID")
				<< " <" << boost::log::trivial::severity << ">"
				<< ": " << boost::log::expressions::smessage
		),
		boost::log::keywords::filter = (boost::log::trivial::severity >= boost::log::trivial::trace),
		boost::log::keywords::auto_flush = true
	);

252
253
254
255
	//invert log level, because 0 casts to trace (and 5 to fatal) but we want the meaning to be the other way round (0 equals fatal, 5 trace)
	globalSettings.logLevelCmd = invertLogLevel(globalSettings.logLevelCmd);
	globalSettings.logLevelFile = invertLogLevel(globalSettings.logLevelFile);

256
257
258
	//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
	fileSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelFile);
	cmdSink->set_filter(boost::log::trivial::severity >= globalSettings.logLevelCmd);
259

260
261
	LOG(info) << "Logging setup complete";

262
263
	//Read in rest of configuration. Also creates all sensors
	if(!cfg.read()) {
264
		LOG(fatal) << "Failed to read configuration!";
265
266
		return 1;
	}
267
	plugins = cfg.getPlugins();
268

269
	//give some feedback
270
	LOG(info) << "Global Settings:";
271
	LOG(info) << "  REST Server: " << globalSettings.restHost << ":" << globalSettings.restPort;
272
273
274
	LOG(info) << "  Broker:      " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
	LOG(info) << "  MQTT-prefix: " << globalSettings.mqttPrefix;
	LOG(info) << "  Threads:     " << globalSettings.threads;
275
	if(globalSettings.daemonize) {
276
		LOG(info) << "  Daemonize:   Enabled";
277
	} else {
278
		LOG(info) << "  Daemonize:   Disabled";
279
	}
280
	LOG(info) << "  Write-Dir:   " << globalSettings.tempdir;
281
	LOG(info) << "  CacheInterval: " << globalSettings.cacheInterval / 1000 << " [s]";
282

Micha Mueller's avatar
Micha Mueller committed
283
	//Init all sensors
284
	LOG(info) << "Init sensors...";
285
286
287
288
289
290
	for(auto& p : plugins) {
		LOG(info) << "Init \"" << p.id << "\" plugin";
		for(auto s : p.configurator->getSensors()) {
			LOG(debug) << " -" << s->getName();
			s->init(io);
		}
291
292
293
294
	}

	//Start all sensors
	LOG(info) << "Starting sensors...";
295
296
297
298
299
	for(auto& p : plugins) {
		LOG(info) << "Start \"" << p.id << "\" plugin";
		for(auto s : p.configurator->getSensors()) {
			s->startPolling();
		}
300
301
	}

302
303
	LOG(info) << "Sensors started!";

304
	if (globalSettings.daemonize) {
305
306
		//boost.log does not support forking officially.
		//however, just don't touch the sinks after daemonizing and it should work nonetheless
307
308
		LOG(info) << "Detaching...";

309
		cmdSink->flush();
310
311
312
313
		boost::log::core::get()->remove_sink(cmdSink);
		cmdSink.reset();

		//daemonize
314
		dcdbdaemon();
315
316

		LOG(info) << "Now detached";
317
318
	}

319
320
	LOG(info) << "Creating threads...";

321
322
323
	//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
	keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);

Micha Mueller's avatar
Micha Mueller committed
324
	//Create pool of threads which handle the sensors
325
	for(size_t i = 0; i < globalSettings.threads; i++) {
326
327
328
		threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
	}

329
	//MQTTPusher and Https server get their own threads
330
	MQTTPusher mqttPusher(globalSettings.brokerPort, globalSettings.brokerHost, globalSettings.mqttPrefix, plugins);
331
	HttpsServer httpsServer(globalSettings.restHost, globalSettings.restPort, plugins);
332

333
	boost::thread mqttThread(bind(&MQTTPusher::push, &mqttPusher));
334
	boost::thread restThread(bind(&HttpsServer::run, &httpsServer));
335

336
337
338
339
	LOG(info) << "Threads created!";
	LOG(info) << "Setup complete!";

	LOG(trace) << "Running...";
340

Micha Mueller's avatar
Micha Mueller committed
341
	//Run until Strg+C
342
343
	threads.join_all();

344
345
346
347
348
349
350
	//will only continue if interrupted by SIGINT and threads were stopped

	//Stop MQTTPusher
	LOG(info) << "Flushing MQTT queues...";
	keepRunning = 0;
	mqttThread.join();

351
352
353
354
355
	//Stop https server
	LOG(info) << "Stopping REST API Server...";
	httpsServer.stop();
	restThread.join();

Micha Mueller's avatar
Micha Mueller committed
356
	LOG(info) << "Exiting...Goodbye!";
357
358
	return 0;
}