Commit bb4d7a9a authored by Michael Ott's avatar Michael Ott
Browse files

Add global parameters for InfluxDB

parent e139e2ce
...@@ -38,13 +38,13 @@ ...@@ -38,13 +38,13 @@
std::placeholders::_3) std::placeholders::_3)
CARestAPI::CARestAPI(serverSettings_t settings, CARestAPI::CARestAPI(serverSettings_t settings,
std::map<std::string, influx_t>& influxMap, influx_t* influxSettings,
SensorCache* sensorCache, SensorCache* sensorCache,
SensorDataStore* sensorDataStore, SensorDataStore* sensorDataStore,
AnalyticsController* analyticsController, AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer) : SimpleMQTTServer* mqttServer) :
RESTHttpsServer(settings), RESTHttpsServer(settings),
_influxMap(influxMap), _influxSettings(influxSettings),
_sensorCache(sensorCache), _sensorCache(sensorCache),
_sensorDataStore(sensorDataStore), _sensorDataStore(sensorDataStore),
_analyticsController(analyticsController), _analyticsController(analyticsController),
...@@ -163,9 +163,9 @@ void CARestAPI::POST_write(endpointArgs) { ...@@ -163,9 +163,9 @@ void CARestAPI::POST_write(endpointArgs) {
boost::smatch m1, m2; boost::smatch m1, m2;
if (boost::regex_search(line, m1, r1)) { if (boost::regex_search(line, m1, r1)) {
std::string measurement = m1[1].str(); std::string measurement = m1[1].str();
auto m = _influxMap.find(measurement); auto m = _influxSettings->measurements.find(measurement);
if (m != _influxMap.end()) { if (m != _influxSettings->measurements.end()) {
influx_t influx = m->second; influx_measurement_t influx = m->second;
// Parse tags into a map // Parse tags into a map
std::map<std::string, std::string> tags; std::map<std::string, std::string> tags;
...@@ -204,7 +204,7 @@ void CARestAPI::POST_write(endpointArgs) { ...@@ -204,7 +204,7 @@ void CARestAPI::POST_write(endpointArgs) {
for (auto &f: fields) { for (auto &f: fields) {
// If no fields were defined, we take any field // If no fields were defined, we take any field
if (influx.fields.empty() || (influx.fields.find(f.first) != influx.fields.end())) { if (influx.fields.empty() || (influx.fields.find(f.first) != influx.fields.end())) {
std::string mqttTopic = m->second.mqttPrefix + "/" + tagName + "/" + f.first; std::string mqttTopic = _influxSettings->mqttPrefix + m->second.mqttPart + "/" + tagName + "/" + f.first;
uint64_t value = 0; uint64_t value = 0;
try { try {
value = stoull(f.second); value = stoull(f.second);
......
...@@ -44,7 +44,7 @@ ...@@ -44,7 +44,7 @@
class CARestAPI : public RESTHttpsServer { class CARestAPI : public RESTHttpsServer {
public: public:
CARestAPI(serverSettings_t settings, CARestAPI(serverSettings_t settings,
std::map<std::string, influx_t>& influxMap, influx_t* influxSettings,
SensorCache* sensorCache, SensorCache* sensorCache,
SensorDataStore* sensorDataStore, SensorDataStore* sensorDataStore,
AnalyticsController* analyticsController, AnalyticsController* analyticsController,
...@@ -202,8 +202,8 @@ private: ...@@ -202,8 +202,8 @@ private:
SensorCache* _sensorCache; SensorCache* _sensorCache;
SensorDataStore* _sensorDataStore; SensorDataStore* _sensorDataStore;
AnalyticsController* _analyticsController; AnalyticsController* _analyticsController;
SimpleMQTTServer* _mqttServer; SimpleMQTTServer* _mqttServer;
std::map<std::string, influx_t> _influxMap; influx_t* _influxSettings;
}; };
#endif /* COLLECTAGENT_CARESTAPI_H_ */ #endif /* COLLECTAGENT_CARESTAPI_H_ */
...@@ -807,11 +807,12 @@ int main(int argc, char* const argv[]) { ...@@ -807,11 +807,12 @@ int main(int argc, char* const argv[]) {
LOG(info) << " Certificate: " << restAPISettings.certificate; LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey; LOG(info) << " Private key file: " << restAPISettings.privateKey;
if (config.influxMap.size() > 0) { if (config.influxSettings.measurements.size() > 0) {
LOG(info) << "InfluxDB Settings:"; LOG(info) << "InfluxDB Settings:";
for (auto &m: config.influxMap) { LOG(info) << " MQTT-Prefix: " << config.influxSettings.mqttPrefix;
for (auto &m: config.influxSettings.measurements) {
LOG(info) << " Measurement: " << m.first; LOG(info) << " Measurement: " << m.first;
LOG(info) << " MQTT-Prefix: " << m.second.mqttPrefix; LOG(info) << " MQTT-Part: " << m.second.mqttPart;
LOG(info) << " Tag: " << m.second.tag; LOG(info) << " Tag: " << m.second.tag;
if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0))
if (m.second.tagSubstitution != "&") { if (m.second.tagSubstitution != "&") {
...@@ -855,7 +856,7 @@ int main(int argc, char* const argv[]) { ...@@ -855,7 +856,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API * Start the HTTP Server for the REST API
*/ */
if (restAPISettings.enabled) { if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, config.influxMap, &mySensorCache, mySensorDataStore, analyticsController, &ms); httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, analyticsController, &ms);
config.readRestAPIUsers(httpsServer); config.readRestAPIUsers(httpsServer);
httpsServer->start(); httpsServer->start();
LOG(info) << "HTTP Server running..."; LOG(info) << "HTTP Server running...";
......
...@@ -76,11 +76,13 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) { ...@@ -76,11 +76,13 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
// ----- READING INFLUXDB LINE PROTOCOL SETTINGS ----- // ----- READING INFLUXDB LINE PROTOCOL SETTINGS -----
if (cfg.find("influx") != cfg.not_found()) { if (cfg.find("influx") != cfg.not_found()) {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("influx")) { BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("influx")) {
if (boost::iequals(global.first, "measurement")) { if (boost::iequals(global.first, "mqttprefix")) {
influx_t influx; influxSettings.mqttPrefix = MQTTChecker::formatTopic(global.second.data());
} else if (boost::iequals(global.first, "measurement")) {
influx_measurement_t measurement;
BOOST_FOREACH(boost::property_tree::iptree::value_type &m, global.second) { BOOST_FOREACH(boost::property_tree::iptree::value_type &m, global.second) {
if (boost::iequals(m.first, "tag")) { if (boost::iequals(m.first, "tag")) {
influx.tag = m.second.data(); measurement.tag = m.second.data();
} else if (boost::iequals(m.first, "tagfilter")) { } else if (boost::iequals(m.first, "tagfilter")) {
//check if input has sed format of "s/.../.../" for substitution //check if input has sed format of "s/.../.../" for substitution
boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1"); boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
...@@ -88,25 +90,25 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) { ...@@ -88,25 +90,25 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
if (regex_match(m.second.data(), matchResults, checkSubstitute)) { if (regex_match(m.second.data(), matchResults, checkSubstitute)) {
//input has substitute format //input has substitute format
influx.tagRegex = boost::regex(matchResults[2].str(), boost::regex_constants::extended); measurement.tagRegex = boost::regex(matchResults[2].str(), boost::regex_constants::extended);
influx.tagSubstitution = matchResults[3].str(); measurement.tagSubstitution = matchResults[3].str();
} else { } else {
//input is only a regex //input is only a regex
influx.tagRegex = boost::regex(m.second.data(), boost::regex_constants::extended); measurement.tagRegex = boost::regex(m.second.data(), boost::regex_constants::extended);
influx.tagSubstitution = "&"; measurement.tagSubstitution = "&";
} }
} else if (boost::iequals(m.first, "mqttprefix")) { } else if (boost::iequals(m.first, "mqttpart")) {
influx.mqttPrefix = m.second.data(); measurement.mqttPart = MQTTChecker::formatTopic(m.second.data());
} else if (boost::iequals(m.first, "fields")) { } else if (boost::iequals(m.first, "fields")) {
std::stringstream ss(m.second.data()); std::stringstream ss(m.second.data());
while (ss.good()) { while (ss.good()) {
std::string s; std::string s;
getline(ss, s, ','); getline(ss, s, ',');
influx.fields.insert(s); measurement.fields.insert(s);
} }
} }
} }
influxMap[global.second.data()] = influx; influxSettings.measurements[global.second.data()] = measurement;
} }
} }
} }
......
...@@ -65,16 +65,24 @@ public: ...@@ -65,16 +65,24 @@ public:
bool debugLog = false; bool debugLog = false;
}; };
class influx_t { class influx_measurement_t {
public: public:
influx_t() {} influx_measurement_t() {}
std::string mqttPrefix; std::string mqttPart;
std::string tag; std::string tag;
boost::regex tagRegex; boost::regex tagRegex;
std::string tagSubstitution; std::string tagSubstitution;
std::set<std::string> fields; std::set<std::string> fields;
}; };
class influx_t {
public:
influx_t() {}
std::string mqttPrefix;
std::map<std::string, influx_measurement_t> measurements;
};
/** /**
* @brief Class responsible for reading collect agent specific configuration. * @brief Class responsible for reading collect agent specific configuration.
* *
...@@ -105,7 +113,7 @@ public: ...@@ -105,7 +113,7 @@ public:
uint64_t messageThreads = 128; uint64_t messageThreads = 128;
uint64_t messageSlots = 16; uint64_t messageSlots = 16;
cassandra_t cassandraSettings; cassandra_t cassandraSettings;
std::map<std::string, influx_t> influxMap; influx_t influxSettings;
protected: protected:
void readAdditionalBlocks(boost::property_tree::iptree& cfg) override; void readAdditionalBlocks(boost::property_tree::iptree& cfg) override;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment