Commit 39db19f7 authored by Alessio Netti's avatar Alessio Netti

Merge remote-tracking branch 'remotes/origin/master' into development

parents af94a593 9f7d98e3
...@@ -54,9 +54,11 @@ bool AnalyticsController::initialize(Configuration& settings) { ...@@ -54,9 +54,11 @@ bool AnalyticsController::initialize(Configuration& settings) {
QueryEngine &_queryEngine = QueryEngine::getInstance(); QueryEngine &_queryEngine = QueryEngine::getInstance();
if(_manager->probe(settings.cfgFilePath, settings.cfgFileName)) { if(_manager->probe(settings.cfgFilePath, settings.cfgFileName)) {
vector<string> topics; vector<string> topics;
_metadataStore->wait();
for(const auto& kv : _metadataStore->getMap()) for(const auto& kv : _metadataStore->getMap())
if(kv.second.isValid()) if(kv.second.isValid())
topics.push_back(*kv.second.getPattern()); topics.push_back(*kv.second.getPattern());
_metadataStore->release();
// Building the sensor navigator // Building the sensor navigator
try { try {
......
...@@ -151,19 +151,15 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s ...@@ -151,19 +151,15 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
DCDB::SensorId sid; DCDB::SensorId sid;
// Creating a SID to perform the query // Creating a SID to perform the query
if (sid.mqttTopicConvert(topic)) { if (sid.mqttTopicConvert(topic)) {
try { mySensorCache.wait();
mySensorCache.wait(); if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) { // Data was found, can continue to next SID
// Data was found, can continue to next SID successCtr++;
successCtr++; } else {
} else { // This happens only if no data was found in the local cache
// This happens only if no data was found in the local cache
topics.push_back(sid);
}
// To handle nasty (yet rare) race conditions on the sensor cache
} catch(const std::exception& e) {
topics.push_back(sid); topics.push_back(sid);
} }
mySensorCache.release();
} }
} }
// If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
...@@ -226,15 +222,13 @@ bool metadataQueryCallback(const string& name, SensorMetadata& buffer) { ...@@ -226,15 +222,13 @@ bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
} catch(const std::domain_error& e) {} } catch(const std::domain_error& e) {}
bool local = false; bool local = false;
try { metadataStore->wait();
metadataStore->wait(); if(metadataStore->getMap().count(topic)) {
if(metadataStore->getMap().count(topic)) { buffer = metadataStore->get(topic);
buffer = metadataStore->get(topic); local = true;
local = true;
}
} }
catch(const std::exception& e) {} metadataStore->release();
if(!local) { if(!local) {
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try { try {
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
SensorCache::SensorCache(uint64_t maxHistory) { SensorCache::SensorCache(uint64_t maxHistory) {
this->_maxHistory = maxHistory; this->_maxHistory = maxHistory;
this->_updating.store(false); this->_updating.store(false);
this->_access.store(0);
} }
SensorCache::~SensorCache() { SensorCache::~SensorCache() {
...@@ -42,43 +43,44 @@ sensorCache_t& SensorCache::getSensorMap() { ...@@ -42,43 +43,44 @@ sensorCache_t& SensorCache::getSensorMap() {
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) { void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
reading_t s = { val, ts }; reading_t s = { val, ts };
bool ownLock = false;
/* Remove the reserved bytes to leverage the standard find function */ /* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0); sid.setRsvd(0);
try { wait();
if (sensorCache.find(sid) == sensorCache.end()) { auto sIt = sensorCache.find(sid);
// Spinning on the lock if(sIt!=sensorCache.end()) {
while (_updating.exchange(true)) {} sIt->second.store(s);
ownLock = true; release();
sensorCache[sid] = CacheEntry(_maxHistory); } else {
_updating.store(false); release();
} // Spinning on the lock
while (_updating.exchange(true)) {}
while(_access.load()>0) {}
sensorCache[sid] = CacheEntry(_maxHistory);
sensorCache[sid].store(s); sensorCache[sid].store(s);
} catch(const std::exception& e) { _updating.store(false);
if(ownLock)
_updating.store(false);
} }
} }
int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) { int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
/* Remove the reserved bytes to leverage the standard find function */ /* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0); sid.setRsvd(0);
wait();
sensorCache_t::iterator it = sensorCache.find(sid); sensorCache_t::iterator it = sensorCache.find(sid);
if (it == sensorCache.end()) { if (it == sensorCache.end()) {
release();
throw std::invalid_argument("Sid not found"); throw std::invalid_argument("Sid not found");
} }
if (!it->second.checkValid()) if (!it->second.checkValid())
{ {
release();
throw std::out_of_range("Sid outdated"); throw std::out_of_range("Sid outdated");
} }
if (avg) { int64_t val = avg ? it->second.getAverage(avg) : it->second.getLatest().value;
return it->second.getAverage(avg); release();
} else { return val;
return it->second.getLatest().value;
}
} }
// Wildcards are not supported with string topics // Wildcards are not supported with string topics
...@@ -153,6 +155,7 @@ uint64_t SensorCache::clean(uint64_t t) { ...@@ -153,6 +155,7 @@ uint64_t SensorCache::clean(uint64_t t) {
uint64_t ctr = 0; uint64_t ctr = 0;
// Spinning on the lock // Spinning on the lock
while (_updating.exchange(true)) {} while (_updating.exchange(true)) {}
while(_access.load()>0) {}
for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) { for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
uint64_t latestTs = it->second.getLatest().timestamp; uint64_t latestTs = it->second.getLatest().timestamp;
if (latestTs!=0 && latestTs < thresh) { if (latestTs!=0 && latestTs < thresh) {
......
...@@ -110,7 +110,14 @@ public: ...@@ -110,7 +110,14 @@ public:
*/ */
const void wait() { const void wait() {
while(_updating.load()) {} while(_updating.load()) {}
return; ++_access;
}
/**
* @brief Reduces the internal reading counter.
*/
const void release() {
--_access;
} }
/** /**
...@@ -134,6 +141,7 @@ private: ...@@ -134,6 +141,7 @@ private:
uint64_t _maxHistory; uint64_t _maxHistory;
// Spinlock to regulate map modifications // Spinlock to regulate map modifications
std::atomic<bool> _updating; std::atomic<bool> _updating;
std::atomic<int> _access;
}; };
......
...@@ -324,6 +324,7 @@ public: ...@@ -324,6 +324,7 @@ public:
*/ */
MetadataStore() { MetadataStore() {
_updating.store(false); _updating.store(false);
_access.store(0);
} }
/** /**
...@@ -352,7 +353,14 @@ public: ...@@ -352,7 +353,14 @@ public:
*/ */
const void wait() { const void wait() {
while(_updating.load()) {} while(_updating.load()) {}
return; ++_access;
}
/**
* @brief Reduces the internal reading counter.
*/
const void release() {
--_access;
} }
/** /**
...@@ -367,6 +375,7 @@ public: ...@@ -367,6 +375,7 @@ public:
bool store(const string& key, const SensorMetadata& s) { bool store(const string& key, const SensorMetadata& s) {
// Spinlock to update the metadata store // Spinlock to update the metadata store
while(_updating.exchange(true)) {} while(_updating.exchange(true)) {}
while(_access.load()>0) {}
bool overwritten = !_metadata.count(key); bool overwritten = !_metadata.count(key);
_metadata[key] = s; _metadata[key] = s;
_updating.store(false); _updating.store(false);
...@@ -413,10 +422,17 @@ public: ...@@ -413,10 +422,17 @@ public:
* @param key Sensor key to be queried * @param key Sensor key to be queried
* @return A reference to a sensorMetadata_t object * @return A reference to a sensorMetadata_t object
*/ */
const SensorMetadata& get(const string& key) { SensorMetadata get(const string& key) {
if(!_metadata.count(key)) wait();
auto it = _metadata.find(key);
if(it==_metadata.end()) {
release();
throw invalid_argument("MetadataStore: key " + key + " does not exist!"); throw invalid_argument("MetadataStore: key " + key + " does not exist!");
return _metadata[key]; } else {
SensorMetadata sm = it->second;
release();
return sm;
}
} }
/** /**
...@@ -431,13 +447,10 @@ public: ...@@ -431,13 +447,10 @@ public:
*/ */
int64_t getTTL(const string& key) { int64_t getTTL(const string& key) {
int64_t ttl = 0; int64_t ttl = 0;
try { wait();
wait(); auto it = _metadata.find(key);
auto it = _metadata.find(key); ttl = it==_metadata.end() || !it->second.getTTL() ? -1 : *it->second.getTTL()/1000000000;
ttl = it==_metadata.end() || !it->second.getTTL() ? -1 : *it->second.getTTL()/1000000000; release();
} catch(const std::exception& e) {
ttl = -1;
}
return ttl; return ttl;
} }
...@@ -469,6 +482,7 @@ protected: ...@@ -469,6 +482,7 @@ protected:
unordered_map<string, SensorMetadata> _metadata; unordered_map<string, SensorMetadata> _metadata;
atomic<bool> _updating; atomic<bool> _updating;
atomic<int> _access;
}; };
......
...@@ -315,7 +315,10 @@ void MQTTPusher::computeMsgRate() { ...@@ -315,7 +315,10 @@ void MQTTPusher::computeMsgRate() {
bool dynWarning = false; bool dynWarning = false;
for (auto &p : _plugins) { for (auto &p : _plugins) {
for (const auto &g : p.configurator->getSensorGroups()) { for (const auto &g : p.configurator->getSensorGroups()) {
msgRate += (float)g->acquireSensors().size() * (1000.0f / (float)g->getInterval()) / (float)g->getMinValues(); for(const auto &s : g->acquireSensors()) {
if (s->getSubsampling() > 0)
msgRate += (1000.0f / (float) g->getInterval()) / ((float) g->getMinValues() * s->getSubsampling());
}
g->releaseSensors(); g->releaseSensors();
} }
} }
...@@ -323,7 +326,10 @@ void MQTTPusher::computeMsgRate() { ...@@ -323,7 +326,10 @@ void MQTTPusher::computeMsgRate() {
for (const auto &op : p.configurator->getOperators()) { for (const auto &op : p.configurator->getOperators()) {
if (op->getStreaming() && !op->getDynamic()) { if (op->getStreaming() && !op->getDynamic()) {
for (const auto &u : op->getUnits()) for (const auto &u : op->getUnits())
msgRate += (float)u->getBaseOutputs().size() * (1000.0f / (float)op->getInterval()) / (float)op->getMinValues(); for(const auto &s : u->getBaseOutputs()) {
if (s->getSubsampling() > 0)
msgRate += (1000.0f / (float) op->getInterval()) / ((float) op->getMinValues() * s->getSubsampling());
}
op->releaseUnits(); op->releaseUnits();
} else if (op->getDynamic()) } else if (op->getDynamic())
dynWarning = true; dynWarning = true;
......
...@@ -372,6 +372,8 @@ bool ConnectionImpl::connect() { ...@@ -372,6 +372,8 @@ bool ConnectionImpl::connect() {
cass_cluster_set_num_threads_io(cluster, numThreadsIo); cass_cluster_set_num_threads_io(cluster, numThreadsIo);
cass_cluster_set_queue_size_io(cluster, queueSizeIo); cass_cluster_set_queue_size_io(cluster, queueSizeIo);
cass_cluster_set_core_connections_per_host(cluster, coreConnPerHost); cass_cluster_set_core_connections_per_host(cluster, coreConnPerHost);
//TODO: avoid this and use actual paging in queries
cass_cluster_set_request_timeout(cluster, 300000);
/* Force protcol version to 1 */ /* Force protcol version to 1 */
cass_cluster_set_protocol_version(cluster, 1); cass_cluster_set_protocol_version(cluster, 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment