Commit c3e7a8cf authored by daniele's avatar daniele
Browse files

Merge branch 'master' of ssh://deep-ras.srv.lrz.de/git/dcdb

parents 1f6a1ebd 6467864e
......@@ -140,14 +140,13 @@ void mqttCallback(SimpleMQTTMessage *msg)
uint64_t val;
uint64_t ts;
uint64_t msgTs = Messaging::calculateTimestamp();
//In the 64 bit message case, the collect agent provides a timestamp
if(msg->getPayloadLength()==sizeof(uint64_t)) {
//printf("Providing timestamp...\n");
val = *((uint64_t*)msg->getPayload());
ts = msgTs;
ts = Messaging::calculateTimestamp();
//printf("Val = %" PRIu64 ", timestamp = %" PRIu64 "\n", val, ts);
}
......@@ -192,6 +191,7 @@ void mqttCallback(SimpleMQTTMessage *msg)
#endif
mySensorDataStore->insert(&sid, ts, val);
mySensorCache.storeSensor(sid, ts, val);
//mySensorCache.dump();
}
#if 1
else {
......
......@@ -6,6 +6,7 @@
*/
#include "sensorcache.h"
#include "messaging.h"
#include <exception>
#include <iostream>
......@@ -21,23 +22,47 @@ SensorCache::~SensorCache() {
}
void SensorCache::storeSensor(SensorId sid, uint64_t ts, uint64_t val) {
cacheEntry_t e(val, ts);
cacheEntry_t e;
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid);
if (it != sensorCache.end()) {
e = it->second;
e.deltaT.push_back((ts - e.timestamp)/1000000);
if (e.deltaT.size() > 5) {
e.deltaT.pop_front();
}
}
e.val = val;
e.timestamp = ts;
sensorCache[sid] = e;
}
uint64_t SensorCache::getSensor(SensorId sid) {
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid);
if (it == sensorCache.end()) {
throw std::out_of_range("Sid not found");
throw std::invalid_argument("Sid not found");
}
if (it->second.deltaT.size()) {
uint64_t ts = Messaging::calculateTimestamp() / 1000000;
uint64_t avg = 0;
for (std::list<uint32_t>::iterator dt = it->second.deltaT.begin(); dt != it->second.deltaT.end(); dt++) {
avg+= *dt;
}
avg/= it->second.deltaT.size();
if ((ts - it->second.timestamp) > 5 * avg) {
throw std::out_of_range("Sid outdated");
}
}
return it->second.first;
return it->second.val;
}
void SensorCache::dump() {
......@@ -45,7 +70,17 @@ void SensorCache::dump() {
std::cout << "SensorCache Dump:" << std::endl;
for (it = sensorCache.begin(); it != sensorCache.end(); it++) {
std::cout << " id=" << it->first.toString() << " val=" << it->second.first << " ts=" << it->second.second << std::endl;
std::cout << " id=" << it->first.toString() << " val=" << it->second.val << " ts=" << it->second.timestamp;
if (it->second.deltaT.size()) {
uint64_t avg = 0;
std::cout << " deltaT=[";
for (std::list<uint32_t>::iterator dt = it->second.deltaT.begin(); dt != it->second.deltaT.end(); dt++) {
avg+= *dt;
std::cout << " " << *dt;
}
std::cout << "] avg=" << avg/it->second.deltaT.size();
}
std::cout << std::endl;
}
}
......
......@@ -9,12 +9,17 @@
#define COLLECTAGENT_SENSORCACHE_H_
#include <map>
#include <list>
#include <utility>
#include <dcdb/sensorid.h>
namespace DCDB {
typedef std::pair<uint64_t, uint64_t> cacheEntry_t;
typedef struct {
int64_t val;
uint64_t timestamp;
std::list<uint32_t> deltaT;
} cacheEntry_t;
typedef std::map<SensorId, cacheEntry_t> sensorCache_t;
class SensorCache {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment