Commit 7cd2bcac authored by Alessio Netti's avatar Alessio Netti
Browse files

Proper reader-writer synchronization in SensorCache

parent ed361bad
Loading
Loading
Loading
Loading
+13 −19
Original line number Diff line number Diff line
@@ -151,7 +151,6 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
            try {
            mySensorCache.wait();
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
                // Data was found, can continue to next SID
@@ -160,10 +159,7 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
                // This happens only if no data was found in the local cache
                topics.push_back(sid);
            }
            // To handle nasty (yet rare) race conditions on the sensor cache
            } catch(const std::exception& e) {
                topics.push_back(sid);
            }
            mySensorCache.release();
        }
    }
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
@@ -226,14 +222,12 @@ bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    } catch(const std::domain_error& e) {}
    
    bool local = false;
    try {
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
    }
    }
    catch(const std::exception& e) {}
    metadataStore->release();
        
    if(!local) {
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
+15 −12
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@
SensorCache::SensorCache(uint64_t maxHistory) {
    this->_maxHistory = maxHistory;
    this->_updating.store(false);
    this->_access.store(0);
}

SensorCache::~SensorCache() {
@@ -42,20 +43,21 @@ sensorCache_t& SensorCache::getSensorMap() {

void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
  reading_t s = { val, ts };
  bool ownLock = false;
  /* Remove the reserved bytes to leverage the standard find function */
  sid.setRsvd(0);
  try {
      if (sensorCache.find(sid) == sensorCache.end()) {
  wait();
  auto sIt = sensorCache.find(sid);
  if(sIt!=sensorCache.end()) {
      CacheEntry& ce = sIt->second;
      release();
      ce.store(s);
  } else {
      release();
      // Spinning on the lock
      while (_updating.exchange(true)) {}
          ownLock = true;
      while(_access.load()>0) {}
      sensorCache[sid] = CacheEntry(_maxHistory);
          _updating.store(false);
      }
      sensorCache[sid].store(s);
  } catch(const std::exception& e) {
      if(ownLock)
      _updating.store(false);
  }
}
@@ -153,6 +155,7 @@ uint64_t SensorCache::clean(uint64_t t) {
    uint64_t ctr = 0;
    // Spinning on the lock
    while (_updating.exchange(true)) {}
    while(_access.load()>0) {}
    for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
        uint64_t latestTs = it->second.getLatest().timestamp;
        if (latestTs!=0 && latestTs < thresh) {
+9 −1
Original line number Diff line number Diff line
@@ -110,7 +110,14 @@ public:
	*/
	const void wait() {
		while(_updating.load()) {}
		return;
		++_access;
	}

	/**
	* @brief               Reduces the internal reading counter.
	*/
	const void release() {
		--_access;
	}

    /**
@@ -134,6 +141,7 @@ private:
	uint64_t _maxHistory;
	// Spinlock to regulate map modifications
	std::atomic<bool> _updating;
	std::atomic<int> _access;

};

+24 −10
Original line number Diff line number Diff line
@@ -324,6 +324,7 @@ public:
     */
    MetadataStore() {
        _updating.store(false);
        _access.store(0);
    }
    
    /**
@@ -352,7 +353,14 @@ public:
     */
    const void wait() {
        while(_updating.load()) {}
        return;
        ++_access;
    }

    /**
     * @brief               Reduces the internal reading counter.
     */
    const void release() {
        --_access;
    }
    
    /**
@@ -367,6 +375,7 @@ public:
    bool store(const string& key, const SensorMetadata& s) {
        // Spinlock to update the metadata store
        while(_updating.exchange(true)) {}
        while(_access.load()>0) {}
        bool overwritten = !_metadata.count(key);
        _metadata[key] = s;
        _updating.store(false);
@@ -414,9 +423,16 @@ public:
     * @return              A reference to a sensorMetadata_t object
     */
    const SensorMetadata& get(const string& key) {
        if(!_metadata.count(key))
        wait();
        auto it = _metadata.find(key);
        if(it==_metadata.end()) {
            release();
            throw invalid_argument("MetadataStore: key " + key + " does not exist!");
        return _metadata[key];
        } else {
            const SensorMetadata& sm = it->second;
            release();
            return sm;
        }
    }

    /**
@@ -431,13 +447,10 @@ public:
     */
    int64_t getTTL(const string& key) {
        int64_t ttl = 0;
        try {
        wait();
        auto it = _metadata.find(key);
        ttl = it==_metadata.end() || !it->second.getTTL() ? -1 : *it->second.getTTL()/1000000000;
        } catch(const std::exception& e) {
            ttl = -1;
        }
        release();
        return ttl;
    }
    
@@ -469,6 +482,7 @@ protected:
    
    unordered_map<string, SensorMetadata> _metadata;
    atomic<bool> _updating;
    atomic<int>  _access;
    
};