Commit 5682c14f authored by Alessio Netti's avatar Alessio Netti
Browse files

Initial integration of metadata publishing

- Data is correctly parsed from DCDBPusher, sent as JSON, and received
by collectagents
- Integration in libdcdb is still pending
parent e80db3b5
......@@ -489,6 +489,14 @@ protected:
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoi(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
SensorMetadata sm;
try {
sm.parsePTREE(val.second);
sBase.setMetadata(sm);
} catch(const std::exception& e) {
LOG(error) << " Metadata parsing failed for sensor " << sBase.getName() << "!" << std::endl;
}
}
}
sensorBase(sBase, config);
......@@ -608,11 +616,23 @@ protected:
bool constructSensorTopics(UnitTemplate<SBase>& u, Operator& op) {
std::string name;
// Performing name construction
for(auto& s: u.getOutputs())
for(auto& s: u.getOutputs()) {
s->setName(s->getMqtt());
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->isVirtual = false;
}
}
for(auto& subUnit: u.getSubUnits())
for(auto& s : subUnit->getOutputs())
for(auto& s : subUnit->getOutputs()) {
s->setName(s->getMqtt());
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->isVirtual = false;
}
}
return true;
}
......
......@@ -122,6 +122,18 @@ void AnalyticsController::run() {
publishSensors();
vector<op_dl_t>& _analyticsPlugins = _manager->getPlugins();
// De-allocating metadata for all sensors - after auto-publish it is not needed anymore
for(auto& p: _analyticsPlugins)
for(auto& op: p.configurator->getOperators()) {
for (auto &u: op->getUnits()) {
for (auto &s: u->getBaseOutputs())
s->clearMetadata();
for (auto &s: u->getBaseInputs())
s->clearMetadata();
}
op->releaseUnits();
}
DCDB::SensorId sid;
list<DCDB::SensorDataStoreReading> readings;
boost::lockfree::spsc_queue<reading_t> *sensorQueue;
......@@ -171,7 +183,12 @@ bool AnalyticsController::publishSensors() {
if(op->getStreaming() && !op->getDynamic()) {
for (const auto &u : op->getUnits())
for (const auto &s : u->getBaseOutputs()) {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
if(!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
err = _dcdbCfg->publishSensor(s->getMetadata()->publicName.c_str(), s->getMqtt().c_str());
switch (err) {
case DCDB::SC_OK:
publishCtr++;
......@@ -198,6 +215,6 @@ bool AnalyticsController::publishSensors() {
if(failedPublish)
LOG(error) << "Issues during sensor name auto-publish! Only " << publishCtr << " sensors were published.";
else
LOG(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
LOG(info) << "Sensor name auto-publish performed for " << publishCtr << " sensors!";
return true;
}
......@@ -58,6 +58,7 @@
#include <dcdb/version.h>
#include <dcdb/sensor.h>
#include "version.h"
#include "metadatastore.h"
#include "CARestAPI.h"
#include "configuration.h"
......@@ -201,12 +202,20 @@ int mqttCallback(SimpleMQTTMessage *msg)
// We use strncmp as it is the most efficient way to do it
if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
if ((len = msg->getPayloadLength()) == 0) {
LOG(error) << "Empty topic-to-name mapping message received";
LOG(error) << "Empty topic-to-name mapping message received!";
return 1;
}
string sensorName((char *) msg->getPayload(), len);
err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);
string payload((char *) msg->getPayload(), len);
SensorMetadata sm;
try {
sm.parseJSON(payload);
} catch(const std::exception& e) {
LOG(error) << "Invalid metadata packed received!";
return 1;
}
err = mySensorConfig->publishSensor(sm.publicName.c_str(), topic + DCDB_MAP_LEN);
// PublishSensor does most of the error checking for us
switch (err) {
......@@ -214,7 +223,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
LOG(error) << "Invalid sensor topic : " << msg->getTopic();
return 1;
case DCDB::SC_INVALIDPUBLICNAME:
LOG(error) << "Invalid sensor public name: " << sensorName;
LOG(error) << "Invalid sensor public name: " << sm.publicName;
return 1;
case DCDB::SC_INVALIDSESSION:
LOG(error) << "Cannot reach sensor data store.";
......
......@@ -53,7 +53,7 @@ public:
unit(""),
scale(1),
ttl(0),
interval(1000000000) {}
interval(0) {}
SensorMetadata(const SensorMetadata& other) {
this->isVirtual = other.isVirtual;
......@@ -191,7 +191,7 @@ protected:
// Dumps the content of a vector into a string with an arbitrary separator
void _dumpVector(string& str, const vector<string>& v, const char sep=',') const {
str = "";
string sepStr = string(sep,1);
string sepStr = string(1,sep);
for(const auto& el : v)
str += el + sepStr;
if(!str.empty() && str.back() == sep)
......
......@@ -35,6 +35,7 @@
#include <boost/lockfree/spsc_queue.hpp>
#include "logging.h"
#include "cacheentry.h"
#include "metadatastore.h"
/**
* @brief General sensor base class.
......@@ -55,7 +56,8 @@ public:
_cache(nullptr),
_delta(false),
_firstReading(true),
_readingQueue(nullptr) {
_readingQueue(nullptr),
_metadata(nullptr) {
_lastRawUValue.timestamp = 0;
_lastRawUValue.value = 0;
......@@ -84,7 +86,10 @@ public:
_latestValue(other._latestValue),
_lastSentValue(other._lastSentValue),
_accumulator(other._accumulator),
_readingQueue(nullptr) {}
_readingQueue(nullptr) {
_metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
}
virtual ~SensorBase() {}
......@@ -109,6 +114,7 @@ public:
_accumulator.timestamp = other._accumulator.timestamp;
_accumulator.value = other._accumulator.value;
_readingQueue.reset(nullptr);
_metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
return *this;
}
......@@ -125,7 +131,10 @@ public:
// Exposing the reading queue is necessary for publishing sensor data from the collectagent
boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }
SensorMetadata* getMetadata() { return _metadata.get(); }
void clearMetadata() { _metadata.reset(nullptr); }
void setMetadata(SensorMetadata& s) { _metadata.reset(new SensorMetadata(s)); }
void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; }
void setDelta(const bool delta) { _delta = delta; }
void setName(const std::string& name) { _name = name; }
......@@ -292,6 +301,7 @@ protected:
reading_t _lastSentValue;
reading_t _accumulator;
std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
std::unique_ptr<SensorMetadata> _metadata;
};
//for better readability
......
......@@ -99,6 +99,26 @@ void MQTTPusher::push() {
//Performing auto-publish if necessary
sendMappings();
// De-allocating metadata for all sensors - after auto-publish it is not needed anymore
for(auto& p: _plugins) {
for (auto &g: p.configurator->getSensorGroups()) {
for (auto &s: g->acquireSensors()) {
s->clearMetadata();
}
g->releaseSensors();
}
}
for(auto& p: _operatorPlugins)
for(auto& op: p.configurator->getOperators()) {
for (auto &u: op->getUnits()) {
for (auto &s: u->getBaseOutputs())
s->clearMetadata();
for (auto &s: u->getBaseInputs())
s->clearMetadata();
}
op->releaseUnits();
}
computeMsgRate();
//collect sensor-data
reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
......@@ -223,17 +243,28 @@ int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& total
bool MQTTPusher::sendMappings() {
if(!_autoPublish) return false;
std::string topic, name;
std::string topic, payload;
unsigned int publishCtr=0;
// Performing auto-publish for sensors
for(auto& p: _plugins) {
for(auto& g: p.configurator->getSensorGroups()) {
for(auto& s: g->acquireSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
// If metadata is missing, we use a dummy temporary structure
if(!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch(const std::exception& e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
......@@ -252,10 +283,20 @@ bool MQTTPusher::sendMappings() {
for (auto &u: op->getUnits())
for (auto &s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
if(!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch(const std::exception& e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
......@@ -264,7 +305,7 @@ bool MQTTPusher::sendMappings() {
}
op->releaseUnits();
}
LOGM(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
LOGM(info) << "Sensor name auto-publish performed for " << publishCtr << " sensors!";
return true;
}
......
......@@ -375,8 +375,7 @@ protected:
sBase = *(it->second);
sBase.setName(config.data());
} else {
LOG(warning) << "Template " << _baseName << "\" "
<< def.get().data() << "\" not found! Using standard values.";
LOG(warning) << "Template " << _baseName << "\" " << def.get().data() << "\" not found! Using standard values.";
}
}
}
......@@ -390,6 +389,14 @@ protected:
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoi(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
SensorMetadata sm;
try {
sm.parsePTREE(val.second);
sBase.setMetadata(sm);
} catch(const std::exception& e) {
LOG(error) << " Metadata parsing failed for sensor " << sBase.getName() << "!" << std::endl;
}
}
}
sensorBase(sBase, config);
......@@ -674,6 +681,11 @@ protected:
MQTTChecker::formatTopic(g->getMqttPart()) +
MQTTChecker::formatTopic(s->getMqtt()));
s->setName(s->getMqtt());
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->isVirtual = false;
}
}
g->releaseSensors();
}
......@@ -974,6 +986,14 @@ protected:
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoi(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
SensorMetadata sm;
try {
sm.parsePTREE(val.second);
sBase.setMetadata(sm);
} catch(const std::exception& e) {
LOG(error) << " Metadata parsing failed for sensor " << sBase.getName() << "!" << std::endl;
}
}
}
sensorBase(sBase, config);
......@@ -1087,6 +1107,11 @@ protected:
MQTTChecker::formatTopic(g->getMqttPart()) +
MQTTChecker::formatTopic(s->getMqtt()));
s->setName(s->getMqtt());
SensorMetadata* sm = s->getMetadata();
if(sm) {
sm->publicName = s->getMqtt();
sm->isVirtual = false;
}
}
g->releaseSensors();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment