Notice to GitKraken users: A vulnerability has been found in the SSH key generation of GitKraken versions 7.6.0 to 8.0.0 (https://www.gitkraken.com/blog/weak-ssh-key-fix). If you use GitKraken and have generated a SSH key using one of these versions, please remove it both from your local workstation and from your LRZ GitLab profile.

21.10.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit d6ff5f6a authored by Michael Ott's avatar Michael Ott
Browse files

Store dcdbpusher and CollectAgent statistics in database

parent b8e1afc1
...@@ -783,6 +783,7 @@ int main(int argc, char* const argv[]) { ...@@ -783,6 +783,7 @@ int main(int argc, char* const argv[]) {
LOG(info) << " MessageSlots: " << settings.messageSlots; LOG(info) << " MessageSlots: " << settings.messageSlots;
LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled"); LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " StatisticsInterval: " << settings.statisticsInterval << " [s]"; LOG(info) << " StatisticsInterval: " << settings.statisticsInterval << " [s]";
LOG(info) << " StatisticsMqttPart: " << settings.statisticsMqttPart;
LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix; LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled"); LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << pluginSettings.tempdir; LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
...@@ -893,20 +894,20 @@ int main(int argc, char* const argv[]) { ...@@ -893,20 +894,20 @@ int main(int argc, char* const argv[]) {
LOG(info) << "Collect Agent running..."; LOG(info) << "Collect Agent running...";
while(keepRunning) { while(keepRunning) {
start = getTimestamp(); start = getTimestamp();
if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) { if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval)); uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
lastCleanup = start; lastCleanup = start;
if(purged > 0) if(purged > 0)
LOG(info) << "Cache: purged " << purged << " obsolete entries"; LOG(info) << "Cache: purged " << purged << " obsolete entries";
} }
if(newAutoPub) { if(newAutoPub) {
newAutoPub = false; newAutoPub = false;
mySensorConfig->setPublishedSensorsWritetime(getTimestamp()); mySensorConfig->setPublishedSensorsWritetime(getTimestamp());
} }
sleep(sleepInterval); sleep(sleepInterval);
if((settings.statisticsInterval > 0) && keepRunning) { if((settings.statisticsInterval > 0) && keepRunning) {
/* not really thread safe but will do the job */ /* not really thread safe but will do the job */
end = getTimestamp(); end = getTimestamp();
...@@ -926,6 +927,21 @@ int main(int argc, char* const argv[]) { ...@@ -926,6 +927,21 @@ int main(int argc, char* const argv[]) {
} }
} }
LOG(info) << "Connected hosts: " << connectedHosts; LOG(info) << "Connected hosts: " << connectedHosts;
if (settings.statisticsMqttPart.size() > 0) {
std::string statisticsMqttTopic = pluginSettings.mqttPrefix + settings.statisticsMqttPart;
std::list<SensorDataStoreReading> stats;
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/msgs"), end, msgCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/cachedQueries"), end, cachedQueryCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/dbQueries"), end, dbQueryCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/readings"), end, readingCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/hosts"), end, connectedHosts));
for (auto s: stats) {
mySensorDataStore->insert(s);
mySensorCache.storeSensor(s);
}
}
msgCtr = 0; msgCtr = 0;
cachedQueryCtr = 0; cachedQueryCtr = 0;
dbQueryCtr = 0; dbQueryCtr = 0;
......
...@@ -61,6 +61,10 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) { ...@@ -61,6 +61,10 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
} }
} }
void SensorCache::storeSensor(const SensorDataStoreReading& s) {
storeSensor(s.sensorId, s.timeStamp.getRaw(), s.value);
}
int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) { int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
/* Remove the reserved bytes to leverage the standard find function */ /* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0); sid.setRsvd(0);
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <atomic> #include <atomic>
#include <dcdb/sensorid.h> #include <dcdb/sensorid.h>
#include <dcdb/timestamp.h> #include <dcdb/timestamp.h>
#include <dcdb/sensordatastore.h>
#include "cacheentry.h" #include "cacheentry.h"
using namespace DCDB; using namespace DCDB;
...@@ -63,6 +64,13 @@ public: ...@@ -63,6 +64,13 @@ public:
**/ **/
void storeSensor(SensorId sid, uint64_t ts, int64_t val); void storeSensor(SensorId sid, uint64_t ts, int64_t val);
/**
* @brief Store a sensor reading in the SensorCache.
*
* @param s The SensorDataStoreReading object of the sensor data to be cached.
**/
void storeSensor(const SensorDataStoreReading& s);
/** /**
* @brief Return a sensor reading or the average of the last readings * @brief Return a sensor reading or the average of the last readings
* from the SensorCache. * from the SensorCache.
......
...@@ -207,6 +207,7 @@ public: ...@@ -207,6 +207,7 @@ public:
bool validateConfig = false; bool validateConfig = false;
bool daemonize = false; bool daemonize = false;
int statisticsInterval = 60; int statisticsInterval = 60;
std::string statisticsMqttPart;
uint64_t threads = DEFAULT_THREADS; uint64_t threads = DEFAULT_THREADS;
int logLevelFile = -1; int logLevelFile = -1;
int logLevelCmd = DEFAULT_LOGLEVEL; int logLevelCmd = DEFAULT_LOGLEVEL;
......
...@@ -61,7 +61,7 @@ void GlobalConfiguration::readConfig() { ...@@ -61,7 +61,7 @@ void GlobalConfiguration::readConfig() {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) { BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
// ----- READING PLUGIN SETTINGS ----- // ----- READING PLUGIN SETTINGS -----
if (boost::iequals(global.first, "mqttprefix")) { if (boost::iequals(global.first, "mqttprefix")) {
pluginSettings.mqttPrefix = global.second.data(); pluginSettings.mqttPrefix = MQTTChecker::formatTopic(global.second.data());
} else if (boost::iequals(global.first, "autoPublish")) { } else if (boost::iequals(global.first, "autoPublish")) {
pluginSettings.autoPublish = to_bool(global.second.data()); pluginSettings.autoPublish = to_bool(global.second.data());
} else if (boost::iequals(global.first, "tempdir")) { } else if (boost::iequals(global.first, "tempdir")) {
...@@ -81,6 +81,8 @@ void GlobalConfiguration::readConfig() { ...@@ -81,6 +81,8 @@ void GlobalConfiguration::readConfig() {
logLevelFile = stoi(global.second.data()); logLevelFile = stoi(global.second.data());
} else if (boost::iequals(global.first, "statisticsInterval")) { } else if (boost::iequals(global.first, "statisticsInterval")) {
statisticsInterval = stoi(global.second.data()); statisticsInterval = stoi(global.second.data());
} else if (boost::iequals(global.first, "statisticsMqttPart")) {
statisticsMqttPart = MQTTChecker::formatTopic(global.second.data());
} else if (!readAdditionalValues(global)) { } else if (!readAdditionalValues(global)) {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting"; LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
} }
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
#include <unistd.h> #include <unistd.h>
MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel, MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval) pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval, std::string statisticsMqttTopic)
: _qosLevel(qosLevel), : _qosLevel(qosLevel),
_brokerPort(brokerPort), _brokerPort(brokerPort),
_brokerHost(brokerHost), _brokerHost(brokerHost),
...@@ -47,7 +47,8 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool ...@@ -47,7 +47,8 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool
_maxNumberOfMessages(maxNumberOfMessages), _maxNumberOfMessages(maxNumberOfMessages),
_maxInflightMsgNum(maxInflightMsgNum), _maxInflightMsgNum(maxInflightMsgNum),
_maxQueuedMsgNum(maxQueuedMsgNum), _maxQueuedMsgNum(maxQueuedMsgNum),
_statisticsInterval(statisticsInterval) { _statisticsInterval(statisticsInterval),
_statisticsMqttTopic(statisticsMqttTopic) {
//first print some info //first print some info
int mosqMajor, mosqMinor, mosqRevision; int mosqMajor, mosqMinor, mosqRevision;
...@@ -203,10 +204,22 @@ void MQTTPusher::push() { ...@@ -203,10 +204,22 @@ void MQTTPusher::push() {
} }
if (_statisticsInterval > 0) { if (_statisticsInterval > 0) {
uint64_t elapsed = getTimestamp() - lastStats; uint64_t ts = getTimestamp();
uint64_t elapsed = ts - lastStats;
if (NS_TO_S(elapsed) > _statisticsInterval) { if (NS_TO_S(elapsed) > _statisticsInterval) {
LOG(info) << "Statistics: " << (float) msgCtr*1000/(NS_TO_MS(elapsed)) << " messages/s, " << (float) readingCtr*1000/(NS_TO_MS(elapsed)) << " readings/s"; LOG(info) << "Statistics: " << (float) msgCtr*1000/(NS_TO_MS(elapsed)) << " messages/s, " << (float) readingCtr*1000/(NS_TO_MS(elapsed)) << " readings/s";
lastStats = getTimestamp(); if (_statisticsMqttTopic.size() != 0) {
reading_t r = { (int64_t) msgCtr, ts };
int rc = MOSQ_ERR_SUCCESS;
rc+= mosquitto_publish(_mosq, NULL, std::string(_statisticsMqttTopic+"/msgs").c_str(), sizeof(reading_t), &r, _qosLevel, false);
r = { (int64_t) readingCtr, ts };
rc+= mosquitto_publish(_mosq, NULL, std::string(_statisticsMqttTopic+"/readings").c_str(), sizeof(reading_t), &r, _qosLevel, false);
if (rc != MOSQ_ERR_SUCCESS) {
LOGM(info) << "Error sending statistics via MQTT: " << mosquitto_strerror(rc);
_connected = false;
}
}
lastStats = ts;
msgCtr = 0; msgCtr = 0;
readingCtr = 0; readingCtr = 0;
} }
......
...@@ -50,7 +50,7 @@ enum msgCap_t { DISABLED = 1, ...@@ -50,7 +50,7 @@ enum msgCap_t { DISABLED = 1,
class MQTTPusher { class MQTTPusher {
public: public:
MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel, MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval); pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval, std::string statisticsMqttTopic);
virtual ~MQTTPusher(); virtual ~MQTTPusher();
/** /**
...@@ -126,6 +126,7 @@ class MQTTPusher { ...@@ -126,6 +126,7 @@ class MQTTPusher {
unsigned int _maxInflightMsgNum; unsigned int _maxInflightMsgNum;
unsigned int _maxQueuedMsgNum; unsigned int _maxQueuedMsgNum;
unsigned int _statisticsInterval; unsigned int _statisticsInterval;
std::string _statisticsMqttTopic;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg; boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
}; };
......
...@@ -250,7 +250,7 @@ int main(int argc, char **argv) { ...@@ -250,7 +250,7 @@ int main(int argc, char **argv) {
globalSettings.brokerPort = parseNetworkPort(optarg) == "" ? DEFAULT_BROKERPORT : stoi(parseNetworkPort(optarg)); globalSettings.brokerPort = parseNetworkPort(optarg) == "" ? DEFAULT_BROKERPORT : stoi(parseNetworkPort(optarg));
break; break;
case 'm': case 'm':
pluginSettings.mqttPrefix = optarg; pluginSettings.mqttPrefix = MQTTChecker::formatTopic(optarg);
break; break;
case 'v': case 'v':
globalSettings.logLevelCmd = stoi(optarg); globalSettings.logLevelCmd = stoi(optarg);
...@@ -380,6 +380,7 @@ int main(int argc, char **argv) { ...@@ -380,6 +380,7 @@ int main(int argc, char **argv) {
LOG(info) << " Write-Dir: " << pluginSettings.tempdir; LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]"; LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]";
LOG(info) << " StatisticsInterval: " << globalSettings.statisticsInterval << " [s]"; LOG(info) << " StatisticsInterval: " << globalSettings.statisticsInterval << " [s]";
LOG(info) << " StatisticsMqttPart: " << globalSettings.statisticsMqttPart;
if (globalSettings.validateConfig) { if (globalSettings.validateConfig) {
LOG(info) << " Only validating config files."; LOG(info) << " Only validating config files.";
...@@ -414,8 +415,13 @@ int main(int argc, char **argv) { ...@@ -414,8 +415,13 @@ int main(int argc, char **argv) {
LOG_VAR(vLogLevel) << "----- End Configuration -----"; LOG_VAR(vLogLevel) << "----- End Configuration -----";
//MQTTPusher and Https server get their own threads //MQTTPusher and Https server get their own threads
std::string statisticsMqttTopic;
if (globalSettings.statisticsMqttPart.size() > 0) {
statisticsMqttTopic = pluginSettings.mqttPrefix + globalSettings.statisticsMqttPart;
}
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.autoPublish, globalSettings.qosLevel, _mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.autoPublish, globalSettings.qosLevel,
_pluginManager->getPlugins(), _operatorManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum, globalSettings.statisticsInterval); _pluginManager->getPlugins(), _operatorManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum, globalSettings.statisticsInterval,
statisticsMqttTopic);
if (restAPISettings.enabled) { if (restAPISettings.enabled) {
_httpsServer = new RestAPI(restAPISettings, _pluginManager, _mqttPusher, _operatorManager, io); _httpsServer = new RestAPI(restAPISettings, _pluginManager, _mqttPusher, _operatorManager, io);
_configuration->readRestAPIUsers(_httpsServer); _configuration->readRestAPIUsers(_httpsServer);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment