Commit f2970bda authored by Alessio Netti's avatar Alessio Netti
Browse files

DA: filter for SensorNavigator and binary search fixes

- a "filter" configuration parameter has been added, which is used
to discard sensor names when building the sensor navigator
- Can be used to e.g. build a sensor navigator in a collectagent
using only the sensors that are directly collected by it
- Implemented a switch to select leftmost/rightmost binary search
in CacheEntry, used for the QueryEngine
parent c7f0cdbf
......@@ -50,6 +50,7 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
// Building the sensor navigator
try {
_navigator->setFilter(_settings.analyticsSettings.filter);
_navigator->buildTree(_settings.analyticsSettings.hierarchy, &names, &topics);
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
......
......@@ -636,6 +636,7 @@ int main(int argc, char* const argv[]) {
LOG(info) << "Analytics Settings:";
LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
LOG(info) << " Filter: " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port;
......
......@@ -132,8 +132,8 @@ public:
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
//Getting the cache indexes to access sensor data
int64_t startIdx = rel ? getOffset(startTs) : searchTimestamp(startTs);
int64_t endIdx = rel ? getOffset(endTs) : searchTimestamp(endTs);
int64_t startIdx = rel ? getOffset(startTs) : searchTimestamp(startTs, true);
int64_t endIdx = rel ? getOffset(endTs) : searchTimestamp(endTs, false);
//Managing invalid time offsets
if(startIdx < 0 || endIdx < 0)
......@@ -219,31 +219,44 @@ public:
* index is returned if successful.
*
* @param t Timestamp to be searched, in nanoseconds.
* @param leftmost If true, leftmost binary search is performed. Rightmost otherwise.
*
* @return The index of the closest sensor reading to t, or -1 if out of bounds.
**/
int64_t searchTimestamp(uint64_t t) const {
int64_t searchTimestamp(uint64_t t, bool leftmost=true) const {
// Cache is empty or has only one element
if(!_stable || _cache.empty())
return -1;
// Target timestamp (relative or absolute) is outside of the time frame contained in the cache
else if(t > _cache[_cacheIndex].timestamp || t < _cache[(_cacheIndex+1) % _cache.size()].timestamp)
return -1;
int64_t pivot=0, pivotReal=0, aPoint=0, bPoint=_cache.size()-1;
// Attention! aPoint and bPoint are linearized indexes, and do not take into account the presence of cacheIndex
// When computing the position of the pivot, we map it to the actual index in the circular array
// Standard (leftmost) binary search algorithm below
while(aPoint < bPoint) {
pivot = (aPoint + bPoint)/2;
pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
if(t <= _cache[pivotReal].timestamp)
bPoint = pivot;
else
aPoint = pivot + 1;
int64_t pivot = 0, pivotReal = 0, aPoint = 0, bPoint = _cache.size();
if(leftmost) {
// Attention! aPoint and bPoint are linearized indexes, and do not take into account the presence of cacheIndex
// When computing the position of the pivot, we map it to the actual index in the circular array
// Standard binary search algorithm below
while (aPoint < bPoint) {
pivot = (aPoint + bPoint) / 2;
pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
if (t <= _cache[pivotReal].timestamp)
bPoint = pivot;
else
aPoint = pivot + 1;
}
return (_cacheIndex + 1 + aPoint) % _cache.size();
}
else {
while (aPoint < bPoint) {
pivot = (aPoint + bPoint) / 2;
pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
if (t < _cache[pivotReal].timestamp)
bPoint = pivot;
else
aPoint = pivot + 1;
}
return (_cacheIndex + aPoint) % _cache.size();
}
return (_cacheIndex + 1 + aPoint) % _cache.size();
}
/**
......
......@@ -38,6 +38,7 @@ class analyticsSettings_t {
public:
analyticsSettings_t() {}
std::string hierarchy = "";
std::string filter = "";
};
/**
......
......@@ -53,6 +53,7 @@ public:
_sensorTree = NULL;
_treeDepth = -1;
_usingTopics = false;
_filter = "";
}
/**
......@@ -293,6 +294,17 @@ public:
* @return A full MQTT topic for the specified sensor
*/
string buildTopicForNode(const string& node, const string& suffix, int len=28);
/**
* @brief Sets an internal regex which is used to filter out sensors
*
* The filter is applied upon building the sensor tree (buildTree), and all sensor names not
* containing a partial match with the filter are eliminated. The filter is NOT applied when
* using the buildCassandraTree method.
*
* @param f The filter regex to be used
*/
void setFilter(string f) { _filter = f; }
protected:
......@@ -315,6 +327,8 @@ protected:
//Vector of regular expression objects that define the sensor hierarchy
vector<boost::regex> *_hierarchy;
boost::cmatch _match;
//Internal regex to filter out sensor names
std::string _filter;
};
......
......@@ -59,6 +59,8 @@ bool GlobalConfiguration::readConfig() {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("analytics")) {
if (boost::iequals(global.first, "hierarchy")) {
analyticsSettings.hierarchy = global.second.data();
} else if (boost::iequals(global.first, "filter")) {
analyticsSettings.filter = global.second.data();
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -90,6 +90,7 @@ void SensorNavigator::buildTree(const vector<string> *hierarchy, const vector<st
throw invalid_argument("SensorNavigator: cannot build sensor hierarchy!");
bool autoMode = false;
boost::regex filterReg(_filter);
if(!hierarchy || hierarchy->empty()) {
_hierarchy = NULL;
autoMode = true;
......@@ -113,10 +114,12 @@ void SensorNavigator::buildTree(const vector<string> *hierarchy, const vector<st
_sensorTree->insert(make_pair(rootKey, rootNode));
//We iterate over all sensor names that were supplied and build up the tree
for(int i=0; i < (int)sensors->size(); i++)
if(autoMode)
_addAutoSensor(sensors->at(i), topics ? topics->at(i) : "");
else
_addSensor(sensors->at(i), topics ? topics->at(i) : "");
if(_filter=="" || boost::regex_search(sensors->at(i).c_str(), _match, filterReg)) {
if (autoMode)
_addAutoSensor(sensors->at(i), topics ? topics->at(i) : "");
else
_addSensor(sensors->at(i), topics ? topics->at(i) : "");
}
}
void SensorNavigator::buildCassandraTree(const map<string, vector<string> > *table, const string& root, const string& ignore) {
......
......@@ -270,6 +270,7 @@ int main(int argc, char** argv) {
topics.push_back(s->getMqtt());
}
try {
navigator->setFilter(analyticsSettings.filter);
navigator->buildTree(analyticsSettings.hierarchy, &names, &topics);
LOG(info) << "Built a sensor hierarchy tree of size " << navigator->getTreeSize() << " and depth " << navigator->getTreeDepth() << ".";
_queryEngine.setNavigator(navigator);
......@@ -314,6 +315,7 @@ int main(int argc, char** argv) {
}
LOG(info) << "Analytics Settings:";
LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
LOG(info) << " Filter: " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
#ifdef DEBUG
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment