Commit ae99a12a authored by Daniele Tafani's avatar Daniele Tafani
Browse files

Created an include folder for header files common to all pushers

parent d1e3e317
include ../config.mk
CXXFLAGS = -O0 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/
CXXFLAGS = -O0 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
......
......@@ -16,6 +16,7 @@
#include <dcdb/sensordatastore.h>
#include "simplemqttserver.h"
#include "messaging.h"
using namespace std;
......@@ -53,25 +54,16 @@ void mqttCallback(SimpleMQTTMessage *msg)
if(msg->getPayloadLength()==sizeof(uint64_t)) {
val = *((uint64_t*)msg->getPayload());
/*
* Calculate Time Stamp.
*/
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
ts = diff.total_nanoseconds();
ts = Messaging::calculateTimestamp();
}
//...otherwise it just retrieves it from the MQTT message payload.
else {
else if((msg->getPayloadLength()%sizeof(mqttPayload)==0) && (msg->getPayloadLength()>0)){
uint64_t *payload;
payload = (uint64_t*)msg->getPayload();
mqttPayload *payload = (mqttPayload*)msg->getPayload();
val = payload[0];
ts = payload[1];
val = payload->value; // payload[n].value
ts = payload->timestamp;
}
/*
......
/*
* messaging.h
*
* Created on: Mar 20, 2014
* Author: Daniele Tafani
*/
#include <boost/thread/mutex.hpp>
#ifndef MESSAGING_H_
#define MESSAGING_H_
typedef struct {
uint64_t value;
uint64_t timestamp;
} mqttPayload;
class Messaging
{
public:
static uint64_t calculateTimestamp() {
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
return diff.total_nanoseconds();
}
};
#endif /* MESSAGING_H_ */
/*
* pusherpqueue.h
*
* Created on: Aug 29, 2013
* Author: Axel Auweter
*/
#include <list>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/lock_types.hpp>
#ifndef PUSHERPQUEUE_H_
#define PUSHERPQUEUE_H_
class ListSemaphore {
private:
unsigned int count_;
boost::mutex countMtx_;
boost::condition_variable cond_;
public:
ListSemaphore(unsigned int initialCount) {
count_ = initialCount;
}
void post() {
boost::unique_lock<boost::mutex> lock(countMtx_);
count_++;
cond_.notify_one();
}
void wait() {
boost::unique_lock<boost::mutex> lock(countMtx_);
while (count_ == 0) {
cond_.wait(lock);
}
count_--;
}
};
template <class T>
class PusherPQueue
{
protected:
struct pListElement_ {
T elem_;
boost::posix_time::ptime dueTime_;
};
ListSemaphore pListSem_;
std::list<struct pListElement_> pList_;
bool abort_;
public:
void insert(T element, boost::posix_time::ptime dueTime);
void sleepUntilNext();
T popNext();
bool empty();
void abort();
unsigned int size();
PusherPQueue();
virtual ~PusherPQueue();
};
template <class T>
void PusherPQueue<T>::insert(T element, boost::posix_time::ptime dueTime)
{
/*
* Insert the element into the priority queue and increase
* the queue length semaphore.
*/
struct pListElement_ e;
e.elem_ = element;
e.dueTime_ = dueTime;
typename std::list<struct pListElement_>::iterator iter;
for (iter = pList_.begin(); iter != pList_.end(); iter++) {
if ((*iter).dueTime_ > dueTime) {
pList_.insert(iter, e);
pListSem_.post();
return;
}
}
/*
* In case the list was empty or the due time of
* the element is after the last element in the
* list, we'll end up here.
*/
pList_.push_back(e);
pListSem_.post();
}
template <class T>
void PusherPQueue<T>::sleepUntilNext()
{
/*
* Block if there is no element in the list. As the
* element is not being removed by this function,
* post the semaphore right after.
*/
pListSem_.wait();
if(abort_)
return;
pListSem_.post();
/*
* Sleep until the first element in the queue
* is due.
*/
pListElement_ first;
first = pList_.front();
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = first.dueTime_ - now;
if (diff.is_negative())
return;
usleep(diff.total_microseconds());
}
template <class T>
T PusherPQueue<T>::popNext()
{
/*
* Pop the first element from the priority queue.
*/
pListSem_.wait();
T ret = pList_.front().elem_;
pList_.pop_front();
return ret;
}
template <class T>
bool PusherPQueue<T>::empty()
{
/*
* Return true if the list is empty.
*/
return pList_.empty();
}
template <class T>
void PusherPQueue<T>::abort()
{
/*
* Break out of a blocked sleepUntilNext()...
*/
abort_ = true;
pListSem_.post();
}
template <class T>
unsigned int PusherPQueue<T>::size()
{
/*
* Return the size of the list
*/
return pList_.size();
}
template <class T>
PusherPQueue<T>::PusherPQueue() : pListSem_(0), abort_(false)
{
/*
* Nothing to be done here...
*/
}
template <class T>
PusherPQueue<T>::~PusherPQueue()
{
/*
* Nothing to be done here...
*/
}
#endif /* PUSHERPQUEUE_H_ */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment