Commit 642469ae authored by Micha Mueller's avatar Micha Mueller
Browse files

Add code infrastructure for first idea of how sensor groups could be realized

parent f271b192
......@@ -16,7 +16,7 @@ include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/Sensor.o src/MQTTPusher.o src/HttpsServer.o
OBJS = src/dcdbpusher.o src/Configuration.o src/Sensor.o src/MQTTPusher.o src/HttpsServer.o src/SensorGroup.o
PLUGINS_BASE = libdcdbplugin_pdu libdcdbplugin_sysfs libdcdbplugin_ipmi libdcdbplugin_bacnet libdcdbplugin_snmp
ifeq ($(OS),Darwin)
......
......@@ -9,6 +9,7 @@
#define SRC_CONFIGURATOR_H_
#include "Sensor.h"
#include "SensorGroup.h"
#include <vector>
#include <string>
......@@ -50,12 +51,21 @@ public:
for (auto s : _sensors) {
s->stopPolling();
}
for(auto g : _sensorGroups) {
g->stopPolling();
}
for (auto s : _sensors) {
s->wait();
}
for(auto g : _sensorGroups) {
g->wait();
}
for (auto s : _sensors) {
delete s;
}
for(auto g : _sensorGroups) {
delete g;
}
_sensors.clear();
return true;
}
......@@ -69,11 +79,16 @@ public:
return _sensors;
}
std::vector<SensorGroup*>& getSensorGroups() {
return _sensorGroups;
}
protected:
std::string _cfgPath;
std::string _mqttPrefix;
unsigned int _cacheInterval;
std::vector<Sensor*> _sensors;
std::vector<SensorGroup*> _sensorGroups;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -150,6 +150,13 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto s : p.configurator->getSensors()) {
sensors.put(s->getName(), "");
}
for(auto g : p.configurator->getSensorGroups()) {
boost::property_tree::ptree group;
for(auto s : g->getSensors()) {
group.put(s->getName(), "");
}
sensors.add_child(g->getName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_info(data, root);
response = "";
......@@ -191,25 +198,20 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
response = "Sensor not found!";
for(auto s : p.configurator->getSensors()) {
if (s->getName() == sensor) {
uint64_t avg = 0;
const reading_t * const cache = s->getCache();
unsigned size = s->getCacheSize();
unsigned count = 0;
for(unsigned i = 0; i < size; i++) {
if (cache[i].timestamp > time) {
avg += cache[i].value;
count++;
}
}
if (count > 0)
avg /= count;
response = pathStrs[0] + "::" + sensor + " Average of last " + std::to_string(count) + " values is " + std::to_string(avg);
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(s, time);
connection->set_status(server::connection::ok);
break;
}
}
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
if (s->getName() == sensor) {
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(s, time);
connection->set_status(server::connection::ok);
break;
}
}
}
}
}
}
......@@ -242,6 +244,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto s : p.configurator->getSensors()) {
s->startPolling();
}
for(auto g : p.configurator->getSensorGroups()) {
g->startPolling();
}
response = "Plugin " + pathStrs[0] + ": Sensors started";
connection->set_status(server::connection::ok);
break;
......@@ -253,6 +258,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto s : p.configurator->getSensors()) {
s->stopPolling();
}
for(auto g : p.configurator->getSensorGroups()) {
g->stopPolling();
}
response = "Plugin " + pathStrs[0] + ": Sensors stopped";
connection->set_status(server::connection::ok);
break;
......@@ -329,6 +337,24 @@ HttpsServer::~HttpsServer() {
delete _server;
}
std::string HttpsServer::calcAvg(Sensor* const s, uint64_t time) {
uint64_t avg = 0;
const reading_t * const cache = s->getCache();
unsigned size = s->getCacheSize();
unsigned count = 0;
for(unsigned i = 0; i < size; i++) {
if (cache[i].timestamp > time) {
avg += cache[i].value;
count++;
}
}
if (count > 0)
avg /= count;
return " Average of last " + std::to_string(count) + " values is " + std::to_string(avg);
}
bool HttpsServer::check_authkey(const std::string& authkey, permission requiredPerm) {
authkeyMap_t::iterator it = _authkeys.find(authkey);
if (it != _authkeys.end()) {
......
......@@ -86,6 +86,16 @@ private:
static std::string password_callback(std::size_t max_length, asio::ssl::context_base::password_purpose purpose);
*/
/*
* Calculate the average of the values in sensors s cache
*
* @param s Sensor of whose cache the average should be calculated
* @param time Restrict the average calculation only to values newer than time
*
* @return Response message of the form " Average of last *count* values is *avg*"
*/
std::string calcAvg(Sensor* const s, uint64_t time);
/*
* Check if the authkey is valid and has the permission requiredPerm associated
*
......
......@@ -83,52 +83,22 @@ void MQTTPusher::push() {
_halted = false;
totalCount = 0;
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
continue;
}
for(auto s : p.configurator->getSensors()) {
if (s->getSizeOfReadingQueue() >= s->getMinValues()) {
sendReadings(s, reads, totalCount);
}
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
if (_connected) {
//get all sensor values out of its queue
std::size_t count = s->popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s->getName();
#endif
#if DEBUG
for (std::size_t i=0; i<count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
LOGM(error) << "Could not send message! Trying again later";
_connected = false;
s->pushReadingQueue(reads, count);
totalCount -= count;
sleep(5);
break;
}
}
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
sendReadings(s, reads, totalCount);
}
}
if(_doHalt) {
//for faster response
continue;
}
}
}
sleep(1);
......@@ -136,3 +106,43 @@ void MQTTPusher::push() {
mosquitto_disconnect(_mosq);
mosquitto_loop_stop(_mosq, false);
}
void MQTTPusher::sendReadings(Sensor* s, reading_t* reads, std::size_t& totalCount) {
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
if (_connected) {
//get all sensor values out of its queue
std::size_t count = s->popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s->getName();
#endif
#if DEBUG
for (std::size_t i=0; i<count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
LOGM(error) << "Could not send message! Trying again later";
_connected = false;
s->pushReadingQueue(reads, count);
totalCount -= count;
sleep(5);
}
}
}
......@@ -43,6 +43,8 @@ public:
}
private:
void sendReadings(Sensor* s, reading_t* reads, std::size_t& totalCount);
int _brokerPort;
std::string _brokerHost;
pluginVector_t& _plugins;
......
/*
* SensorGroup.cpp
*
* Created on: 03.08.2018
* Author: Micha Mueller
*/
#include "SensorGroup.h"
SensorGroup::SensorGroup(const std::string name) :
_name(name), _keepRunning(0), _minValues(1), _interval(1000),
_cacheInterval(900000), _cacheSize(0), _cacheIndex(0), _pendingTasks(0) {
_timer = 0;
}
SensorGroup::~SensorGroup() {
if(_timer) {
delete _timer;
}
for(auto s : _sensors) {
delete s;
}
}
const std::string& SensorGroup::getName() const {
return _name;
}
void SensorGroup::setName(const std::string& name) {
_name = name;
}
unsigned SensorGroup::getMinValues() const {
return _minValues;
}
void SensorGroup::setMinValues(unsigned minValues) {
_minValues = minValues;
}
unsigned SensorGroup::getInterval() const {
return _interval;
}
void SensorGroup::setInterval(unsigned interval) {
_interval = interval;
}
unsigned SensorGroup::getCacheSize() const {
return _cacheSize;
}
void SensorGroup::setCacheInterval(unsigned cacheInterval) {
_cacheInterval = cacheInterval;
}
void SensorGroup::init(boost::asio::io_service& io) {
if (!_timer) {
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
}
for (auto s : _sensors) {
s->init(io);
}
}
void SensorGroup::wait() {
while(_pendingTasks) {
sleep(1);
}
}
/*
* SensorGroup.h
*
* Created on: 03.08.2018
* Author: Micha Mueller
*/
#ifndef SENSORGROUP_H_
#define SENSORGROUP_H_
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include "Logging.h"
#include "Sensor.h"
class SensorGroup {
public:
SensorGroup(const std::string name);
virtual ~SensorGroup();
//non-overwriteable methods
const std::string& getName() const;
void setName(const std::string& name);
unsigned getMinValues() const;
void setMinValues(unsigned minValues);
unsigned getInterval() const;
void setInterval(unsigned interval);
unsigned getCacheSize() const;
void setCacheInterval(unsigned cacheInterval);
//can be overwritten methods
virtual void init(boost::asio::io_service& io);
//have to be overwritten methods
virtual void read() = 0;
virtual void readAsync() = 0;
virtual void startPolling() = 0;
virtual void stopPolling() = 0;
std::vector<Sensor*>& getSensors() {
return _sensors;
}
void wait();
protected:
std::string _name;
int _keepRunning;
unsigned int _minValues;
unsigned int _interval;
unsigned int _cacheInterval;
unsigned int _cacheSize;
unsigned int _cacheIndex;
unsigned int _pendingTasks;
std::vector<Sensor*> _sensors;
boost::asio::deadline_timer* _timer;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif /* SENSORGROUP_H_ */
......@@ -63,6 +63,9 @@ void sigintHandler(int sig) {
for(auto s : p.configurator->getSensors()) {
s->stopPolling();
}
for(auto g : p.configurator->getSensorGroups()) {
g->stopPolling();
}
}
//Stop io service by killing keepAliveWork
keepAliveWork.reset();
......@@ -309,6 +312,11 @@ int main(int argc, char** argv) {
LOG(debug) << " -" << s->getName();
s->init(io);
}
for(auto g : p.configurator->getSensorGroups()) {
LOG(debug) << " -Group: " << g->getName();
g->init(io);
}
}
//Start all sensors
......@@ -318,6 +326,10 @@ int main(int argc, char** argv) {
for(auto s : p.configurator->getSensors()) {
s->startPolling();
}
for(auto g : p.configurator->getSensorGroups()) {
g->startPolling();
}
}
LOG(info) << "Sensors started!";
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment