Notice to GitKraken users: A vulnerability has been found in the SSH key generation of GitKraken versions 7.6.0 to 8.0.0 (https://www.gitkraken.com/blog/weak-ssh-key-fix). If you use GitKraken and have generated a SSH key using one of these versions, please remove it both from your local workstation and from your LRZ GitLab profile.

21.10.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit ed0d3654 authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'remotes/origin/development'

parents 7142e0f0 47da1a33
...@@ -220,7 +220,7 @@ void OperatorManager::unloadPlugin(const string& id) { ...@@ -220,7 +220,7 @@ void OperatorManager::unloadPlugin(const string& id) {
_plugins.clear(); _plugins.clear();
} }
bool OperatorManager::init(boost::asio::io_service& io, const string& plugin) { bool OperatorManager::init(const string& plugin) {
if(_status != LOADED) { if(_status != LOADED) {
LOG(error) << "Cannot init, OperatorManager is not loaded!"; LOG(error) << "Cannot init, OperatorManager is not loaded!";
return false; return false;
...@@ -232,12 +232,12 @@ bool OperatorManager::init(boost::asio::io_service& io, const string& plugin) { ...@@ -232,12 +232,12 @@ bool OperatorManager::init(boost::asio::io_service& io, const string& plugin) {
out = true; out = true;
LOG(info) << "Init " << p.id << " operator plugin"; LOG(info) << "Init " << p.id << " operator plugin";
for (const auto &op : p.configurator->getOperators()) for (const auto &op : p.configurator->getOperators())
op->init(io); op->init(_io);
} }
return out; return out;
} }
bool OperatorManager::reload(boost::asio::io_service& io, const string& plugin) { bool OperatorManager::reload(const string& plugin) {
if(_status != LOADED) { if(_status != LOADED) {
LOG(error) << "Cannot reload, OperatorManager is not loaded!"; LOG(error) << "Cannot reload, OperatorManager is not loaded!";
return false; return false;
...@@ -259,7 +259,7 @@ bool OperatorManager::reload(boost::asio::io_service& io, const string& plugin) ...@@ -259,7 +259,7 @@ bool OperatorManager::reload(boost::asio::io_service& io, const string& plugin)
return false; return false;
} else } else
for (const auto &op : p.configurator->getOperators()) for (const auto &op : p.configurator->getOperators())
op->init(io); op->init(_io);
} }
return out; return out;
} }
......
...@@ -82,8 +82,9 @@ public: ...@@ -82,8 +82,9 @@ public:
/** /**
* @brief Class constructor * @brief Class constructor
* @param io Boost ASIO service to be used
*/ */
OperatorManager() { _status = CLEAR; } OperatorManager(boost::asio::io_context& io) : _io(io) { _status = CLEAR; }
/** /**
* @brief Class destructor * @brief Class destructor
...@@ -130,11 +131,10 @@ public: ...@@ -130,11 +131,10 @@ public:
* This method must be called after "load", and before "start". It will prepare operators for * This method must be called after "load", and before "start". It will prepare operators for
* operation, and initialize the related sensors and caches. * operation, and initialize the related sensors and caches.
* *
* @param io Boost ASIO service to be used
* @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected * @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
* @return true if successful, false otherwise * @return true if successful, false otherwise
*/ */
bool init(boost::asio::io_service& io, const string& plugin=""); bool init(const string& plugin="");
/** /**
* @brief Reload one or more plugins * @brief Reload one or more plugins
...@@ -142,11 +142,10 @@ public: ...@@ -142,11 +142,10 @@ public:
* This method will cause all running operators of a plugin to be stopped and destroyed. The * This method will cause all running operators of a plugin to be stopped and destroyed. The
* configuration file is then read once again, and new operators are created and initialized. * configuration file is then read once again, and new operators are created and initialized.
* *
* @param io Boost ASIO service to be used
* @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected * @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
* @return true if successful, false otherwise * @return true if successful, false otherwise
*/ */
bool reload(boost::asio::io_service& io, const string& plugin=""); bool reload(const string& plugin="");
/** /**
* @brief Start one or more stored plugins * @brief Start one or more stored plugins
...@@ -452,6 +451,7 @@ private: ...@@ -452,6 +451,7 @@ private:
*/ */
void PUT_analytics_operator(endpointArgs); void PUT_analytics_operator(endpointArgs);
boost::asio::io_context& _io;
}; };
#endif //PROJECT_ANALYTICSMANAGER_H #endif //PROJECT_ANALYTICSMANAGER_H
...@@ -237,7 +237,7 @@ public: ...@@ -237,7 +237,7 @@ public:
virtual void init(boost::asio::io_service& io) final override { virtual void init(boost::asio::io_service& io) final override {
OperatorInterface::init(io); OperatorInterface::init(io);
for(const auto u : _units) for(const auto& u : _units)
u->init(_interval, _queueSize); u->init(_interval, _queueSize);
this->execOnInit(); this->execOnInit();
......
...@@ -180,11 +180,11 @@ public: ...@@ -180,11 +180,11 @@ public:
* @param interval Sampling interval in milliseconds * @param interval Sampling interval in milliseconds
*/ */
void init(unsigned int interval, unsigned int queueSize) override { void init(unsigned int interval, unsigned int queueSize) override {
for(const auto s : _outputs) for(const auto &s : _outputs)
if (!s->isInit()) if (!s->isInit())
s->initSensor(interval, queueSize); s->initSensor(interval, queueSize);
for (const auto &su : _subUnits) for (const auto &su : _subUnits)
for (const auto s : su->getOutputs()) for (const auto &s : su->getOutputs())
if (!s->isInit()) if (!s->isInit())
s->initSensor(interval, queueSize); s->initSensor(interval, queueSize);
} }
......
...@@ -219,7 +219,7 @@ void CARestAPI::POST_write(endpointArgs) { ...@@ -219,7 +219,7 @@ void CARestAPI::POST_write(endpointArgs) {
DCDB::SensorId sid; DCDB::SensorId sid;
if (sid.mqttTopicConvert(mqttTopic)) { if (sid.mqttTopicConvert(mqttTopic)) {
_sensorCache->storeSensor(sid, ts.getRaw(), value); _sensorCache->storeSensor(sid, ts.getRaw(), value);
_sensorDataStore->insert(&sid, ts.getRaw(), value); _sensorDataStore->insert(sid, ts.getRaw(), value);
_influxCounter++; _influxCounter++;
if (_influxSettings->publish && (_influxSensors.find(sid.getId()) == _influxSensors.end())) { if (_influxSettings->publish && (_influxSensors.find(sid.getId()) == _influxSensors.end())) {
...@@ -267,7 +267,7 @@ void CARestAPI::PUT_analytics_reload(endpointArgs) { ...@@ -267,7 +267,7 @@ void CARestAPI::PUT_analytics_reload(endpointArgs) {
// Wait until controller is paused in order to reload plugins // Wait until controller is paused in order to reload plugins
_analyticsController->halt(true); _analyticsController->halt(true);
if (!_analyticsController->getManager()->reload(_analyticsController->getIoService(), plugin)) { if (!_analyticsController->getManager()->reload(plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n"; res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found); res.result(http::status::not_found);
} else if (!_analyticsController->getManager()->start(plugin)) { } else if (!_analyticsController->getManager()->start(plugin)) {
...@@ -297,7 +297,7 @@ void CARestAPI::PUT_analytics_load(endpointArgs) { ...@@ -297,7 +297,7 @@ void CARestAPI::PUT_analytics_load(endpointArgs) {
res.body() = "Operator plugin " + plugin + " successfully loaded!\n"; res.body() = "Operator plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok); res.result(http::status::ok);
_analyticsController->getManager()->init(_analyticsController->getIoService(), plugin); _analyticsController->getManager()->init(plugin);
} else { } else {
res.body() = "Failed to load operator plugin " + plugin + "!\n"; res.body() = "Failed to load operator plugin " + plugin + "!\n";
res.result(http::status::internal_server_error); res.result(http::status::internal_server_error);
......
...@@ -86,16 +86,6 @@ bool AnalyticsController::initialize(Configuration& settings) { ...@@ -86,16 +86,6 @@ bool AnalyticsController::initialize(Configuration& settings) {
if(!_queryEngine.updating.is_lock_free()) if(!_queryEngine.updating.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded."; LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
LOG(info) << "Creating threads...";
// Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
// Inherited from DCDB Pusher
_keepAliveWork = make_shared<boost::asio::io_service::work>(_io);
// Create pool of threads which handle the sensors
for(size_t i = 0; i < _settings.threads; i++) {
_threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &_io));
}
LOG(info) << "Threads created!";
_initialized = true; _initialized = true;
return true; return true;
} }
...@@ -106,7 +96,7 @@ void AnalyticsController::run() { ...@@ -106,7 +96,7 @@ void AnalyticsController::run() {
return; return;
LOG(info) << "Init operators..."; LOG(info) << "Init operators...";
_manager->init(_io); _manager->init();
LOG(info) << "Starting operators..."; LOG(info) << "Starting operators...";
_manager->start(); _manager->start();
LOG(info) << "Sensors started!"; LOG(info) << "Sensors started!";
......
...@@ -65,10 +65,10 @@ public: ...@@ -65,10 +65,10 @@ public:
* @param dcdbCfg SensorConfig object to be used to retrieve sensor meta-data from Cassandra * @param dcdbCfg SensorConfig object to be used to retrieve sensor meta-data from Cassandra
* @param dcdbStore SensorDataStore object to be used to insert sensor readings into Cassandra * @param dcdbStore SensorDataStore object to be used to insert sensor readings into Cassandra
*/ */
AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) { AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore, boost::asio::io_context& io)
_dcdbCfg = dcdbCfg; : _dcdbCfg(dcdbCfg),
_dcdbStore = dcdbStore; _dcdbStore(dcdbStore) {
_manager = make_shared<OperatorManager>(); _manager = make_shared<OperatorManager>(io);
_navigator = nullptr; _navigator = nullptr;
_sensorCache = nullptr; _sensorCache = nullptr;
_metadataStore = nullptr; _metadataStore = nullptr;
...@@ -179,13 +179,6 @@ public: ...@@ -179,13 +179,6 @@ public:
*/ */
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; } uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
/**
* @brief Return the io_service used by the analytics controller.
*
* @return Reference to this object's boost::asio::io_service.
*/
boost::asio::io_service& getIoService() { return _io; }
/** /**
* @brief Rebuilds the internal sensor navigator. * @brief Rebuilds the internal sensor navigator.
* *
...@@ -227,8 +220,6 @@ private: ...@@ -227,8 +220,6 @@ private:
// Main management thread for the analytics controller // Main management thread for the analytics controller
boost::thread _mainThread; boost::thread _mainThread;
// IO service for the operators
boost::asio::io_service _io;
// Underlying thread pool // Underlying thread pool
boost::thread_group _threads; boost::thread_group _threads;
// Dummy task to keep thread pool alive // Dummy task to keep thread pool alive
......
...@@ -54,6 +54,7 @@ ...@@ -54,6 +54,7 @@
#include <boost/property_tree/json_parser.hpp> #include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ptree.hpp>
#include <dcdb/libconfig.h>
#include <dcdb/connection.h> #include <dcdb/connection.h>
#include <dcdb/sensordatastore.h> #include <dcdb/sensordatastore.h>
#include <dcdb/jobdatastore.h> #include <dcdb/jobdatastore.h>
...@@ -92,6 +93,7 @@ uint64_t msgCtr; ...@@ -92,6 +93,7 @@ uint64_t msgCtr;
uint64_t readingCtr; uint64_t readingCtr;
uint64_t dbQueryCtr; uint64_t dbQueryCtr;
uint64_t cachedQueryCtr; uint64_t cachedQueryCtr;
uint64_t missesQueryCtr;
SensorCache mySensorCache; SensorCache mySensorCache;
AnalyticsController* analyticsController; AnalyticsController* analyticsController;
DCDB::Connection* dcdbConn; DCDB::Connection* dcdbConn;
...@@ -103,6 +105,7 @@ MetadataStore *metadataStore; ...@@ -103,6 +105,7 @@ MetadataStore *metadataStore;
CARestAPI* httpsServer = nullptr; CARestAPI* httpsServer = nullptr;
DCDB::SCError err; DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance(); QueryEngine& queryEngine = QueryEngine::getInstance();
boost::shared_ptr<boost::asio::io_context::work> keepAliveWork;
logger_t lg; logger_t lg;
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) { bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
...@@ -202,6 +205,8 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s ...@@ -202,6 +205,8 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
reading.timestamp = r.timeStamp.getRaw(); reading.timestamp = r.timeStamp.getRaw();
buffer.push_back(reading); buffer.push_back(reading);
} }
} else {
missesQueryCtr += topics.size();
} }
} }
catch (const std::exception &e) {} catch (const std::exception &e) {}
...@@ -271,6 +276,8 @@ void sigHandler(int sig) ...@@ -271,6 +276,8 @@ void sigHandler(int sig)
LOG(fatal) << "Received SIGUSR1 via REST API"; LOG(fatal) << "Received SIGUSR1 via REST API";
retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode(); retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
} }
keepAliveWork.reset();
keepRunning = 0; keepRunning = 0;
} }
...@@ -669,6 +676,9 @@ int main(int argc, char* const argv[]) { ...@@ -669,6 +676,9 @@ int main(int argc, char* const argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
libConfig.init();
libConfig.setTempDir(pluginSettings.tempdir);
//set up logger to file //set up logger to file
if (settings.logLevelFile >= 0) { if (settings.logLevelFile >= 0) {
...@@ -754,8 +764,11 @@ int main(int argc, char* const argv[]) { ...@@ -754,8 +764,11 @@ int main(int argc, char* const argv[]) {
metadataStore->store(*sBuf.getPattern(), sBuf); metadataStore->store(*sBuf.getPattern(), sBuf);
} }
publicSensors.clear(); publicSensors.clear();
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore); boost::asio::io_context io;
boost::thread_group threads;
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore, io);
analyticsController->setCache(&mySensorCache); analyticsController->setCache(&mySensorCache);
analyticsController->setMetadataStore(metadataStore); analyticsController->setMetadataStore(metadataStore);
queryEngine.setFilter(analyticsSettings.filter); queryEngine.setFilter(analyticsSettings.filter);
...@@ -779,10 +792,12 @@ int main(int argc, char* const argv[]) { ...@@ -779,10 +792,12 @@ int main(int argc, char* const argv[]) {
LOG(info) << " MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort; LOG(info) << " MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
LOG(info) << " CacheInterval: " << int(pluginSettings.cacheInterval/1000) << " [s]"; LOG(info) << " CacheInterval: " << int(pluginSettings.cacheInterval/1000) << " [s]";
LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]"; LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]";
LOG(info) << " Threads: " << settings.threads;
LOG(info) << " MessageThreads: " << settings.messageThreads; LOG(info) << " MessageThreads: " << settings.messageThreads;
LOG(info) << " MessageSlots: " << settings.messageSlots; LOG(info) << " MessageSlots: " << settings.messageSlots;
LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled"); LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " StatisticsInterval: " << settings.statisticsInterval << " [s]"; LOG(info) << " StatisticsInterval: " << settings.statisticsInterval << " [s]";
LOG(info) << " StatisticsMqttPart: " << settings.statisticsMqttPart;
LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix; LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled"); LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << pluginSettings.tempdir; LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
...@@ -852,9 +867,20 @@ int main(int argc, char* const argv[]) { ...@@ -852,9 +867,20 @@ int main(int argc, char* const argv[]) {
if (settings.validateConfig) if (settings.validateConfig)
return EXIT_SUCCESS; return EXIT_SUCCESS;
else
analyticsController->start(); LOG(info) << "Creating threads...";
// Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
// Inherited from DCDB Pusher
keepAliveWork = boost::make_shared<boost::asio::io_context::work>(io);
// Create pool of threads which handle the sensors
for(size_t i = 0; i < settings.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_context::*) () >(&boost::asio::io_context::run), &io));
}
LOG(info) << "Threads created!";
analyticsController->start();
LOG(info) << "AnalyticsController running...";
/* /*
* Start the MQTT Message Server. * Start the MQTT Message Server.
*/ */
...@@ -869,7 +895,7 @@ int main(int argc, char* const argv[]) { ...@@ -869,7 +895,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API * Start the HTTP Server for the REST API
*/ */
if (restAPISettings.enabled) { if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, analyticsController->getIoService()); httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, io);
config.readRestAPIUsers(httpsServer); config.readRestAPIUsers(httpsServer);
httpsServer->start(); httpsServer->start();
LOG(info) << "HTTP Server running..."; LOG(info) << "HTTP Server running...";
...@@ -886,6 +912,7 @@ int main(int argc, char* const argv[]) { ...@@ -886,6 +912,7 @@ int main(int argc, char* const argv[]) {
readingCtr = 0; readingCtr = 0;
dbQueryCtr = 0; dbQueryCtr = 0;
cachedQueryCtr = 0; cachedQueryCtr = 0;
missesQueryCtr = 0;
start = getTimestamp(); start = getTimestamp();
uint64_t lastCleanup = start; uint64_t lastCleanup = start;
...@@ -893,31 +920,32 @@ int main(int argc, char* const argv[]) { ...@@ -893,31 +920,32 @@ int main(int argc, char* const argv[]) {
LOG(info) << "Collect Agent running..."; LOG(info) << "Collect Agent running...";
while(keepRunning) { while(keepRunning) {
start = getTimestamp(); start = getTimestamp();
if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) { if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval)); uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
lastCleanup = start; lastCleanup = start;
if(purged > 0) if(purged > 0)
LOG(info) << "Cache: purged " << purged << " obsolete entries"; LOG(info) << "Cache: purged " << purged << " obsolete entries";
} }
if(newAutoPub) { if(newAutoPub) {
newAutoPub = false; newAutoPub = false;
mySensorConfig->setPublishedSensorsWritetime(getTimestamp()); mySensorConfig->setPublishedSensorsWritetime(getTimestamp());
} }
sleep(sleepInterval); sleep(sleepInterval);
if((settings.statisticsInterval > 0) && keepRunning) { if((settings.statisticsInterval > 0) && keepRunning) {
/* not really thread safe but will do the job */ /* not really thread safe but will do the job */
end = getTimestamp(); end = getTimestamp();
elapsed = (float)(NS_TO_S(end) - NS_TO_S(start)); elapsed = (float)(NS_TO_S(end) - NS_TO_S(start));
float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed); float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed);
float cacheReq = ceil(((float)cachedQueryCtr) / elapsed); float cacheReq = ceil(((float)cachedQueryCtr) / elapsed);
float missesReq = ceil(((float)missesQueryCtr) / elapsed);
float dbReq = ceil(((float)dbQueryCtr) / elapsed); float dbReq = ceil(((float)dbQueryCtr) / elapsed);
float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f; float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f;
float mIns = ceil(((float)readingCtr) / elapsed); float mIns = ceil(((float)readingCtr) / elapsed);
float mMsg = ceil(((float) msgCtr) / elapsed); float mMsg = ceil(((float) msgCtr) / elapsed);
LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s] REST [" << rIns << " ins/s] Analytics [" << aIns << " ins/s] Cache [" << cacheReq << " req/s] DB [" << dbReq << " req/s]"; LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s] REST [" << rIns << " ins/s] Analytics [" << aIns << " ins/s] Cache [" << cacheReq << " req/s] DB [" << dbReq << " req/s] Miss [" << missesReq << " req/s]";
std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen(); std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
uint64_t connectedHosts = 0; uint64_t connectedHosts = 0;
for (auto h: lastSeen) { for (auto h: lastSeen) {
...@@ -926,8 +954,25 @@ int main(int argc, char* const argv[]) { ...@@ -926,8 +954,25 @@ int main(int argc, char* const argv[]) {
} }
} }
LOG(info) << "Connected hosts: " << connectedHosts; LOG(info) << "Connected hosts: " << connectedHosts;
if (settings.statisticsMqttPart.size() > 0) {
std::string statisticsMqttTopic = pluginSettings.mqttPrefix + settings.statisticsMqttPart;
std::list<SensorDataStoreReading> stats;
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/msgsRcvd"), end, msgCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/cachedQueries"), end, cachedQueryCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/missedQueries"), end, missesQueryCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/dbQueries"), end, dbQueryCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/readingsRcvd"), end, readingCtr));
stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/hosts"), end, connectedHosts));
for (auto s: stats) {
mySensorDataStore->insert(s);
mySensorCache.storeSensor(s);
}
}
msgCtr = 0; msgCtr = 0;
cachedQueryCtr = 0; cachedQueryCtr = 0;
missesQueryCtr = 0;
dbQueryCtr = 0; dbQueryCtr = 0;
readingCtr = 0; readingCtr = 0;
} }
......
...@@ -61,6 +61,10 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) { ...@@ -61,6 +61,10 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
} }
} }
void SensorCache::storeSensor(const SensorDataStoreReading& s) {
storeSensor(s.sensorId, s.timeStamp.getRaw(), s.value);
}
int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) { int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
/* Remove the reserved bytes to leverage the standard find function */ /* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0); sid.setRsvd(0);
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <atomic> #include <atomic>
#include <dcdb/sensorid.h> #include <dcdb/sensorid.h>
#include <dcdb/timestamp.h> #include <dcdb/timestamp.h>
#include <dcdb/sensordatastore.h>
#include "cacheentry.h" #include "cacheentry.h"
using namespace DCDB; using namespace DCDB;
...@@ -63,6 +64,13 @@ public: ...@@ -63,6 +64,13 @@ public:
**/ **/
void storeSensor(SensorId sid, uint64_t ts, int64_t val); void storeSensor(SensorId sid, uint64_t ts, int64_t val);
/**
* @brief Store a sensor reading in the SensorCache.
*
* @param s The SensorDataStoreReading object of the sensor data to be cached.
**/
void storeSensor(const SensorDataStoreReading& s);
/** /**
* @brief Return a sensor reading or the average of the last readings * @brief Return a sensor reading or the average of the last readings
* from the SensorCache. * from the SensorCache.
......
...@@ -207,6 +207,7 @@ public: ...@@ -207,6 +207,7 @@ public:
bool validateConfig = false;