Commit cb13a825 authored by Michael Ott's avatar Michael Ott
Browse files

Add auto-publish for InfluxDB

parent bb4d7a9a
......@@ -41,12 +41,14 @@ CARestAPI::CARestAPI(serverSettings_t settings,
influx_t* influxSettings,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
SensorConfig *sensorConfig,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer) :
RESTHttpsServer(settings),
_influxSettings(influxSettings),
_sensorCache(sensorCache),
_sensorDataStore(sensorDataStore),
_sensorConfig(sensorConfig),
_analyticsController(analyticsController),
_mqttServer(mqttServer) {
......@@ -215,7 +217,12 @@ void CARestAPI::POST_write(endpointArgs) {
DCDB::SensorId sid;
if (sid.mqttTopicConvert(mqttTopic)) {
_sensorCache->storeSensor(sid, ts.getRaw(), value);
_sensorDataStore->insert(&sid, ts.getRaw(), value, 0); //Fixme: TTL
_sensorDataStore->insert(&sid, ts.getRaw(), value);
if (_influxSettings->publish && (_influxSensors.find(sid.getId()) == _influxSensors.end())) {
_influxSensors.insert(sid.getId());
_sensorConfig->publishSensor(sid.getId().c_str(), sid.getId().c_str());
}
}
#ifdef DEBUG
LOG(debug) << "influx insert: " << mqttTopic << " " << ts.getRaw() << " " << value;
......
......@@ -34,6 +34,7 @@
#include "mqttchecker.h"
#include "configuration.h"
#include "simplemqttserver.h"
#include <dcdb/sensorconfig.h>
#include <signal.h>
/**
......@@ -47,6 +48,7 @@ public:
influx_t* influxSettings,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
SensorConfig *sensorConfig,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer);
......@@ -199,11 +201,13 @@ private:
*/
void PUT_analytics_navigator(endpointArgs);
SensorCache* _sensorCache;
SensorDataStore* _sensorDataStore;
AnalyticsController* _analyticsController;
SimpleMQTTServer* _mqttServer;
influx_t* _influxSettings;
influx_t* _influxSettings;
SensorCache* _sensorCache;
SensorDataStore* _sensorDataStore;
SensorConfig* _sensorConfig;
AnalyticsController* _analyticsController;
SimpleMQTTServer* _mqttServer;
std::set<std::string> _influxSensors;
};
#endif /* COLLECTAGENT_CARESTAPI_H_ */
......@@ -810,15 +810,18 @@ int main(int argc, char* const argv[]) {
if (config.influxSettings.measurements.size() > 0) {
LOG(info) << "InfluxDB Settings:";
LOG(info) << " MQTT-Prefix: " << config.influxSettings.mqttPrefix;
LOG(info) << " Auto-Publish: " << (config.influxSettings.publish ? "Enabled" : "Disabled");
for (auto &m: config.influxSettings.measurements) {
LOG(info) << " Measurement: " << m.first;
LOG(info) << " MQTT-Part: " << m.second.mqttPart;
LOG(info) << " Tag: " << m.second.tag;
if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0))
if (m.second.tagSubstitution != "&") {
LOG(info) << " TagFilter: s/" << m.second.tagRegex.str() << "/" << m.second.tagSubstitution << "/";
} else {
LOG(info) << " TagFilter: " << m.second.tagRegex.str();
if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) {
if (m.second.tagSubstitution != "&") {
LOG(info) << " TagFilter: s/" << m.second.tagRegex.str() << "/" << m.second.tagSubstitution << "/";
} else {
LOG(info) << " TagFilter: " << m.second.tagRegex.str();
}
}
if (m.second.fields.size() > 0) {
stringstream ss;
......@@ -856,7 +859,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API
*/
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, analyticsController, &ms);
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
......
......@@ -78,6 +78,8 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("influx")) {
if (boost::iequals(global.first, "mqttprefix")) {
influxSettings.mqttPrefix = MQTTChecker::formatTopic(global.second.data());
} else if (boost::iequals(global.first, "publish")) {
influxSettings.publish = to_bool(global.second.data());
} else if (boost::iequals(global.first, "measurement")) {
influx_measurement_t measurement;
BOOST_FOREACH(boost::property_tree::iptree::value_type &m, global.second) {
......
......@@ -80,6 +80,7 @@ class influx_t {
public:
influx_t() {}
std::string mqttPrefix;
bool publish = false;
std::map<std::string, influx_measurement_t> measurements;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment