The expiration time for new job artifacts in CI/CD pipelines is now 30 days (GitLab default). Previously generated artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit dcafa065 authored by Alessio Netti's avatar Alessio Netti
Browse files

Changes to metadata packet send method

- JSON metadata is sent only if a "metadata" block for the sensor is
defined, else if auto-publish is enabled we only send the sensor name
parent a68b62c8
......@@ -184,12 +184,13 @@ bool AnalyticsController::publishSensors() {
for (const auto &u : op->getUnits())
for (const auto &s : u->getBaseOutputs()) {
if (s->getPublish()) {
if (!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
if (s->getMetadata()) {
SensorMetadata* sm = s->getMetadata();
//TODO: update this publishSensor call
err = _dcdbCfg->publishSensor(sm->publicName.c_str(), s->getMqtt().c_str());
} else {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
}
err = _dcdbCfg->publishSensor(s->getMetadata()->publicName.c_str(), s->getMqtt().c_str());
switch (err) {
case DCDB::SC_OK:
publishCtr++;
......
......@@ -197,25 +197,30 @@ int mqttCallback(SimpleMQTTMessage *msg)
*/
if (msg->isPublish()) {
const char *topic = msg->getTopic().c_str();
// We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
// We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
// sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
// We use strncmp as it is the most efficient way to do it
if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
if ((len = msg->getPayloadLength()) == 0) {
LOG(error) << "Empty topic-to-name mapping message received!";
LOG(error) << "Empty sensor publish message received!";
return 1;
}
string payload((char *) msg->getPayload(), len);
SensorMetadata sm;
try {
sm.parseJSON(payload);
} catch(const std::exception& e) {
LOG(error) << "Invalid metadata packed received!";
return 1;
//If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
SensorMetadata sm;
try {
sm.parseJSON(payload);
} catch (const std::exception &e) {
LOG(error) << "Invalid metadata packed received!";
return 1;
}
// TODO: update this publishSensor call
err = mySensorConfig->publishSensor(sm.publicName.c_str(), topic + DCDB_MET_LEN);
} else {
err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
}
err = mySensorConfig->publishSensor(sm.publicName.c_str(), topic + DCDB_MAP_LEN);
// PublishSensor does most of the error checking for us
switch (err) {
......@@ -223,7 +228,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
LOG(error) << "Invalid sensor topic : " << msg->getTopic();
return 1;
case DCDB::SC_INVALIDPUBLICNAME:
LOG(error) << "Invalid sensor public name: " << sm.publicName;
LOG(error) << "Invalid sensor public name.";
return 1;
case DCDB::SC_INVALIDSESSION:
LOG(error) << "Cannot reach sensor data store.";
......
......@@ -53,6 +53,8 @@
#define DCDB_MAP "/DCDB_MAP/"
#define DCDB_MAP_LEN 10
#define DCDB_MET "/DCDB_MAP/METADATA/"
#define DCDB_MET_LEN 19
#pragma pack(push,1)
......
......@@ -250,18 +250,17 @@ bool MQTTPusher::sendMappings() {
for(auto& g: p.configurator->getSensorGroups()) {
for(auto& s: g->acquireSensors()) {
if(s->getPublish()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
// If metadata is missing, we use a dummy temporary structure
if (!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch (const std::exception &e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
topic = std::string(DCDB_MAP) + s->getMqtt();
payload = s->getName();
} else {
topic = std::string(DCDB_MET) + s->getMqtt();
try {
payload = s->getMetadata()->getJSON();
} catch (const std::exception &e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
}
// Try to send mapping to the broker
......@@ -284,17 +283,17 @@ bool MQTTPusher::sendMappings() {
for (auto &u: op->getUnits())
for (auto &s: u->getBaseOutputs()) {
if (s->getPublish()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
if (!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch (const std::exception &e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
topic = std::string(DCDB_MAP) + s->getMqtt();
payload = s->getName();
} else {
topic = std::string(DCDB_MET) + s->getMqtt();
try {
payload = s->getMetadata()->getJSON();
} catch (const std::exception &e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
}
// Try to send mapping to the broker
......
......@@ -29,6 +29,7 @@
#define MQTTPUSHER_H_
#define DCDB_MAP "/DCDB_MAP/"
#define DCDB_MET "/DCDB_MAP/METADATA/"
#define PUSHER_IDLETIME 1000000000
#include <mosquitto.h>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment