Commit 509656b2 authored by Michael Ott's avatar Michael Ott
Browse files

Reformatted dcdbpusher code with clang-format to establish common coding style

parent 2d8dc277
BasedOnStyle: LLVM
IndentWidth: 8
UseTab: ForContinuationAndIndentation
BreakBeforeBraces: Attach
AlignConsecutiveDeclarations: true
AllowShortBlocksOnASingleLine: true
AllowShortFunctionsOnASingleLine: Inline
ColumnLimit: 0
IndentCaseLabels: true
......@@ -35,10 +35,10 @@ bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_typ
// ----- READING ADDITIONAL GLOBAL SETTINGS -----
if (boost::iequals(global.first, "mqttBroker")) {
brokerHost = parseNetworkHost(global.second.data());
brokerPort = parseNetworkPort(global.second.data())=="" ? BROKERPORT : stoi(parseNetworkPort(global.second.data()));
brokerPort = parseNetworkPort(global.second.data()) == "" ? BROKERPORT : stoi(parseNetworkPort(global.second.data()));
} else if (boost::iequals(global.first, "qosLevel")) {
qosLevel = stoi(global.second.data());
if(qosLevel>2 || qosLevel<0)
if (qosLevel > 2 || qosLevel < 0)
qosLevel = 1;
} else if (boost::iequals(global.first, "maxInflightMsgNum")) {
maxInflightMsgNum = stoull(global.second.data());
......@@ -52,32 +52,32 @@ bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_typ
return true;
}
bool Configuration::readPlugins(PluginManager& pluginManager) {
bool Configuration::readPlugins(PluginManager &pluginManager) {
std::string globalConfig = _cfgFilePath;
globalConfig.append(_cfgFileName);
boost::property_tree::iptree cfg;
try {
boost::property_tree::read_info(globalConfig, cfg);
} catch (boost::property_tree::info_parser_error& e) {
} catch (boost::property_tree::info_parser_error &e) {
LOG(error) << "Error when reading plugins from " << _cfgFileName << ": " << e.what();
return false;
}
pluginManager.setCfgFilePath(_cfgFilePath);
//read plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("plugins")) {
BOOST_FOREACH (boost::property_tree::iptree::value_type &plugin, cfg.get_child("plugins")) {
if (boost::iequals(plugin.first, "plugin")) {
if (!plugin.second.data().empty()) {
std::string pluginName = plugin.second.data();
std::string pluginConfig = ""; // path to config file for plugin
std::string pluginPath = ""; // path to plugin
std::string pluginName = plugin.second.data();
std::string pluginConfig = ""; // path to config file for plugin
std::string pluginPath = ""; // path to plugin
LOG(info) << "Read plugin " << pluginName << "...";
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
BOOST_FOREACH (boost::property_tree::iptree::value_type &val, plugin.second) {
if (boost::iequals(val.first, "path")) {
pluginPath = val.second.data();
pluginPath = val.second.data();
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
} else {
......@@ -85,10 +85,10 @@ bool Configuration::readPlugins(PluginManager& pluginManager) {
}
}
if(!pluginManager.loadPlugin(pluginName, pluginPath, pluginConfig)) {
LOG(error) << "Could not load plugin " << pluginName;
pluginManager.unloadPlugin();
return false;
if (!pluginManager.loadPlugin(pluginName, pluginPath, pluginConfig)) {
LOG(error) << "Could not load plugin " << pluginName;
pluginManager.unloadPlugin();
return false;
}
}
}
......
......@@ -43,17 +43,17 @@
* @ingroup pusher
*/
class Configuration : public GlobalConfiguration {
public:
public:
/**
* Create new Configuration. Sets global config file to read from to cfgFile.
*
* @param cfgFilePath Path to where all config-files are located
* @param cfgFileName Name of the file containing the config
*/
Configuration(const std::string& cfgFilePath, const std::string& cfgFileName) : GlobalConfiguration(cfgFilePath, cfgFileName) {}
Configuration(const std::string &cfgFilePath, const std::string &cfgFileName)
: GlobalConfiguration(cfgFilePath, cfgFileName) {}
virtual ~Configuration() {}
/**
......@@ -64,18 +64,17 @@ public:
*
* @return true on success, false otherwise
*/
bool readPlugins(PluginManager& pluginManager);
bool readPlugins(PluginManager &pluginManager);
// Additional configuration parameters to be parsed and stored in the global block
int qosLevel = 1;
int qosLevel = 1;
unsigned int maxInflightMsgNum = 20;
unsigned int maxQueuedMsgNum = 0;
int brokerPort = BROKERPORT;
std::string brokerHost = BROKERHOST;
int maxMsgNum = 0;
protected:
int brokerPort = BROKERPORT;
std::string brokerHost = BROKERHOST;
int maxMsgNum = 0;
protected:
bool readAdditionalValues(boost::property_tree::iptree::value_type &global) override;
};
......
......@@ -26,27 +26,27 @@
//================================================================================
#include "MQTTPusher.h"
#include "timestamp.h"
#include <iostream>
#include <string>
#include <unistd.h>
#include "timestamp.h"
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t& plugins, op_pluginVector_t& oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_autoPublish(autoPublish),
_plugins(plugins),
_operatorPlugins(oPlugins),
_connected(false),
_keepRunning(true),
_msgCap(DISABLED),
_doHalt(false),
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages),
_maxInflightMsgNum(maxInflightMsgNum),
_maxQueuedMsgNum(maxQueuedMsgNum) {
MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum)
: _qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_autoPublish(autoPublish),
_plugins(plugins),
_operatorPlugins(oPlugins),
_connected(false),
_keepRunning(true),
_msgCap(DISABLED),
_doHalt(false),
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages),
_maxInflightMsgNum(maxInflightMsgNum),
_maxQueuedMsgNum(maxQueuedMsgNum) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -75,7 +75,7 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const bool
}
MQTTPusher::~MQTTPusher() {
if(_connected) {
if (_connected) {
mosquitto_disconnect(_mosq);
}
mosquitto_destroy(_mosq);
......@@ -83,8 +83,8 @@ MQTTPusher::~MQTTPusher() {
}
void MQTTPusher::push() {
int mosqErr;
uint64_t idleTime = 0;
int mosqErr;
uint64_t idleTime = 0;
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
......@@ -98,10 +98,10 @@ void MQTTPusher::push() {
//Performing auto-publish if necessary
sendMappings();
computeMsgRate();
//collect sensor-data
reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
reading_t * reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
std::size_t totalCount = 0; //number of messages
while (_keepRunning || totalCount) {
if (_doHalt) {
......@@ -110,7 +110,7 @@ void MQTTPusher::push() {
continue;
}
_halted = false;
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
......@@ -124,41 +124,41 @@ void MQTTPusher::push() {
}
if (_connected) {
if(getTimestamp() - idleTime >= PUSHER_IDLETIME) {
idleTime = getTimestamp();
totalCount = 0;
// Push sensor data
for (auto &p : _plugins) {
if (_doHalt) {
//for faster response
break;
}
for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->acquireSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break; //ultimately we will go to sleep 1 second
}
}
}
g->releaseSensors();
}
}
// Push output analytics sensors
for (auto &p : _operatorPlugins) {
if (_doHalt) {
break;
}
for (const auto &op : p.configurator->getOperators()) {
if(op->getStreaming()) {
if (getTimestamp() - idleTime >= PUSHER_IDLETIME) {
idleTime = getTimestamp();
totalCount = 0;
// Push sensor data
for (auto &p : _plugins) {
if (_doHalt) {
//for faster response
break;
}
for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->acquireSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break; //ultimately we will go to sleep 1 second
}
}
}
g->releaseSensors();
}
}
// Push output analytics sensors
for (auto &p : _operatorPlugins) {
if (_doHalt) {
break;
}
for (const auto &op : p.configurator->getOperators()) {
if (op->getStreaming()) {
for (const auto &u : op->getUnits()) {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= op->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned) _maxNumberOfMessages) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
......@@ -170,9 +170,9 @@ void MQTTPusher::push() {
}
op->releaseUnits();
}
}
}
}
}
}
}
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
if (mosqErr == MOSQ_ERR_CONN_LOST) {
......@@ -188,23 +188,23 @@ void MQTTPusher::push() {
mosquitto_disconnect(_mosq);
}
int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount) {
int MQTTPusher::sendReadings(SensorBase &s, reading_t *reads, std::size_t &totalCount) {
//get all sensor values out of its queue
std::size_t count = s.popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
//totalCount+= count;
totalCount+= 1;
totalCount += 1;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s.getName();
#endif
#if DEBUG
for (std::size_t i=0; i<count; i++) {
for (std::size_t i = 0; i < count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
#endif
//try to send them to the broker
int rc;
if ((rc = mosquitto_publish(_mosq, NULL, s.getMqtt().c_str(), sizeof(reading_t)*count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
if ((rc = mosquitto_publish(_mosq, NULL, s.getMqtt().c_str(), sizeof(reading_t) * count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
if (rc == MOSQ_ERR_NOMEM) {
LOGM(info) << "Can\'t queue additional messages";
......@@ -221,15 +221,16 @@ int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& total
}
bool MQTTPusher::sendMappings() {
if(!_autoPublish) return false;
if (!_autoPublish)
return false;
std::string topic, payload;
unsigned int publishCtr=0;
std::string topic, payload;
unsigned int publishCtr = 0;
// Performing auto-publish for sensors
for(auto& p: _plugins) {
for(auto& g: p.configurator->getSensorGroups()) {
for(auto& s: g->acquireSensors()) {
if(s->getPublish()) {
for (auto &p : _plugins) {
for (auto &g : p.configurator->getSensorGroups()) {
for (auto &s : g->acquireSensors()) {
if (s->getPublish()) {
if (!s->getMetadata()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
payload = s->getName();
......@@ -257,11 +258,11 @@ bool MQTTPusher::sendMappings() {
}
// Performing auto-publish for analytics output sensors
for(auto& p: _operatorPlugins)
for(auto& op: p.configurator->getOperators())
if(op->getStreaming() && !op->getDynamic()) {
for (auto &u: op->getUnits())
for (auto &s: u->getBaseOutputs()) {
for (auto &p : _operatorPlugins)
for (auto &op : p.configurator->getOperators())
if (op->getStreaming() && !op->getDynamic()) {
for (auto &u : op->getUnits())
for (auto &s : u->getBaseOutputs()) {
if (s->getPublish()) {
if (!s->getMetadata()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
......@@ -292,47 +293,47 @@ bool MQTTPusher::sendMappings() {
}
bool MQTTPusher::halt(unsigned short timeout) {
_doHalt = true;
_doHalt = true;
for (unsigned short i = 1; i <= timeout; i++) {
if (_halted) {
return true;
} else {
LOGM(info) << "Waiting for push cycle to pause... (" << i << "/" << timeout << ")";
sleep(1);
}
}
for (unsigned short i = 1; i <= timeout; i++) {
if (_halted) {
return true;
} else {
LOGM(info) << "Waiting for push cycle to pause... (" << i << "/" << timeout << ")";
sleep(1);
}
}
cont();
LOGM(info) << "Timeout: push cycle did not pause. Continuing...";
return false;
cont();
LOGM(info) << "Timeout: push cycle did not pause. Continuing...";
return false;
}
void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
bool dynWarning = false;
for(auto& p : _plugins) {
for(const auto& g : p.configurator->getSensorGroups()) {
msgRate += (float)g->acquireSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
bool dynWarning = false;
for (auto &p : _plugins) {
for (const auto &g : p.configurator->getSensorGroups()) {
msgRate += (float)g->acquireSensors().size() * (1000.0f / (float)g->getInterval()) / (float)g->getMinValues();
g->releaseSensors();
}
}
for(auto& p : _operatorPlugins)
for(const auto& op : p.configurator->getOperators()) {
if (op->getStreaming() && !op->getDynamic()) {
for (const auto &u : op->getUnits())
msgRate += (float) u->getBaseOutputs().size() * (1000.0f / (float) op->getInterval()) / (float) op->getMinValues();
op->releaseUnits();
} else if (op->getDynamic())
dynWarning = true;
}
for (auto &p : _operatorPlugins)
for (const auto &op : p.configurator->getOperators()) {
if (op->getStreaming() && !op->getDynamic()) {
for (const auto &u : op->getUnits())
msgRate += (float)u->getBaseOutputs().size() * (1000.0f / (float)op->getInterval()) / (float)op->getMinValues();
op->releaseUnits();
} else if (op->getDynamic())
dynWarning = true;
}
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
if (_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
if (_msgCap == DISABLED && _maxNumberOfMessages > 0)
LOGM(warning) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
else if(_maxNumberOfMessages > 0)
else if (_maxNumberOfMessages > 0)
LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
else
LOGM(info) << "No message cap enforced. Predicted message rate is " << msgRate << " msg/s.";
......@@ -341,6 +342,6 @@ void MQTTPusher::computeMsgRate() {
_maxNumberOfMessages = msgRate + 10;
LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
}
if(_msgCap!=DISABLED && dynWarning)
LOGM(warning) << "Attention! The computed message rate does not account for analyzers with dynamically-generated sensors.";
if (_msgCap != DISABLED && dynWarning)
LOGM(warning) << "Attention! The computed message rate does not account for analyzers with dynamically-generated sensors.";
}
......@@ -32,13 +32,15 @@
#define DCDB_MET "/DCDB_MAP/METADATA/"
#define PUSHER_IDLETIME 1000000000
#include <mosquitto.h>
#include <map>
#include "../analytics/OperatorManager.h"
#include "PluginManager.h"
#include "sensorbase.h"
#include "../analytics/OperatorManager.h"
#include <map>
#include <mosquitto.h>
enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};
enum msgCap_t { DISABLED = 1,
ENABLED = 2,
MINIMUM = 3 };
/**
* @brief Collects values from the sensors and pushes them to the database.
......@@ -46,9 +48,9 @@ enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};
* @ingroup pusher
*/
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t& plugins, op_pluginVector_t& oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
public:
MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
virtual ~MQTTPusher();
/**
......@@ -104,25 +106,25 @@ public:
_doHalt = false;
}
private:
int sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
private:
int sendReadings(SensorBase &s, reading_t *reads, std::size_t &totalCount);
void computeMsgRate();
int _qosLevel;
int _brokerPort;
std::string _brokerHost;
bool _autoPublish;
pusherPluginStorage_t& _plugins;
op_pluginVector_t& _operatorPlugins;
struct mosquitto* _mosq;
bool _connected;
bool _keepRunning;
msgCap_t _msgCap;
bool _doHalt;
bool _halted;
int _maxNumberOfMessages;
unsigned int _maxInflightMsgNum;
unsigned int _maxQueuedMsgNum;
int _qosLevel;
int _brokerPort;
std::string _brokerHost;
bool _autoPublish;
pusherPluginStorage_t &_plugins;
op_pluginVector_t & _operatorPlugins;
struct mosquitto * _mosq;
bool _connected;
bool _keepRunning;
msgCap_t _msgCap;
bool _doHalt;
bool _halted;
int _maxNumberOfMessages;
unsigned int _maxInflightMsgNum;
unsigned int _maxQueuedMsgNum;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -33,266 +33,266 @@
using namespace std;
PluginManager::PluginManager(const pluginSettings_t& pluginSettings) :
_pluginSettings(pluginSettings),
_cfgFilePath("./") {}
PluginManager::PluginManager(const pluginSettings_t &pluginSettings)
: _pluginSettings(pluginSettings),
_cfgFilePath("./") {}
PluginManager::~PluginManager() {
for(const auto& p : _plugins)
p.destroy(p.configurator);
_plugins.clear();
for (const auto &p : _plugins)
p.destroy(p.configurator);
_plugins.clear();
}
bool PluginManager::loadPlugin(const string& name,
const string& pluginPath,
const string& config) {
bool PluginManager::loadPlugin(const string &name,
const string &pluginPath,
const string &config) {
LOG(info) << "Loading plugin " << name << "...";
string pluginLib; //path to the plugin-lib
string pluginConfig; //path to config file for plugin
LOG(info) << "Loading plugin " << name << "...";
string pluginLib; //path to the plugin-lib
string pluginConfig; //path to config file for plugin
// build plugin path
pluginLib = "libdcdbplugin_" + name; //TODO add version information?
// build plugin path
pluginLib = "libdcdbplugin_" + name; //TODO add version information?
#if __APPLE__
pluginLib+= ".dylib";
pluginLib += ".dylib";
#else
pluginLib+= ".so";
pluginLib += ".so";
#endif
if (pluginPath != "") {
if (pluginPath[pluginPath.length()-1] != '/') {
pluginLib = "/" + pluginLib;
}
pluginLib = pluginPath + pluginLib;
}
//if config-path not specified we will look for pluginName.conf in the dcdbpusher.conf directory
if (config == "") {
pluginConfig = _cfgFilePath + name + ".conf";
} else {
if (config[0] == '/') {
pluginConfig = config;
if (pluginPath != "") {
if (pluginPath[pluginPath.length() - 1] != '/') {
pluginLib = "/" + pluginLib;
}
pluginLib = pluginPath + pluginLib;