Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 3d12f1f9 authored by Michael Ott's avatar Michael Ott
Browse files

Initial InfluxDB line protocol implementation for the CollectAgent's REST API

parent ca41b786
......@@ -26,6 +26,7 @@
//================================================================================
#include "CARestAPI.h"
#include <boost/beast/http/field.hpp>
#define stdBind(fun) std::bind(&CARestAPI::fun, \
this, \
......@@ -34,11 +35,15 @@
std::placeholders::_3)
CARestAPI::CARestAPI(serverSettings_t settings,
std::map<std::string, influx_t>& influxMap,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer) :
RESTHttpsServer(settings),
_influxMap(influxMap),
_sensorCache(sensorCache),
_sensorDataStore(sensorDataStore),
_analyticsController(analyticsController),
_mqttServer(mqttServer) {
......@@ -47,6 +52,10 @@ CARestAPI::CARestAPI(serverSettings_t settings,
addEndpoint("/average", {http::verb::get, stdBind(GET_average)});
addEndpoint("/quit", {http::verb::put, stdBind(PUT_quit)});
addEndpoint("/ping", {http::verb::get, stdBind(GET_ping)});
addEndpoint("/query", {http::verb::post, stdBind(POST_query)});
addEndpoint("/write", {http::verb::post, stdBind(POST_write)});
_analyticsController->getManager()->addRestEndpoints(this);
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
......@@ -122,6 +131,78 @@ void CARestAPI::GET_average(endpointArgs) {
}
}
void CARestAPI::GET_ping(endpointArgs) {
res.body() = "";
res.result(http::status::ok);
}
void CARestAPI::POST_query(endpointArgs) {
res.set(http::field::content_type, "application/json");
res.body() = "{results: [{statement_id: 0}]}";
res.result(http::status::ok);
}
void CARestAPI::POST_write(endpointArgs) {
std::istringstream body(req.body());
std::string line;
while (std::getline(body, line)) {
boost::regex r1("^([^,]*)(,[^ ]*)? ([^ ]*)( .*)?$", boost::regex::extended);
boost::regex r2(",?([^,=]*)=([^,]*)", boost::regex::extended);
boost::smatch m1, m2;
if (boost::regex_search(line, m1, r1)) {
std::string measurement = m1[1].str();
auto m = _influxMap.find(measurement);
if (m != _influxMap.end()) {
std::map<std::string, std::string> tags;
std::string tagStr = m1[2].str();
while (boost::regex_search(tagStr, m2, r2)) {
tags[m2[1].str()] = m2[2].str();
tagStr = m2.suffix().str();
}
auto t = tags.find(m->second.tag);
if (t != tags.end()) {
std::map<std::string, std::string> fields;
std::string fieldStr = m1[3].str();
while (boost::regex_search(fieldStr, m2, r2)) {
fields[m2[1].str()] = m2[2].str();
fieldStr = m2.suffix().str();
}
DCDB::TimeStamp ts;
try {
ts = TimeStamp(m1[4].str());
} catch (...) {
}
for (auto &f: m->second.fields) {
if (fields.find(f) != fields.end()) {
std::string mqttTopic = m->second.mqttPrefix + "/" + t->second + "/" + f;
uint64_t value = 0;
try {
value = stoull(fields[f]);
} catch (...) {
break;
}
DCDB::SensorId sid;
if (sid.mqttTopicConvert(mqttTopic)) {
_sensorCache->storeSensor(sid, ts.getRaw(), value);
_sensorDataStore->insert(&sid, ts.getRaw(), value, 0); //Fixme: TTL
}
LOG(debug) << "influx insert: " << mqttTopic << " " << ts.getRaw() << " " << fields[f];
}
}
}
}
}
}
res.body() = "";
res.result(http::status::no_content);
}
void CARestAPI::PUT_quit(endpointArgs) {
int retCode = getQuery("code", queries)=="" ? 0 : std::stoi(getQuery("code", queries));
if(retCode<0 || retCode>255)
......
......@@ -44,7 +44,9 @@
class CARestAPI : public RESTHttpsServer {
public:
CARestAPI(serverSettings_t settings,
std::map<std::string, influx_t>& influxMap,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer);
......@@ -120,6 +122,10 @@ private:
*/
void PUT_quit(endpointArgs);
void GET_ping(endpointArgs);
void POST_query(endpointArgs);
void POST_write(endpointArgs);
/**
* PUT "/analytics/reload"
*
......@@ -182,8 +188,10 @@ private:
void PUT_analytics_navigator(endpointArgs);
SensorCache* _sensorCache;
SensorDataStore* _sensorDataStore;
AnalyticsController* _analyticsController;
SimpleMQTTServer* _mqttServer;
std::map<std::string, influx_t> _influxMap;
};
#endif /* COLLECTAGENT_CARESTAPI_H_ */
......@@ -833,7 +833,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API
*/
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController, &ms);
httpsServer = new CARestAPI(restAPISettings, config.influxMap, &mySensorCache, mySensorDataStore, analyticsController, &ms);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
......
......@@ -72,4 +72,28 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
}
}
}
// ----- READING INFLUXDB LINE PROTOCOL SETTINGS -----
if (cfg.find("influx") != cfg.not_found()) {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("influx")) {
if (boost::iequals(global.first, "measurement")) {
influx_t influx;
BOOST_FOREACH(boost::property_tree::iptree::value_type &m, global.second) {
if (boost::iequals(m.first, "tag")) {
influx.tag = m.second.data();
} else if (boost::iequals(m.first, "mqttprefix")) {
influx.mqttPrefix = m.second.data();
} else if (boost::iequals(m.first, "fields")) {
std::stringstream ss(m.second.data());
while (ss.good()) {
std::string s;
getline(ss, s, ',');
influx.fields.insert(s);
}
}
}
influxMap[global.second.data()] = influx;
}
}
}
}
......@@ -63,6 +63,14 @@ public:
bool debugLog = false;
};
class influx_t {
public:
influx_t() {}
std::string mqttPrefix;
std::string tag;
std::set<std::string> fields;
};
/**
* @brief Class responsible for reading collect agent specific configuration.
*
......@@ -93,6 +101,7 @@ public:
uint64_t messageThreads = 128;
uint64_t messageSlots = 16;
cassandra_t cassandraSettings;
std::map<std::string, influx_t> influxMap;
protected:
void readAdditionalBlocks(boost::property_tree::iptree& cfg) override;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment