Commit 67e7d960 authored by Michael Ott's avatar Michael Ott
Browse files

Add query parameter avg to REST API to average sensor readings

parent de067505
...@@ -7,7 +7,7 @@ OBJS = collectagent.o \ ...@@ -7,7 +7,7 @@ OBJS = collectagent.o \
simplemqttserver.o \ simplemqttserver.o \
simplemqttserverthread.o \ simplemqttserverthread.o \
simplemqttservermessage.o simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_regex -lcppnetlib-server-parsers LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
TARGET = collectagent TARGET = collectagent
.PHONY : clean install .PHONY : clean install
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <boost/network/protocol/http/server.hpp> #include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp> #include <boost/network/utils/thread_pool.hpp>
#include <boost/network/uri.hpp>
#include <iostream> #include <iostream>
#include <cstdlib> #include <cstdlib>
#include <signal.h> #include <signal.h>
...@@ -78,7 +79,12 @@ struct httpHandler_t { ...@@ -78,7 +79,12 @@ struct httpHandler_t {
//try getting the latest value //try getting the latest value
try { try {
uint64_t val = mySensorCache.getSensor(request.destination); boost::network::uri::uri uri("http://localhost"+request.destination);
std::map<std::string, std::string> queries;
boost::network::uri::query_map(uri, queries);
int avg = atoi(queries.find("avg")->second.c_str());
uint64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
data << val << "\n"; data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl; //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
......
...@@ -6,10 +6,13 @@ ...@@ -6,10 +6,13 @@
*/ */
#include "sensorcache.h" #include "sensorcache.h"
#include "messaging.h" #include <dcdb/timestamp.h>
#include <exception> #include <exception>
#include <iostream> #include <iostream>
#include <iomanip>
#include <algorithm>
namespace DCDB { namespace DCDB {
SensorCache::SensorCache() { SensorCache::SensorCache() {
...@@ -29,32 +32,31 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, uint64_t val) { ...@@ -29,32 +32,31 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, uint64_t val) {
sensorCache_t::iterator it = sensorCache.find(sid); sensorCache_t::iterator it = sensorCache.find(sid);
if (it != sensorCache.end()) { if (it != sensorCache.end()) {
e = it->second; e = it->second;
e.deltaT.push_back((ts - e.timestamp)/1000000); if (e.front().timestamp+MAX_HISTORY_NS > ts) {
if (e.deltaT.size() > 5) { e.pop_front();
e.deltaT.pop_front();
} }
} }
e.val = val; sensorReading_t s = { val, ts };
e.timestamp = ts; sensorCache[sid].push_back(s);
sensorCache[sid] = e;
} }
bool SensorCache::checkValid(cacheEntry_t &entry) { bool SensorCache::checkValid(cacheEntry_t &entry) {
if (entry.deltaT.size()) { if (entry.size() > 2) {
uint64_t ts = Messaging::calculateTimestamp(); TimeStamp ts;
uint64_t avg = 0; uint64_t avg = 0;
for (std::list<uint32_t>::iterator dt = entry.deltaT.begin(); dt != entry.deltaT.end(); dt++) { cacheEntry_t::iterator it = entry.begin();
avg+= *dt; uint64_t prev = it->timestamp;
for (it++; it != entry.end(); it++) {
avg+= it->timestamp - prev;
prev = it->timestamp;
} }
avg/= entry.deltaT.size(); avg/= (entry.size()-1);
/* /*
* A SID is outdated if it's older than 5x the average sampling period. * A SID is outdated if it's older than 5x the average sampling period.
* The 1000000 accounts for the conversion ns->ms in deltaT.
*/ */
if ((ts - entry.timestamp) > 5 * 1000000 * avg) { if ((ts.getRaw() - entry.back().timestamp) > 5 * avg) {
return false; return false;
} }
} }
...@@ -62,29 +64,67 @@ bool SensorCache::checkValid(cacheEntry_t &entry) { ...@@ -62,29 +64,67 @@ bool SensorCache::checkValid(cacheEntry_t &entry) {
return true; return true;
} }
uint64_t SensorCache::getSensor(SensorId sid) { int64_t SensorCache::getAverage(cacheEntry_t &entry, uint64_t avg) {
TimeStamp ts;
if (entry.size() > 0) {
//std::cout << ts.getRaw() - entry.back().timestamp << std::endl;
if (ts.getRaw() - entry.back().timestamp > avg * NS_PER_S) {
throw std::out_of_range("Sid outdated");
return 0;
}
double sum = 0;
cacheEntry_t::reverse_iterator it, prev;
it = prev = entry.rbegin();
it++;
while ((it != entry.rend()) && ((ts.getRaw() - it->timestamp) <= avg * NS_PER_S)) {
uint64_t deltaT = (prev->timestamp - it->timestamp);
sum+= ((it->val + prev->val) / 2) * deltaT;
//std::cout << "SensorCache::getAverage sum=" << sum << " deltaT=" <<deltaT << " it=(" << it->timestamp << "," <<it->val <<") prev=(" << prev->timestamp << "," << prev->val <<") " << (ts.getRaw() - it->timestamp) << std::endl;
prev = it++;
}
//std::cout << "SensorCache::getAverage (" << prev->timestamp << "," <<prev->val <<") (" << entry.back().timestamp << "," << entry.back().val << ") sum=" << sum << " deltaT=" << entry.back().timestamp - prev->timestamp << std::endl;
if (prev == entry.rbegin()) {
return entry.back().val;
} else {
return sum/(entry.back().timestamp - prev->timestamp);
}
}
throw std::invalid_argument("Sid not found");
return 0;
}
uint64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
/* Remove the reserved bytes to leverage the standard find function */ /* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0); sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid); sensorCache_t::iterator it = sensorCache.find(sid);
if (it == sensorCache.end()) { if (it == sensorCache.end()) {
throw std::invalid_argument("Sid not found"); throw std::invalid_argument("Sid not found");
return 0;
} }
if (!checkValid(it->second)) if (!checkValid(it->second))
{ {
throw std::out_of_range("Sid outdated"); throw std::out_of_range("Sid outdated");
return 0;
} }
return it->second.val; if (avg) {
return getAverage(it->second, avg);
} else {
return it->second.back().val;
}
} }
uint64_t SensorCache::getSensor(std::string topic) { uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
topic.erase(std::remove(topic.begin(), topic.end(), '/'), topic.end()); topic.erase(std::remove(topic.begin(), topic.end(), '/'), topic.end());
size_t wp = topic.find("*"); size_t wp = topic.find("*");
if (wp == std::string::npos) { if (wp == std::string::npos) {
return getSensor(DCDB::SensorId(topic)); return getSensor(DCDB::SensorId(topic), avg);
} }
int wl = 33 - topic.length(); int wl = 33 - topic.length();
...@@ -108,7 +148,7 @@ uint64_t SensorCache::getSensor(std::string topic) { ...@@ -108,7 +148,7 @@ uint64_t SensorCache::getSensor(std::string topic) {
/* We only return one value, even if multiple SensorIds would match. /* We only return one value, even if multiple SensorIds would match.
* At least make sure it's the most recent value * At least make sure it's the most recent value
*/ */
if (checkValid(it->second) && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.timestamp < it->second.timestamp)) { if (checkValid(it->second) && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.back().timestamp < it->second.back().timestamp)) {
mostRecentSidIt = it; mostRecentSidIt = it;
} }
} }
...@@ -124,26 +164,27 @@ uint64_t SensorCache::getSensor(std::string topic) { ...@@ -124,26 +164,27 @@ uint64_t SensorCache::getSensor(std::string topic) {
} }
} }
return mostRecentSidIt->second.val; if (avg) {
return getAverage(mostRecentSidIt->second, avg);
} else {
return mostRecentSidIt->second.back().val;
}
} }
void SensorCache::dump() { void SensorCache::dump() {
sensorCache_t::iterator it; std::cout << "SensorCache Dump:" << std::endl;
for (sensorCache_t::iterator sit = sensorCache.begin(); sit != sensorCache.end(); sit++) {
std::cout << "SensorCache Dump:" << std::endl; std::cout << " id=" << sit->first.toString() << " data=[";
for (it = sensorCache.begin(); it != sensorCache.end(); it++) { for (cacheEntry_t::iterator eit = sit->second.begin(); eit != sit->second.end(); eit++) {
std::cout << " id=" << it->first.toString() << " val=" << it->second.val << " ts=" << it->second.timestamp; if (eit != sit->second.begin()) {
if (it->second.deltaT.size()) { std::cout << ",";
uint64_t avg = 0; }
std::cout << " deltaT=["; std::cout << "(" << eit->val << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
for (std::list<uint32_t>::iterator dt = it->second.deltaT.begin(); dt != it->second.deltaT.end(); dt++) { }
avg+= *dt; std::cout << "]" << std::endl;
std::cout << " " << *dt; }
}
std::cout << "] avg=" << avg/it->second.deltaT.size();
}
std::cout << std::endl;
}
} }
} /* namespace DCDB */ } /* namespace DCDB */
...@@ -13,13 +13,15 @@ ...@@ -13,13 +13,15 @@
#include <utility> #include <utility>
#include <dcdb/sensorid.h> #include <dcdb/sensorid.h>
#define MAX_HISTORY_NS 60000000 // Store max 60s of historic data
namespace DCDB { namespace DCDB {
typedef struct { typedef struct {
int64_t val; int64_t val;
uint64_t timestamp; uint64_t timestamp;
std::list<uint32_t> deltaT; } sensorReading_t;
} cacheEntry_t; typedef std::list<sensorReading_t> cacheEntry_t;
typedef std::map<SensorId, cacheEntry_t> sensorCache_t; typedef std::map<SensorId, cacheEntry_t> sensorCache_t;
class SensorCache { class SensorCache {
...@@ -43,7 +45,7 @@ public: ...@@ -43,7 +45,7 @@ public:
* @throws std::invalid_argument if the SensorId doesn't exist in the SensorCache. * @throws std::invalid_argument if the SensorId doesn't exist in the SensorCache.
* @throws std::out_of_range if the sid was found in the cache entry but is outdated. * @throws std::out_of_range if the sid was found in the cache entry but is outdated.
*/ */
uint64_t getSensor(SensorId sid); uint64_t getSensor(SensorId sid, uint64_t avg=0);
/** /**
* @brief Return a sensor reading from the SensorCache. * @brief Return a sensor reading from the SensorCache.
...@@ -52,7 +54,7 @@ public: ...@@ -52,7 +54,7 @@ public:
* @throws std::invalid_argument if the topic couldn't be found in the SensorCache. * @throws std::invalid_argument if the topic couldn't be found in the SensorCache.
* @throws std::out_of_range if the topic was found in the cache entry but is outdated. * @throws std::out_of_range if the topic was found in the cache entry but is outdated.
*/ */
uint64_t getSensor(std::string topic); uint64_t getSensor(std::string topic, uint64_t avg=0);
/** /**
* @brief Dump the contents of the SensorCache to stdout. * @brief Dump the contents of the SensorCache to stdout.
...@@ -61,6 +63,7 @@ public: ...@@ -61,6 +63,7 @@ public:
private: private:
bool checkValid(cacheEntry_t &entry); bool checkValid(cacheEntry_t &entry);
int64_t getAverage(cacheEntry_t &entry, uint64_t avg);
sensorCache_t sensorCache; sensorCache_t sensorCache;
}; };
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment