Commit 7de3886e authored by Alessio Netti's avatar Alessio Netti
Browse files

Implemented "publish" switch to prevent auto-publishing per-sensor

parent 967a4592
......@@ -489,6 +489,8 @@ protected:
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoi(val.second.data()));
} else if (boost::iequals(val.first, "publish")) {
sBase.setPublish(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
SensorMetadata sm;
try {
......
......@@ -183,30 +183,32 @@ bool AnalyticsController::publishSensors() {
if(op->getStreaming() && !op->getDynamic()) {
for (const auto &u : op->getUnits())
for (const auto &s : u->getBaseOutputs()) {
if(!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
err = _dcdbCfg->publishSensor(s->getMetadata()->publicName.c_str(), s->getMqtt().c_str());
switch (err) {
case DCDB::SC_OK:
publishCtr++;
break;
case DCDB::SC_INVALIDPATTERN:
LOG(error) << "Invalid sensor topic : " << s->getMqtt();
failedPublish = true;
break;
case DCDB::SC_INVALIDPUBLICNAME:
LOG(error) << "Invalid sensor public name: " << s->getName();
failedPublish = true;
break;
case DCDB::SC_INVALIDSESSION:
LOG(error) << "Cannot reach sensor data store.";
failedPublish = true;
break;
default:
break;
if (s->getPublish()) {
if (!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
err = _dcdbCfg->publishSensor(s->getMetadata()->publicName.c_str(), s->getMqtt().c_str());
switch (err) {
case DCDB::SC_OK:
publishCtr++;
break;
case DCDB::SC_INVALIDPATTERN:
LOG(error) << "Invalid sensor topic : " << s->getMqtt();
failedPublish = true;
break;
case DCDB::SC_INVALIDPUBLICNAME:
LOG(error) << "Invalid sensor public name: " << s->getName();
failedPublish = true;
break;
case DCDB::SC_INVALIDSESSION:
LOG(error) << "Cannot reach sensor data store.";
failedPublish = true;
break;
default:
break;
}
}
}
op->releaseUnits();
......
......@@ -50,6 +50,7 @@ public:
_name(name),
_mqtt(""),
_skipConstVal(false),
_publish(true),
_cacheInterval(900000),
_subsamplingFactor(1),
_subsamplingIndex(0),
......@@ -75,6 +76,7 @@ public:
_name(other._name),
_mqtt(other._mqtt),
_skipConstVal(other._skipConstVal),
_publish(other._publish),
_cacheInterval(other._cacheInterval),
_subsamplingFactor(other._subsamplingFactor),
_subsamplingIndex(0),
......@@ -97,6 +99,7 @@ public:
_name = other._name;
_mqtt = other._mqtt;
_skipConstVal = other._skipConstVal;
_publish = other._publish;
_cacheInterval = other._cacheInterval;
_subsamplingFactor = other._subsamplingFactor;
_subsamplingIndex = 0;
......@@ -123,6 +126,7 @@ public:
const std::string& getName() const { return _name; }
const std::string& getMqtt() const { return _mqtt; }
bool getSkipConstVal() const { return _skipConstVal; }
bool getPublish() const { return _publish; }
unsigned getCacheInterval() const { return _cacheInterval; }
int getSubsampling() const { return _subsamplingFactor; }
const CacheEntry* const getCache() const { return _cache.get(); }
......@@ -136,6 +140,7 @@ public:
void clearMetadata() { _metadata.reset(nullptr); }
void setMetadata(SensorMetadata& s) { _metadata.reset(new SensorMetadata(s)); }
void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; }
void setPublish(bool pub) { _publish = pub; }
void setDelta(const bool delta) { _delta = delta; }
void setName(const std::string& name) { _name = name; }
void setMqtt(const std::string& mqtt) { _mqtt = mqtt; }
......@@ -282,6 +287,7 @@ public:
}
LOG_VAR(ll) << leading << " Skip const values: " << (_skipConstVal ? "true" : "false");
LOG_VAR(ll) << leading << " Store delta only: " << (_delta ? "true" : "false");
LOG_VAR(ll) << leading << " Publish: " << (_publish ? "true" : "false");
}
protected:
......@@ -289,6 +295,7 @@ protected:
std::string _name;
std::string _mqtt;
bool _skipConstVal;
bool _publish;
unsigned int _cacheInterval;
int _subsamplingFactor;
unsigned int _subsamplingIndex;
......
......@@ -249,28 +249,29 @@ bool MQTTPusher::sendMappings() {
for(auto& p: _plugins) {
for(auto& g: p.configurator->getSensorGroups()) {
for(auto& s: g->acquireSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
// If metadata is missing, we use a dummy temporary structure
if(!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch(const std::exception& e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
if(s->getPublish()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
// If metadata is missing, we use a dummy temporary structure
if (!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch (const std::exception &e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
} else
publishCtr++;
}
else
publishCtr++;
}
g->releaseSensors();
}
......@@ -282,26 +283,28 @@ bool MQTTPusher::sendMappings() {
if(op->getStreaming() && !op->getDynamic()) {
for (auto &u: op->getUnits())
for (auto &s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
if(!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch(const std::exception& e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
if (s->getPublish()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
if (!s->getMetadata()) {
SensorMetadata sm;
sm.publicName = s->getName();
s->setMetadata(sm);
}
try {
payload = s->getMetadata()->getJSON();
} catch (const std::exception &e) {
LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
continue;
}
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
} else
publishCtr++;
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
} else
publishCtr++;
}
}
op->releaseUnits();
}
......
......@@ -302,8 +302,9 @@ All the different plugins share some same general principles in common regarding
* __default__ (One can define the name of a template group (see below) whose values and sensors should be used as default)
3. Sensors hold only those attributes which are necessary to uniquely identify the target sensor. Common base attributes:
* __mqttsuffix__ (to make its [mqtt-topic](#mqttTopic) unique)
* __delta__ (identifies a monotonic sensor. If set to "on", differences between successive readings are collected)
* __delta__ (identifies a monotonic sensor. If set to "true", differences between successive readings are collected)
* __subSampling__ (subsampling factor S. If S>=1, only one reading every S is sent over MQTT, and the others are kept locally. If S<1, readings are never sent out and only kept locally)
* __publish__ (if set to "true", the sensor will be published when the auto-publish feature is enabled. Otherwise it is omitted. Default is "true".)
5. Be aware that naming of sensor/group/entity is not fixed. A plugin developer can name them as he likes, e.g. counter/multicounter/host.
6. It is possible to define template groups or entities in the config file, but not template sensors (as a sensor should only consists of attributs which make him unique this would not be too useful). To specify a template group/entity simply prefix its definition with `template_` (see the example below). You can reference them later by using the `default` attribute. A template entity can consist of groups and these in turn can consist of sensors. When using a template, all of its attribute values are copied to the actual sensor. Copied attributes can be overwritten in the actual entity/sensor (some of them even should be overwritten, e.g. the mqttPart). However, groups/sensors associated with a template are copied to the actual entity/group and can NOT be overwritten. One can specify further groups/sensors which are then added to those copied from the template. Template entitys/groups itself or sensors within them are never used in live operation of the plugin. They are purely cosmetic for convenient configuration.
......
......@@ -389,6 +389,8 @@ protected:
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoi(val.second.data()));
} else if (boost::iequals(val.first, "publish")) {
sBase.setPublish(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
SensorMetadata sm;
try {
......@@ -986,7 +988,9 @@ protected:
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoi(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
} else if (boost::iequals(val.first, "publish")) {
sBase.setPublish(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "metadata")) {
SensorMetadata sm;
try {
sm.parsePTREE(val.second);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment