Commit 4bebf9ea authored by Micha Mueller's avatar Micha Mueller
Browse files

Add new sensor functionality stopPolling(): stops collection of sensor values...

Add new sensor functionality stopPolling(): stops collection of sensor values until startPolling() is called again
parent 68d1f169
......@@ -12,7 +12,7 @@
#include <functional>
Sensor::Sensor(const std::string name) :
_name(name), _mqtt(""), _minValues(1), _interval(1000) {
_name(name), _mqtt(""), _keepRunning(0), _minValues(1), _interval(1000) {
_latestValue.timestamp = 0;
_latestValue.value = 0;
......@@ -25,6 +25,9 @@ Sensor::~Sensor() {
if(_readingQueue) {
delete _readingQueue;
}
if(_timer) {
delete _timer;
}
}
const std::string& Sensor::getName() const {
......@@ -70,3 +73,12 @@ std::size_t Sensor::popReadingQueue(reading_t *reads, std::size_t max) const {
void Sensor::pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
void Sensor::init(boost::asio::io_service& io) {
if(!_readingQueue) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
if(!_timer) {
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
}
}
......@@ -47,14 +47,19 @@ public:
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const;
void pushReadingQueue(reading_t *reads, std::size_t count) const;
//can be overwritten methods
virtual void init(boost::asio::io_service& io);
//have to be overwritten methods
virtual void read() = 0;
virtual void readAsync() = 0;
virtual void startPolling(boost::asio::io_service& io) = 0;
virtual void startPolling() = 0;
virtual void stopPolling() = 0;
protected:
std::string _name;
std::string _mqtt;
int _keepRunning;
unsigned int _minValues;
unsigned int _interval;
reading_t _latestValue;
......
......@@ -46,10 +46,18 @@
using namespace std;
volatile int keepRunning;
sensorVector_t sensors;
void sigintHandler(int sig) {
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
LOG(info) << "Received SIGINT. Stopping sensor threads and flushing MQTT queues.";
LOG(info) << "Received SIGINT";
//Stop all sensors
LOG(info) << "Stopping sensors...";
for(auto s : sensors) {
s->stopPolling();
}
LOG(info) << "Flushing MQTT queues...";
//Stop MQTTPusher
keepRunning = 0;
}
......@@ -132,7 +140,6 @@ int main(int argc, char** argv) {
boost::thread_group threads;
Configuration cfg(argv[argc-1]);
global_t globalSettings;
sensorVector_t sensors;
keepRunning = 1;
signal(SIGINT, sigintHandler); //Handle Strg+C
signal(SIGTERM, sigtermHandler); //Handle termination
......@@ -230,10 +237,16 @@ int main(int argc, char** argv) {
LOG(info) << " Write-Dir: " << globalSettings.tempdir;
//Init all sensors
LOG(info) << "Starting sensors...";
LOG(info) << "Init sensors...";
for(auto s : sensors) {
LOG(debug) << " -" << s->getName();
s->startPolling(io);
s->init(io);
}
//Start all sensors
LOG(info) << "Starting sensors...";
for(auto s : sensors) {
s->startPolling();
}
LOG(info) << "Sensors started!";
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment