Commit 5af0eb23 authored by Michael Ott's avatar Michael Ott
Browse files

Add tagFilter to InfluxDB REST API for filtering measurements by tags

parent b96bd99e
......@@ -146,7 +146,9 @@ void CARestAPI::POST_write(endpointArgs) {
std::istringstream body(req.body());
std::string line;
while (std::getline(body, line)) {
// Regex to split line into measurement, tags, fields, timestamp
boost::regex r1("^([^,]*)(,[^ ]*)? ([^ ]*)( .*)?$", boost::regex::extended);
// Regex to split comma-separated tags and fields into individual entries
boost::regex r2(",?([^,=]*)=([^,]*)", boost::regex::extended);
boost::smatch m1, m2;
......@@ -154,20 +156,34 @@ void CARestAPI::POST_write(endpointArgs) {
std::string measurement = m1[1].str();
auto m = _influxMap.find(measurement);
if (m != _influxMap.end()) {
influx_t influx = m->second;
// Parse tags into a map
std::map<std::string, std::string> tags;
std::string tagStr = m1[2].str();
while (boost::regex_search(tagStr, m2, r2)) {
std::string tagList = m1[2].str();
while (boost::regex_search(tagList, m2, r2)) {
tags[m2[1].str()] = m2[2].str();
tagStr = m2.suffix().str();
tagList = m2.suffix().str();
}
auto t = tags.find(m->second.tag);
auto t = tags.find(influx.tag);
if (t != tags.end()) {
std::string tagName = t->second;
// Perform pattern filter or substitution via regex on tag
if (!influx.tagRegex.empty()) {
std::string input(tagName);
tagName = "";
boost::regex_replace(std::back_inserter(tagName), input.begin(), input.end(), influx.tagRegex, influx.tagSubstitution.c_str(), boost::regex_constants::format_sed | boost::regex_constants::format_no_copy);
if (tagName.size() == 0) {
// There was no match
break;
}
}
std::map<std::string, std::string> fields;
std::string fieldStr = m1[3].str();
while (boost::regex_search(fieldStr, m2, r2)) {
std::string fieldList = m1[3].str();
while (boost::regex_search(fieldList, m2, r2)) {
fields[m2[1].str()] = m2[2].str();
fieldStr = m2.suffix().str();
fieldList = m2.suffix().str();
}
DCDB::TimeStamp ts;
......@@ -176,12 +192,13 @@ void CARestAPI::POST_write(endpointArgs) {
} catch (...) {
}
for (auto &f: m->second.fields) {
if (fields.find(f) != fields.end()) {
std::string mqttTopic = m->second.mqttPrefix + "/" + t->second + "/" + f;
for (auto &f: fields) {
// If no fields were defined, we take any field
if (influx.fields.empty() || (influx.fields.find(f.first) != influx.fields.end())) {
std::string mqttTopic = m->second.mqttPrefix + "/" + tagName + "/" + f.first;
uint64_t value = 0;
try {
value = stoull(fields[f]);
value = stoull(f.second);
} catch (...) {
break;
}
......@@ -191,11 +208,14 @@ void CARestAPI::POST_write(endpointArgs) {
_sensorCache->storeSensor(sid, ts.getRaw(), value);
_sensorDataStore->insert(&sid, ts.getRaw(), value, 0); //Fixme: TTL
}
LOG(debug) << "influx insert: " << mqttTopic << " " << ts.getRaw() << " " << fields[f];
#ifdef DEBUG
LOG(debug) << "influx insert: " << mqttTopic << " " << ts.getRaw() << " " << value;
#endif
}
}
}
} else {
LOG(debug) << "influx: unknown measurement " << measurement;
}
}
}
......
......@@ -81,6 +81,20 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &m, global.second) {
if (boost::iequals(m.first, "tag")) {
influx.tag = m.second.data();
} else if (boost::iequals(m.first, "tagfilter")) {
//check if input has sed format of "s/.../.../" for substitution
boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
boost::smatch matchResults;
if (regex_match(m.second.data(), matchResults, checkSubstitute)) {
//input has substitute format
influx.tagRegex = boost::regex(matchResults[2].str(), boost::regex_constants::extended);
influx.tagSubstitution = matchResults[3].str();
} else {
//input is only a regex
influx.tagRegex = boost::regex(m.second.data(), boost::regex_constants::extended);
influx.tagSubstitution = "&";
}
} else if (boost::iequals(m.first, "mqttprefix")) {
influx.mqttPrefix = m.second.data();
} else if (boost::iequals(m.first, "fields")) {
......
......@@ -31,6 +31,8 @@
#include <string>
#include <unistd.h>
#include <boost/algorithm/string/trim.hpp>
#include <boost/regex.hpp>
#include "logging.h"
#include "globalconfiguration.h"
......@@ -66,8 +68,10 @@ public:
class influx_t {
public:
influx_t() {}
std::string mqttPrefix;
std::string tag;
std::string mqttPrefix;
std::string tag;
boost::regex tagRegex;
std::string tagSubstitution;
std::set<std::string> fields;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment