Commit 25c554c8 authored by Alessio Netti's avatar Alessio Netti

Merge remote-tracking branch 'remotes/origin/master' into development

parents 214281cf ed361bad
......@@ -151,11 +151,17 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
DCDB::SensorId sid;
// Creating a SID to perform the query
if (sid.mqttTopicConvert(topic)) {
if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
// Data was found, can continue to next SID
successCtr++;
} else {
// This happens only if no data was found in the local cache
try {
mySensorCache.wait();
if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
// Data was found, can continue to next SID
successCtr++;
} else {
// This happens only if no data was found in the local cache
topics.push_back(sid);
}
// To handle nasty (yet rare) race conditions on the sensor cache
} catch(const std::exception& e) {
topics.push_back(sid);
}
}
......@@ -219,9 +225,17 @@ bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {}
if(metadataStore->getMap().count(topic)) {
buffer = metadataStore->get(topic);
} else {
bool local = false;
try {
metadataStore->wait();
if(metadataStore->getMap().count(topic)) {
buffer = metadataStore->get(topic);
local = true;
}
}
catch(const std::exception& e) {}
if(!local) {
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try {
DCDB::PublicSensor publicSensor;
......
......@@ -29,6 +29,7 @@
SensorCache::SensorCache(uint64_t maxHistory) {
this->_maxHistory = maxHistory;
this->_updating.store(false);
}
SensorCache::~SensorCache() {
......@@ -41,12 +42,22 @@ sensorCache_t& SensorCache::getSensorMap() {
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
reading_t s = { val, ts };
bool ownLock = false;
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
//TODO: Check for thread-safety
if(sensorCache.find(sid) == sensorCache.end())
sensorCache[sid] = CacheEntry(_maxHistory);
sensorCache[sid].store(s);
try {
if (sensorCache.find(sid) == sensorCache.end()) {
// Spinning on the lock
while (_updating.exchange(true)) {}
ownLock = true;
sensorCache[sid] = CacheEntry(_maxHistory);
_updating.store(false);
}
sensorCache[sid].store(s);
} catch(const std::exception& e) {
if(ownLock)
_updating.store(false);
}
}
int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
......@@ -140,6 +151,8 @@ void SensorCache::dump() {
uint64_t SensorCache::clean(uint64_t t) {
uint64_t thresh = getTimestamp() - t;
uint64_t ctr = 0;
// Spinning on the lock
while (_updating.exchange(true)) {}
for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
uint64_t latestTs = it->second.getLatest().timestamp;
if (latestTs!=0 && latestTs < thresh) {
......@@ -148,5 +161,6 @@ uint64_t SensorCache::clean(uint64_t t) {
} else
++it;
}
_updating.store(false);
return ctr;
}
......@@ -29,6 +29,7 @@
#define COLLECTAGENT_SENSORCACHE_H_
#include <map>
#include <atomic>
#include <dcdb/sensorid.h>
#include <dcdb/timestamp.h>
#include "cacheentry.h"
......@@ -104,6 +105,14 @@ public:
**/
uint64_t clean(uint64_t t);
/**
* @brief Waits for internal updates to finish.
*/
const void wait() {
while(_updating.load()) {}
return;
}
/**
* @brief Set a new maximum cache length.
*
......@@ -123,6 +132,8 @@ private:
sensorCache_t sensorCache;
// Global maximum allowed time frame for the sensor caches
uint64_t _maxHistory;
// Spinlock to regulate map modifications
std::atomic<bool> _updating;
};
......
......@@ -30,6 +30,7 @@
#include <set>
#include <unordered_map>
#include <atomic>
#include <string>
#include <boost/property_tree/ptree.hpp>
#include <boost/foreach.hpp>
......@@ -321,7 +322,9 @@ public:
/**
* @brief Public class constructor.
*/
MetadataStore() {}
MetadataStore() {
_updating.store(false);
}
/**
* @brief Class destructor.
......@@ -344,6 +347,14 @@ public:
return _metadata;
}
/**
* @brief Waits for internal updates to finish.
*/
const void wait() {
while(_updating.load()) {}
return;
}
/**
* @brief Stores a sensorMetadata_t object in the internal map.
*
......@@ -354,8 +365,11 @@ public:
* @return True if "key" is unique, False if there was a collision
*/
bool store(const string& key, const SensorMetadata& s) {
// Spinlock to update the metadata store
while(_updating.exchange(true)) {}
bool overwritten = !_metadata.count(key);
_metadata[key] = s;
_updating.store(false);
return overwritten;
}
......@@ -416,8 +430,15 @@ public:
* @return TTL value in seconds
*/
int64_t getTTL(const string& key) {
auto it = _metadata.find(key);
return it==_metadata.end() || !it->second.getTTL() ? -1 : *it->second.getTTL()/1000000000;
int64_t ttl = 0;
try {
wait();
auto it = _metadata.find(key);
ttl = it==_metadata.end() || !it->second.getTTL() ? -1 : *it->second.getTTL()/1000000000;
} catch(const std::exception& e) {
ttl = -1;
}
return ttl;
}
/**
......@@ -447,6 +468,7 @@ public:
protected:
unordered_map<string, SensorMetadata> _metadata;
atomic<bool> _updating;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment