Commit 0848af2f authored by Alessio Netti's avatar Alessio Netti
Browse files

Sensor name auto-publish support

- MQTT publish messages that have a topic preceded by the /DCDB_MAP/
keyword will be interpreted as sensor name auto-publish messages
- The payload of such messages is a string defining a sensor name, that
is used together with the MQTT topic to perform a "sensor publish"
action on the Cassandra db, like dcdbconfig does
parent 0f7909c7
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <dcdb/connection.h> #include <dcdb/connection.h>
#include <dcdb/sensordatastore.h> #include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include "simplemqttserver.h" #include "simplemqttserver.h"
#include "messaging.h" #include "messaging.h"
...@@ -62,8 +63,11 @@ int keepRunning; ...@@ -62,8 +63,11 @@ int keepRunning;
bool statistics; bool statistics;
uint64_t msgCtr; uint64_t msgCtr;
uint64_t pmsgCtr; uint64_t pmsgCtr;
DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore; DCDB::SensorDataStore *mySensorDataStore;
DCDB::SensorConfig *mySensorConfig;
DCDB::SensorCache mySensorCache; DCDB::SensorCache mySensorCache;
DCDB::SCError err;
/* Normal termination (SIGINT, CTRL+C) */ /* Normal termination (SIGINT, CTRL+C) */
void sigHandler(int sig) void sigHandler(int sig)
...@@ -130,64 +134,93 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -130,64 +134,93 @@ int mqttCallback(SimpleMQTTMessage *msg)
if (msg->isPublish()) if (msg->isPublish())
pmsgCtr++; pmsgCtr++;
uint64_t len;
/* /*
* Decode the message and put into the database. * Decode the message and put into the database.
*/ */
if (msg->isPublish()) { if (msg->isPublish()) {
mqttPayload buf, *payload; const char *topic = msg->getTopic().c_str();
uint64_t len; // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
// sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
len = msg->getPayloadLength(); // We use strncmp as it is the most efficient way to do it
//In the 64 bit message case, the collect agent provides a timestamp if (strlen(topic) > DCDB_MAP_LEN && strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
if (len == sizeof(uint64_t)) { if ((len = msg->getPayloadLength()) == 0) {
payload = &buf; cout << "Empty topic-to-name mapping message received\n";
payload->value = *((uint64_t*)msg->getPayload()); return 1;
payload->timestamp = Messaging::calculateTimestamp(); }
len = sizeof(uint64_t) * 2;
}
//...otherwise it just retrieves it from the MQTT message payload.
else if((len%sizeof(mqttPayload)==0) && (len>0)){
payload = (mqttPayload*)msg->getPayload();
}
//...otherwise this message is malformed -> ignore...
else {
cout << "Message malformed\n";
return 1;
}
/* string sensorName((char *) msg->getPayload(), len);
* Check if we can decode the message topic err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);
* into a valid SensorId. If successful, store
* the record in the database. // PublishSensor does most of the error checking for us
*/ switch (err) {
DCDB::SensorId sid; case DCDB::SC_INVALIDPATTERN:
if (sid.mqttTopicConvert(msg->getTopic())) { std::cout << "Invalid sensor topic : " << msg->getTopic() << std::endl;
#if 0 return 1;
cout << "Topic decode successful:" << endl case DCDB::SC_INVALIDPUBLICNAME:
<< " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl std::cout << "Invalid sensor public name: " << sensorName << std::endl;
<< " DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl return 1;
<< " device_id: " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl case DCDB::SC_INVALIDSESSION:
<< " sensor_number: " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec; std::cout << "Cannot reach sensor data store." << std::endl;
return 1;
cout << "Payload (" << len/sizeof(mqttPayload) << " messages):"<< endl; default:
for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) { break;
cout << " " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl; }
} else {
mqttPayload buf, *payload;
len = msg->getPayloadLength();
//In the 64 bit message case, the collect agent provides a timestamp
if (len == sizeof(uint64_t)) {
payload = &buf;
payload->value = *((uint64_t *) msg->getPayload());
payload->timestamp = Messaging::calculateTimestamp();
len = sizeof(uint64_t) * 2;
} }
cout << endl; //...otherwise it just retrieves it from the MQTT message payload.
else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
payload = (mqttPayload *) msg->getPayload();
}
//...otherwise this message is malformed -> ignore...
else {
cout << "Message malformed\n";
return 1;
}
/*
* Check if we can decode the message topic
* into a valid SensorId. If successful, store
* the record in the database.
*/
DCDB::SensorId sid;
if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
cout << "Topic decode successful:" << endl
<< " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
<< " DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
<< " device_id: " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
<< " sensor_number: " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;
cout << "Payload (" << len/sizeof(mqttPayload) << " messages):"<< endl;
for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
cout << " " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
}
cout << endl;
#endif #endif
for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) { for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
mySensorDataStore->insert(&sid, payload[i].timestamp, payload[i].value); mySensorDataStore->insert(&sid, payload[i].timestamp, payload[i].value);
mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value); mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
}
//mySensorCache.dump();
} }
//mySensorCache.dump();
}
#if 1 #if 1
else { else {
cout << "Wrong topic format: " << msg->getTopic() << "\n"; cout << "Wrong topic format: " << msg->getTopic() << "\n";
return 1; return 1;
} }
#endif #endif
}
} }
return 0; return 0;
} }
...@@ -318,7 +351,6 @@ int main(int argc, char* const argv[]) { ...@@ -318,7 +351,6 @@ int main(int argc, char* const argv[]) {
/* /*
* Allocate and initialize connection to Cassandra. * Allocate and initialize connection to Cassandra.
*/ */
DCDB::Connection* dcdbConn;
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword); dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
if (!dcdbConn->connect()) { if (!dcdbConn->connect()) {
...@@ -335,6 +367,7 @@ int main(int argc, char* const argv[]) { ...@@ -335,6 +367,7 @@ int main(int argc, char* const argv[]) {
* Allocate the SensorDataStore. * Allocate the SensorDataStore.
*/ */
mySensorDataStore = new DCDB::SensorDataStore(dcdbConn); mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
mySensorConfig = new DCDB::SensorConfig(dcdbConn);
/* /*
* Set TTL for data store inserts if TTL > 0. * Set TTL for data store inserts if TTL > 0.
...@@ -405,6 +438,7 @@ int main(int argc, char* const argv[]) { ...@@ -405,6 +438,7 @@ int main(int argc, char* const argv[]) {
httpThread.join(); httpThread.join();
cout << "HTTP Server stopped..." << std::endl; cout << "HTTP Server stopped..." << std::endl;
delete mySensorDataStore; delete mySensorDataStore;
delete mySensorConfig;
dcdbConn->disconnect(); dcdbConn->disconnect();
delete dcdbConn; delete dcdbConn;
cout << "Collect Agent closed. Bye bye..." << std::endl; cout << "Collect Agent closed. Bye bye..." << std::endl;
......
...@@ -43,6 +43,9 @@ ...@@ -43,6 +43,9 @@
#define MQTT_PINGRESP 0xd #define MQTT_PINGRESP 0xd
#define MQTT_DISCONNECT 0xe #define MQTT_DISCONNECT 0xe
#define DCDB_MAP "/DCDB_MAP/"
#define DCDB_MAP_LEN 10
#pragma pack(push,1) #pragma pack(push,1)
typedef union { typedef union {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment