Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 3ba3288e authored by Michael Ott's avatar Michael Ott
Browse files

Add statistics to dcdbpusher

parent 6468b5e4
......@@ -32,7 +32,7 @@
#include <unistd.h>
MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum)
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval)
: _qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
......@@ -46,7 +46,8 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string &brokerHost, const bool
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages),
_maxInflightMsgNum(maxInflightMsgNum),
_maxQueuedMsgNum(maxQueuedMsgNum) {
_maxQueuedMsgNum(maxQueuedMsgNum),
_statisticsInterval(statisticsInterval) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -109,6 +110,9 @@ void MQTTPusher::push() {
//collect sensor-data
reading_t * reads = new reading_t[1024];
std::size_t totalCount = 0; //number of messages
uint64_t msgCtr = 0;
uint64_t readingCtr = 0;
uint64_t lastStats = getTimestamp();
while (_keepRunning || totalCount) {
if (_doHalt) {
_halted = true;
......@@ -136,6 +140,7 @@ void MQTTPusher::push() {
if (_connected) {
if (getTimestamp() - idleTime >= PUSHER_IDLETIME) {
idleTime = getTimestamp();
readingCtr+= totalCount;
totalCount = 0;
// Push sensor data
for (auto &p : _plugins) {
......@@ -147,7 +152,9 @@ void MQTTPusher::push() {
for (const auto &s : g->acquireSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
if (sendReadings(*s, reads, totalCount) == 0) {
msgCtr++;
} else {
break;
}
} else {
......@@ -169,7 +176,9 @@ void MQTTPusher::push() {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= op->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
if (sendReadings(*s, reads, totalCount) == 0) {
msgCtr++;
} else {
break;
}
} else {
......@@ -192,6 +201,16 @@ void MQTTPusher::push() {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
}
}
if (_statisticsInterval > 0) {
uint64_t elapsed = getTimestamp() - lastStats;
if (NS_TO_S(elapsed) > _statisticsInterval) {
LOG(info) << "Statistics: " << (float) msgCtr*1000/(NS_TO_MS(elapsed)) << " messages/s, " << (float) readingCtr*1000/(NS_TO_MS(elapsed)) << " readings/s";
lastStats = getTimestamp();
msgCtr = 0;
readingCtr = 0;
}
}
}
}
delete[] reads;
......@@ -201,8 +220,8 @@ void MQTTPusher::push() {
int MQTTPusher::sendReadings(SensorBase &s, reading_t *reads, std::size_t &totalCount) {
//get all sensor values out of its queue
std::size_t count = s.popReadingQueue(reads, 1024);
//totalCount+= count;
totalCount += 1;
totalCount+= count;
// totalCount += 1;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s.getName();
#endif
......@@ -223,8 +242,8 @@ int MQTTPusher::sendReadings(SensorBase &s, reading_t *reads, std::size_t &total
_connected = false;
}
s.pushReadingQueue(reads, count);
//totalCount -= count;
totalCount -= 1;
totalCount -= count;
// totalCount -= 1;
return 1;
}
return 0;
......
......@@ -50,7 +50,7 @@ enum msgCap_t { DISABLED = 1,
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval);
virtual ~MQTTPusher();
/**
......@@ -125,6 +125,7 @@ class MQTTPusher {
int _maxNumberOfMessages;
unsigned int _maxInflightMsgNum;
unsigned int _maxQueuedMsgNum;
unsigned int _statisticsInterval;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -372,6 +372,8 @@ int main(int argc, char **argv) {
LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]";
LOG(info) << " StatisticsInterval: " << globalSettings.statisticsInterval << " [s]";
if (globalSettings.validateConfig) {
LOG(info) << " Only validating config files.";
} else {
......@@ -406,7 +408,7 @@ int main(int argc, char **argv) {
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.autoPublish, globalSettings.qosLevel,
_pluginManager->getPlugins(), _operatorManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
_pluginManager->getPlugins(), _operatorManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum, globalSettings.statisticsInterval);
if (restAPISettings.enabled) {
_httpsServer = new RestAPI(restAPISettings, _pluginManager, _mqttPusher, _operatorManager, io);
_configuration->readRestAPIUsers(_httpsServer);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment