Commit f97e003d authored by lu43jih's avatar lu43jih
Browse files

spreading the messages over time

parent ff30abeb
# eclise files
.cproject
.project
...@@ -72,8 +72,8 @@ void MQTTPusher::push() { ...@@ -72,8 +72,8 @@ void MQTTPusher::push() {
} }
//collect sensor-data //collect sensor-data
reading_t* reads = new reading_t[1024]; reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
std::size_t totalCount = 0; std::size_t totalCount = 0; //number of messages
while (_keepRunning || totalCount) { while (_keepRunning || totalCount) {
if (_doHalt) { if (_doHalt) {
_halted = true; _halted = true;
...@@ -82,6 +82,7 @@ void MQTTPusher::push() { ...@@ -82,6 +82,7 @@ void MQTTPusher::push() {
} }
_halted = false; _halted = false;
totalCount = 0; totalCount = 0;
unsigned int maxNumberOfMessages=100;
for(auto& p : _plugins) { for(auto& p : _plugins) {
if(_doHalt) { if(_doHalt) {
//for faster response //for faster response
...@@ -90,7 +91,11 @@ void MQTTPusher::push() { ...@@ -90,7 +91,11 @@ void MQTTPusher::push() {
for(auto g : p.configurator->getSensorGroups()) { for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) { for(auto s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) { if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
sendReadings(s, reads, totalCount); if(totalCount < maxNumberOfMessages){
sendReadings(s, reads, totalCount);
} else {
break; //ultimately we will go to sleep 1 second
}
} }
} }
} }
...@@ -117,8 +122,9 @@ void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& tota ...@@ -117,8 +122,9 @@ void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& tota
if (_connected) { if (_connected) {
//get all sensor values out of its queue //get all sensor values out of its queue
std::size_t count = s->popReadingQueue(reads, 1024); std::size_t count = s->popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
totalCount+= count; //totalCount+= count;
totalCount+= 1;
#ifdef DEBUG #ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s->getName(); LOGM(debug) << "Sending " << count << " values from " << s->getName();
#endif #endif
...@@ -135,7 +141,8 @@ void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& tota ...@@ -135,7 +141,8 @@ void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& tota
LOGM(error) << "Could not send message! Trying again later"; LOGM(error) << "Could not send message! Trying again later";
_connected = false; _connected = false;
s->pushReadingQueue(reads, count); s->pushReadingQueue(reads, count);
totalCount -= count; //totalCount -= count;
totalCount -= 1;
sleep(5); sleep(5);
} }
} }
......
...@@ -19,6 +19,8 @@ typedef struct { ...@@ -19,6 +19,8 @@ typedef struct {
class SensorBase { class SensorBase {
public: public:
static const size_t QUEUE_MAXLIMIT=1024;
SensorBase(const std::string& name) : SensorBase(const std::string& name) :
_name(name), _name(name),
_mqtt(""), _mqtt(""),
...@@ -69,7 +71,7 @@ public: ...@@ -69,7 +71,7 @@ public:
} }
} }
if(!_readingQueue) { if(!_readingQueue) {
_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(1024)); _readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment