Commit 35cfab1b authored by Alessio Netti

DA: bugfixes and documentation

- Integration of data analytics framework in collectagent complete
- REST API integration still pending
- Added some documentation to the code
parent 8e875c36
......@@ -399,4 +399,4 @@ The configuration parameters specific to the _Aggregator_ plugin are the following
Generating a DCDBAnalytics plugin requires implementing an _Analyzer_ and a _Configurator_ class which contain all logic
tied to the specific plugin. Such classes should be derived from _AnalyzerTemplate_ and _AnalyzerConfiguratorTemplate_
respectively, which contain all plugin-agnostic configuration and runtime features. Please refer to the documentation
of the _Average_ plugin for an overview of how a basic plugin can be implemented.
of the _Aggregator_ plugin for an overview of how a basic plugin can be implemented.
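As an illustration only, a new plugin could pair the two classes as sketched below; apart from the _AnalyzerTemplate_ and _AnalyzerConfiguratorTemplate_ base classes named above, every identifier in the sketch (the sensor type and the hook methods) is hypothetical and stands in for whatever the framework headers actually declare.

```cpp
// Hypothetical skeleton of a DCDBAnalytics plugin: MySensor and the hook
// methods below are placeholders, not the real framework interface.
class MyAnalyzer : public AnalyzerTemplate<MySensor> {
public:
    explicit MyAnalyzer(const std::string& name) : AnalyzerTemplate<MySensor>(name) {}

    // Plugin-specific logic: read the input sensors, compute derived
    // readings and push them to the plugin's output sensors.
    void compute() { /* ... */ }
};

class MyConfigurator : public AnalyzerConfiguratorTemplate<MySensor, MyAnalyzer> {
public:
    // Map plugin-specific configuration keys onto sensor and analyzer
    // attributes; plugin-agnostic parsing is inherited from the template.
    void sensorAttributes()   { /* ... */ }
    void analyzerAttributes() { /* ... */ }
};
```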
......@@ -130,6 +130,7 @@ void AnalyticsController::run() {
}
bool AnalyticsController::publishSensors() {
// Performing auto-publish (if required) for the sensors instantiated by the data analytics framework
if(_settings.pluginSettings.sensorPattern=="")
return false;
......
......@@ -19,10 +19,24 @@
using namespace std;
/**
* Class implementing a wrapper around the AnalyticsManager
*
* This class provides a wrapper around many features required for the instantiation of the Analytics Manager - namely
* the instantiation of a SensorNavigator, the creation of a dedicated thread pool, and the insertion of generated
* sensor values into the Cassandra datastore. Most of these features were already available in DCDBPusher to handle
* regular sensors, but they had to be ported over to the CollectAgent.
*/
class AnalyticsController {
public:
/**
* @brief Class constructor
*
* @param dcdbCfg SensorConfig object to be used to retrieve sensor meta-data from Cassandra
* @param dcdbStore SensorDataStore object to be used to insert sensor readings into Cassandra
*/
AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) {
_dcdbCfg = dcdbCfg;
_dcdbStore = dcdbStore;
......@@ -35,19 +49,83 @@ public:
_initialized = false;
}
/**
* @brief Class destructor
*/
~AnalyticsController() {}
/**
* @brief Starts the internal thread of the controller
*
* Initialization must have been performed already at this point.
*/
void start();
/**
* @brief Stops the internal management thread
*
* This will also stop and join all threads in the BOOST ASIO pool.
*/
void stop();
// Initializes the sensor navigator and data analytics manager
/**
* @brief Initializes the data analytics infrastructure
*
* This method will build a Sensor Navigator by fetching sensor names from the Cassandra datastore,
* and then create an AnalyticsManager object, which will take care of instantiating and
* preparing plugins.
*
* @param settings Settings structure containing user-specified configuration parameters
* @param configPath Path to the configuration files for the data analytics framework
* @return True if successful, false otherwise
*/
bool initialize(globalCA_t& settings, const string& configPath);
/**
* @brief Sets the cache to be used for sensors
*
* This method sets the SensorCache object used to store sensor readings produced
* by the data analytics framework.
*
* @param cache The SensorCache object to be used as cache
*/
void setCache(SensorCache* cache) { _sensorCache = cache; }
/**
* @brief Returns the status of the internal thread
*
* @return True if the controller is currently stopped, false otherwise
*/
bool isStopped() { return _halted; }
/**
* @brief Returns the internal AnalyticsManager object
*
* @return A shared pointer to the internal AnalyticsManager object
*/
shared_ptr<AnalyticsManager> getManager() { return _manager; }
/**
* @brief Returns the internal SensorNavigator object
*
* @return A shared pointer to the internal SensorNavigator object
*/
shared_ptr<SensorNavigator> getNavigator() { return _navigator; }
/**
* @brief Returns the SensorCache object used to store readings
*
* @return A pointer to a SensorCache object
*/
SensorCache* getCache() { return _sensorCache; }
/**
* @brief Returns an insert counter for data analytics readings
*
* This counter keeps track of how many inserts were performed into the Cassandra datastore for
* data analytics-related operations since the last call to this method.
*
* @return The number of inserts into the Cassandra datastore since the last call
*/
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
private:
......
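For context, a hypothetical wiring of the controller on the CollectAgent side could look as follows; only the methods declared in the header above are used, while `dcdbCfg`, `dcdbStore`, `sensorCache`, the `globalCA_t` settings structure and the configuration path are assumed to exist already and are placeholders here.

```cpp
// Hypothetical usage sketch of AnalyticsController inside the CollectAgent;
// the surrounding objects and the configuration path are placeholders.
AnalyticsController controller(&dcdbCfg, &dcdbStore);
controller.setCache(&sensorCache);

// Builds the SensorNavigator from the sensor names stored in Cassandra and
// instantiates the AnalyticsManager, which loads and prepares the plugins.
if (controller.initialize(settings, "/etc/dcdb/analytics/")) {
    controller.start();                             // spawns the management thread and ASIO pool
    // ...
    uint64_t inserts = controller.getReadingCtr();  // inserts since the last call; resets to 0
    controller.stop();                              // stops and joins all threads in the pool
}
```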
......@@ -72,6 +72,7 @@ logger_t lg;
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
std::string topic;
// Getting the topic of the queried sensor from the Navigator
try {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {
......@@ -79,10 +80,12 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
}
std::vector <reading_t> *output = NULL;
DCDB::SensorId sid;
// Creating a SID to perform the query
sid.mqttTopicConvert(topic);
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
output = entry.getView(startTs, endTs, buffer, rel, true);
// getView is called with live=false to drop strict staleness checks
output = entry.getView(startTs, endTs, buffer, rel, false);
if (output->size() > 0)
return output;
}
......@@ -99,6 +102,7 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
// Dealing with allocations that may have been performed by the cache search
if(output)
output->clear();
else if(buffer) {
......
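To illustrate the query path, a hypothetical call into the callback is shown below; the sensor name is made up, and timestamps are assumed to be in nanoseconds, matching the cache documentation. With `rel=true` the start and end arguments are offsets from the current time, so the call asks for the last ten seconds of readings, served from the SensorCache when possible and from Cassandra otherwise.

```cpp
// Hypothetical invocation of sensorQueryCallback (the sensor name is made up).
// rel=true: startTs and endTs are offsets from "now", in nanoseconds.
std::vector<reading_t> buffer;
std::vector<reading_t>* readings =
    sensorQueryCallback("cluster.node12.cpu0.temp",  // sensor name in the SensorNavigator
                        10ULL * 1000000000ULL,        // startTs: 10 s before now
                        0ULL,                         // endTs:   now
                        &buffer,
                        true);                        // rel = true
// The returned vector holds the readings found in the cache or, failing that,
// queried from Cassandra; it may be empty if no data is available.
```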
......@@ -46,6 +46,7 @@ public:
~CacheEntry() {
_cache.clear();
}
/**
* @brief Returns the time frame (in nanoseconds) covered by the cache.
......@@ -102,7 +103,7 @@ public:
buffer = new std::vector<reading_t>();
buffer->clear();
uint64_t interval = (_maxHistory / (_cache.size() - 1)) * 2;
uint64_t staleThreshold = (_maxHistory / (_cache.size() - 1)) * 4;
uint64_t now = getTimestamp();
//Converting absolute timestamps to relative offsets for cache access
uint64_t startTsInt = rel ? startTs : now - startTs;
......@@ -115,8 +116,12 @@ public:
if( startIdx < 0 || endIdx < 0)
return buffer;
//Managing obsolete data
if(live && (now - startTsInt > _cache[startIdx].timestamp + interval || now - endTsInt > _cache[endIdx].timestamp + interval))
if(live && (now - startTsInt > _cache[startIdx].timestamp + staleThreshold || now - endTsInt > _cache[endIdx].timestamp + staleThreshold))
return buffer;
//If no liveness check is performed, we still make sure that the latest entry in the cache
// is not older than the cache's maximum history length
else if(!live && now - getLatest().timestamp > _maxHistory)
return buffer;
if(startIdx <= endIdx)
buffer->insert(buffer->end(), _cache.begin() + startIdx, _cache.begin() + endIdx + 1);
else {
......@@ -135,6 +140,7 @@ public:
*
* @return True if the cache is still valid, False otherwise
**/
//TODO: update this method to make it independent from the sampling rate
bool checkValid() const {
if (_cache.size() > 2) {
// Cache element right after cacheIndex is the oldest entry (circular array)
......@@ -157,7 +163,7 @@ public:
}
return true;
}
/**
* @brief Returns an average of recent sensor readings.
*
......
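As a worked example of the widened staleness bounds (with illustrative numbers, not values taken from the code): a cache spanning 60 seconds of history with 61 entries has an average sampling interval of one second, so the live check now tolerates boundary entries lagging the requested window by up to four seconds instead of two, while the new non-live check only rejects the view once the newest cached reading is older than the full 60-second history.

```cpp
#include <cstdint>
#include <cstdio>

// Worked example of the staleness bounds used in CacheEntry::getView();
// the 60 s history and 61-entry cache size are illustrative values only.
int main() {
    const uint64_t maxHistory = 60ULL * 1000000000ULL;              // _maxHistory: 60 s in ns
    const uint64_t cacheSize  = 61;                                 // _cache.size()
    const uint64_t oldBound   = (maxHistory / (cacheSize - 1)) * 2; // previous limit: 2 s
    const uint64_t newBound   = (maxHistory / (cacheSize - 1)) * 4; // staleThreshold:  4 s

    std::printf("live staleness bound: %.1f s (was %.1f s), non-live bound: %.1f s\n",
                newBound / 1e9, oldBound / 1e9, maxHistory / 1e9);
    return 0;
}
```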