Commit ec42b6ae authored by Alessio Netti

Analytics: minor fixes and changes

- CA: sensors from job operators are not cached anymore
- CA: the analytics controller is now de-allocated properly
- Added a "tol" parameter to the QueryEngine interface for fuzzy queries
parent 1dd95e62
......@@ -44,9 +44,9 @@ struct qeJobData {
};
//Typedef for the callback used to retrieve sensors
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
//Typedef for the callback used to retrieve groups of sensors
typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
//Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
//Typedef for the metadata retrieval callback
......@@ -256,14 +256,15 @@ public:
* @param endTs End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
* @param buffer Reference to a vector in which readings must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @param tol Tolerance (in ns) for returned timestamps. Does not affect Cassandra range queries.
* @return True if successful, false otherwise
*/
bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true, const uint64_t tol=3600000000000) {
// A query cannot be served until the single-sensor callback has been set
if(!_callback)
throw runtime_error("Query Engine: callback not set!");
// Absolute timestamps must satisfy startTs <= endTs, whereas relative offsets must satisfy startTs >= endTs
if((startTs > endTs && !rel) || (startTs < endTs && rel))
throw invalid_argument("Query Engine: invalid time range!");
return _callback(name, startTs, endTs, buffer, rel);
// The tolerance is forwarded unchanged to the callback
return _callback(name, startTs, endTs, buffer, rel, tol);
}
/**
......@@ -274,12 +275,12 @@ public:
*
* @return True if successful, false otherwise
*/
bool querySensor(const vector<string>& names, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
bool querySensor(const vector<string>& names, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true, const uint64_t tol=3600000000000) {
// A query cannot be served until the group callback has been set
if(!_gCallback)
throw runtime_error("Query Engine: callback not set!");
// Absolute timestamps must satisfy startTs <= endTs, whereas relative offsets must satisfy startTs >= endTs
if((startTs > endTs && !rel) || (startTs < endTs && rel))
throw invalid_argument("Query Engine: invalid time range!");
return _gCallback(names, startTs, endTs, buffer, rel);
// The tolerance is forwarded unchanged to the group callback
return _gCallback(names, startTs, endTs, buffer, rel, tol);
}
/**
......
......@@ -138,9 +138,14 @@ void AnalyticsController::run() {
sensorQueue = s->getReadingQueue();
while (sensorQueue->pop(readingBuf)) {
readings.push_back(DCDB::SensorDataStoreReading(sid, readingBuf.timestamp, readingBuf.value));
_sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
}
_sensorCache->getSensorMap()[sid].updateBatchSize(op->getMinValues());
// The readings of operators that are dynamic (e.g., job operators) are not cached
if(!op->getDynamic()) {
for(const auto &r : readings) {
_sensorCache->storeSensor(sid, r.timeStamp.getRaw(), r.value);
}
_sensorCache->getSensorMap()[sid].updateBatchSize(op->getMinValues());
}
_dcdbStore->insertBatch(readings, _metadataStore->getTTL(s->getMqtt()));
_readingCtr += readings.size();
}
......
......@@ -135,7 +135,7 @@ bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_
return true;
}
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
// Returning false if the query engine is being updated
if(queryEngine.updating.load()) return false;
++queryEngine.access;
......@@ -154,7 +154,7 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
// Creating a SID to perform the query
if (sid.mqttTopicConvert(topic)) {
mySensorCache.wait();
if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
// Data was found, can continue to next SID
successCtr++;
} else {
......@@ -177,7 +177,7 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
// If timestamps are equal we perform a fuzzy query
if(startTsInt == endTsInt) {
topics.front().setRsvd(startWs);
mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000, false);
mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
}
// Else, we iterate over the weekstamps (if more than one) and perform range queries
else {
......@@ -204,12 +204,12 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
return successCtr>0;
}
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
// Returning false if the query engine is being updated
if(queryEngine.updating.load()) return false;
// Wrapping the single sensor name in a vector, so that the group callback can serve the query
std::vector<std::string> nameWrapper;
nameWrapper.push_back(name);
return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel);
return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
}
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
......@@ -887,6 +887,7 @@ int main(int argc, char* const argv[]) {
dcdbConn->disconnect();
delete dcdbConn;
delete metadataStore;
delete analyticsController;
LOG(info) << "Collect Agent closed. Bye bye...";
}
catch (const std::runtime_error& e) {
......
......@@ -152,14 +152,18 @@ public:
* @param endTs End timestamp of the desired view
* @param buffer Reference to a vector to be used to store the view.
* @param rel If true, startTs and endTs are interpreted as relative timestamps against "the most recent sensor reading"
* @param tol Maximum tolerance (in ns) for returned timestamps. Limited by the internal staleness threshold.
* @return True if successful, false otherwise
**/
bool getView(uint64_t startTs, uint64_t endTs, std::vector<reading_t>& buffer, bool rel=false) const {
bool getView(uint64_t startTs, uint64_t endTs, std::vector<reading_t>& buffer, bool rel=false, uint64_t tol=3600000000000) const {
// We add the estimated batch size in the computation of the stale threshold (set to 1 if not used)
uint64_t cacheSize = _cache.size()>1 ? _cache.size()-1 : 1;
uint64_t staleThreshold = (_maxHistory / cacheSize) * (uint64_t)_batchSize * 4;
uint64_t now = getTimestamp();
if(tol < staleThreshold)
staleThreshold = tol;
if(startTs!=endTs) {
//Converting relative offsets to absolute timestamp for staleness checking
uint64_t startTsInt = rel ? now - startTs : startTs;
......
......@@ -78,7 +78,7 @@ QueryEngine & _queryEngine = QueryEngine::getInstance();
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
bool sensorQueryCallback(const string &name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t> &buffer, const bool rel) {
bool sensorQueryCallback(const string &name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t> &buffer, const bool rel, const uint64_t tol) {
// Returning false if the query engine is being updated
if (_queryEngine.updating.load())
return false;
......@@ -88,19 +88,19 @@ bool sensorQueryCallback(const string &name, const uint64_t startTs, const uint6
if (sensorMap != nullptr && sensorMap->count(name) > 0) {
SBasePtr sensor = sensorMap->at(name);
found = sensor->isInit() ? sensor->getCache()->getView(startTs, endTs, buffer, rel) : false;
found = sensor->isInit() ? sensor->getCache()->getView(startTs, endTs, buffer, rel, tol) : false;
}
--_queryEngine.access;
return found;
}
bool sensorGroupQueryCallback(const std::vector<string> &names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t> &buffer, const bool rel) {
bool sensorGroupQueryCallback(const std::vector<string> &names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t> &buffer, const bool rel, const uint64_t tol) {
// Returning false if the query engine is being updated
if (_queryEngine.updating.load())
return false;
// The group query is successful if at least one of the single-sensor queries is
bool outcome = false;
for(const auto& name : names) {
// The callback is the left operand of ||, hence it is invoked for every sensor regardless of earlier outcomes
outcome = sensorQueryCallback(name, startTs, endTs, buffer, rel) || outcome;
outcome = sensorQueryCallback(name, startTs, endTs, buffer, rel, tol) || outcome;
}
return outcome;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment