Commit 6bf413cc authored by Micha Mueller's avatar Micha Mueller
Browse files

Preparatory refactoring to enable re-loading of config files in the future

parent ae7a9da7
......@@ -242,7 +242,7 @@ bool Configuration::read() {
return true;
}
bool Configuration::readAuthkeys(HttpsServer& server) {
bool Configuration::readAuthkeys(HttpsServer* server) {
//open file
std::string globalConfig = _cfgFilePath;
globalConfig.append("global.conf");
......@@ -280,7 +280,7 @@ bool Configuration::readAuthkeys(HttpsServer& server) {
#endif
}
}
if (!server.addAuthkey(authkeyMap_t::value_type(global.second.data(), permissions))) {
if (!server->addAuthkey(authkeyMap_t::value_type(global.second.data(), permissions))) {
#ifdef DEBUG
LOG(warning) << "Authkey already present!";
#endif
......@@ -311,11 +311,7 @@ bool Configuration::checkMqtt(const std::string& mqtt) {
return true;
}
void Configuration::setGlobal(const global_t& global) {
_global = global;
}
global_t Configuration::getGlobal() const {
global_t& Configuration::getGlobal() {
return _global;
}
......
......@@ -67,7 +67,7 @@ public:
* @param server The Rest API server where to add the authkeys
* @return true on success, false otherwise
*/
bool readAuthkeys(HttpsServer& server);
bool readAuthkeys(HttpsServer* server);
/**
* Read and set general sensor values (like interval, minvalues, ...).
......@@ -85,8 +85,11 @@ public:
*/
bool checkMqtt(const std::string& mqtt);
void setGlobal(const global_t& global);
global_t getGlobal() const;
/*
* Return global settings
*/
global_t& getGlobal();
/*
* Return all plugins (in form of their dl_t struct)
......
......@@ -12,12 +12,13 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
extern volatile int keepRunning;
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
const std::string& mqttPrefix, pluginVector_t& plugins) :
_brokerPort(brokerPort), _brokerHost(brokerHost),
_plugins(plugins),_connected(false) {
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_plugins(plugins),
_connected(false),
_keepRunning(true) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -51,7 +52,7 @@ MQTTPusher::~MQTTPusher() {
void MQTTPusher::push() {
//connect to broker (if necessary)
while (keepRunning && !_connected) {
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
......@@ -63,7 +64,7 @@ void MQTTPusher::push() {
if (mosquitto_loop_start(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(fatal) << "Setup failed";
keepRunning = 0;
_keepRunning = false;
mosquitto_disconnect(_mosq);
return;
}
......@@ -71,7 +72,7 @@ void MQTTPusher::push() {
//collect sensor-data
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
while (_keepRunning || totalCount) {
totalCount = 0;
for(auto& p : _plugins) {
for(auto s : p.configurator->getSensors()) {
......
......@@ -22,12 +22,21 @@ public:
void push();
void start() {
_keepRunning = true;
}
void stop() {
_keepRunning = false;
}
private:
int _brokerPort;
std::string _brokerHost;
pluginVector_t& _plugins;
struct mosquitto* _mosq;
bool _connected;
bool _keepRunning;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -47,8 +47,10 @@
using namespace std;
volatile int keepRunning;
pluginVector_t plugins;
Configuration* _configuration;
MQTTPusher* _mqttPusher;
HttpsServer* _httpsServer;
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
void sigintHandler(int sig) {
......@@ -56,7 +58,7 @@ void sigintHandler(int sig) {
LOG(fatal) << "Received SIGINT";
//Stop all sensors
LOG(info) << "Stopping sensors...";
for(auto& p : plugins) {
for(auto& p : _configuration->getPlugins()) {
LOG(info) << "Stop \"" << p.id << "\" plugin";
for(auto s : p.configurator->getSensors()) {
s->stopPolling();
......@@ -64,6 +66,14 @@ void sigintHandler(int sig) {
}
//Stop io service by killing keepAliveWork
keepAliveWork.reset();
//Stop MQTTPusher
LOG(info) << "Flushing MQTT queues...";
_mqttPusher->stop();
//Stop https server
LOG(info) << "Stopping REST API Server...";
_httpsServer->stop();
}
void sigtermHandler(int sig) {
......@@ -132,6 +142,9 @@ boost::log::trivial::severity_level invertLogLevel(boost::log::trivial::severity
}
int main(int argc, char** argv) {
boost::asio::io_service io;
boost::thread_group threads;
if (argc <= 1) {
cout << "Please specify a path to the config-directory" << endl << endl;
printSyntax();
......@@ -174,20 +187,18 @@ int main(int argc, char** argv) {
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
//finished logging startup for the moment (file log added later)
boost::asio::io_service io;
boost::thread_group threads;
Configuration cfg(argv[argc-1]);
global_t globalSettings;
keepRunning = 1;
signal(SIGINT, sigintHandler); //Handle Strg+C
signal(SIGTERM, sigtermHandler); //Handle termination
_configuration = new Configuration(argv[argc-1]);
//Read global variables from config file
if(!cfg.readGlobal()) {
if(!_configuration->readGlobal()) {
LOG(fatal) << "Failed to read global configuration!";
return 1;
}
globalSettings = cfg.getGlobal();
global_t& globalSettings = _configuration->getGlobal();
//plugin and restAPI settings are actually part of globalSettings
//use the references as shortcut, so that we do not have to prefix with 'globalSettings.' all the time
pluginSettings_t& pluginSettings = globalSettings.pluginSettings;
restAPISettings_t& restAPISettings = globalSettings.restAPISettings;
//reset getopt()
optind = 1;
......@@ -202,7 +213,7 @@ int main(int argc, char** argv) {
globalSettings.brokerPort = atoi(optarg);
break;
case 'm':
globalSettings.pluginSettings.mqttPrefix = optarg;
pluginSettings.mqttPrefix = optarg;
break;
case 't':
globalSettings.threads = stoul(optarg);
......@@ -214,9 +225,9 @@ int main(int argc, char** argv) {
globalSettings.daemonize = 1;
break;
case 'w':
globalSettings.pluginSettings.tempdir = optarg;
if (globalSettings.pluginSettings.tempdir[globalSettings.pluginSettings.tempdir.length()-1] != '/') {
globalSettings.pluginSettings.tempdir.append("/");
pluginSettings.tempdir = optarg;
if (pluginSettings.tempdir[pluginSettings.tempdir.length()-1] != '/') {
pluginSettings.tempdir.append("/");
}
break;
case 'h':
......@@ -228,13 +239,11 @@ int main(int argc, char** argv) {
return 1;
}
}
//propagate overwritten values back to cfg
cfg.setGlobal(globalSettings);
//we now should know where the writable tempdir is
//set up logger to file
auto fileSink = boost::log::add_file_log(
boost::log::keywords::file_name = globalSettings.pluginSettings.tempdir + "dcdb_%N.log", // number logfiles ascending
boost::log::keywords::file_name = pluginSettings.tempdir + "dcdb_%N.log", // number logfiles ascending
boost::log::keywords::rotation_size = 10 * 1024 * 1024, //rotate logfile every 10 MiB
//boost::log::keywords::time_based_rotation = boost::log::sinks::file::rotation_at_time_point(0, 0, 0), //Throws bad year-exception for no obvious reason
boost::log::keywords::format = // format: LineID [Timestamp] ThreadID <severity>: Message
......@@ -260,11 +269,10 @@ int main(int argc, char** argv) {
LOG(info) << "Logging setup complete";
//Read in rest of configuration. Also creates all sensors
if(!cfg.read()) {
if(!_configuration->read()) {
LOG(fatal) << "Failed to read configuration!";
return 1;
}
plugins = cfg.getPlugins();
//give some feedback
LOG(info) << "Global Settings:";
......@@ -275,23 +283,23 @@ int main(int argc, char** argv) {
} else {
LOG(info) << " Daemonize: Disabled";
}
LOG(info) << " MQTT-prefix: " << globalSettings.pluginSettings.mqttPrefix;
LOG(info) << " Write-Dir: " << globalSettings.pluginSettings.tempdir;
LOG(info) << " CacheInterval: " << globalSettings.pluginSettings.cacheInterval / 1000 << " [s]";
LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]";
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << globalSettings.restAPISettings.restHost << ":" << globalSettings.restAPISettings.restPort;
LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
#ifdef DEBUG
LOG(info) << " Certificate: " << globalSettings.restAPISettings.certificate;
LOG(info) << " Private key file: " << globalSettings.restAPISettings.privateKey;
LOG(info) << " DH params from: " << globalSettings.restAPISettings.dhFile;
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
LOG(info) << " DH params from: " << restAPISettings.dhFile;
#else
LOG(info) << " Certificate, private key and DH-param file not printed.";
#endif
//Init all sensors
LOG(info) << "Init sensors...";
for(auto& p : plugins) {
for(auto& p : _configuration->getPlugins()) {
LOG(info) << "Init \"" << p.id << "\" plugin";
for(auto s : p.configurator->getSensors()) {
LOG(debug) << " -" << s->getName();
......@@ -301,7 +309,7 @@ int main(int argc, char** argv) {
//Start all sensors
LOG(info) << "Starting sensors...";
for(auto& p : plugins) {
for(auto& p : _configuration->getPlugins()) {
LOG(info) << "Start \"" << p.id << "\" plugin";
for(auto s : p.configurator->getSensors()) {
s->startPolling();
......@@ -336,14 +344,20 @@ int main(int argc, char** argv) {
}
//MQTTPusher and Https server get their own threads
MQTTPusher mqttPusher(globalSettings.brokerPort, globalSettings.brokerHost, globalSettings.pluginSettings.mqttPrefix, plugins);
HttpsServer httpsServer(globalSettings.restAPISettings, plugins);
cfg.readAuthkeys(httpsServer);
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix, _configuration->getPlugins());
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins());
_configuration->readAuthkeys(_httpsServer);
boost::thread mqttThread(bind(&MQTTPusher::push, &mqttPusher));
boost::thread restThread(bind(&HttpsServer::run, &httpsServer));
boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher));
boost::thread restThread(bind(&HttpsServer::run, _httpsServer));
LOG(info) << "Threads created!";
LOG(info) << "Registering signal handlers...";
signal(SIGINT, sigintHandler); //Handle Strg+C
signal(SIGTERM, sigtermHandler); //Handle termination
LOG(info) << "Signal handlers registered!";
LOG(info) << "Setup complete!";
LOG(trace) << "Running...";
......@@ -353,15 +367,11 @@ int main(int argc, char** argv) {
//will only continue if interrupted by SIGINT and threads were stopped
//Stop MQTTPusher
LOG(info) << "Flushing MQTT queues...";
keepRunning = 0;
mqttThread.join();
LOG(info) << "MQTTPusher stopped";
//Stop https server
LOG(info) << "Stopping REST API Server...";
httpsServer.stop();
restThread.join();
LOG(info) << "REST API Server stopped";
LOG(info) << "Exiting...Goodbye!";
return 0;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment