Commit b45ce395 authored by Alessio Netti's avatar Alessio Netti
Browse files

Fully-operational string MQTT topics

- String topics are now used across all of DCDB, up to Cassandra
- As of now, sensor names ALWAYS match MQTT topics
- Users can still define sensor "aliases" via dcdbconfig that point
to the same topics
parent fa4e87ff
......@@ -10,7 +10,7 @@ BOOST_VERSION = 1.58.0
OPENSSL_VERSION = 1.0.2l
CPPDRV_VERSION = 2.10.0
LIBUV_VERSION = 1.24.0
SOURCEFORGE_MROR = netcologne
SOURCEFORGE_MROR = vorboss
CPPNET_VERSION = 0.12.0-final
BACNET-STACK_VERSION = 0.8.5
FREEIPMI_VERSION = 1.5.5
......
......@@ -437,7 +437,7 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
string unit = SensorNavigator::rootKey;
for (auto& p : queries)
if (p.first == "unit")
unit = MQTTChecker::convertTopic(p.second);
unit = MQTTChecker::nameToTopic(p.second);
bool found=false, unitFound=false;
for (auto &p : _plugins)
......
......@@ -112,7 +112,7 @@ public:
void printConfig(LOG_LEVEL ll) final {
LOG_VAR(ll) << " General: ";
LOG_VAR(ll) << " MQTT-Prefix: " << (_mqttPrefix != "" ? _mqttPrefix : "DEFAULT");
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
LOG_VAR(ll) << " CacheInterval: " << _cacheInterval/1000 << " [s]";
//prints plugin specific configurator attributes and entities if present
printConfiguratorConfig(ll);
......
......@@ -24,7 +24,7 @@ void AnalyticsController::stop() {
}
restResponse_t AnalyticsController::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method) {
if(_initialized)
if(!_initialized)
throw runtime_error("Cannot forward REST command, AnalyticsController is not initialized!");
return _manager->REST(pathStrs, queries, method, _io);
}
......
......@@ -300,7 +300,7 @@ struct httpHandler_t {
//try getting the latest value
try {
std::string sensor = MQTTChecker::convertTopic(pathStrs[0]);
std::string sensor = MQTTChecker::nameToTopic(pathStrs[0]);
int64_t val = mySensorCache.getSensor(sensor, (uint64_t) time * 1000000000);
connection->set_status(httpServer_t::connection::ok);
response = "collectagent::" + sensor + " Average of last " +
......
......@@ -50,63 +50,63 @@ int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
}
}
int64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
//topic.erase(std::remove(topic.begin(), topic.end(), '/'), topic.end());
size_t wp = topic.find("*");
if (wp == std::string::npos) {
return getSensor(DCDB::SensorId(topic), avg);
}
int wl = 29 - topic.length();
//TODO: the wildcard part will likely not be supported with string topics
/* Create SensorIds with the lowest and highest values matching the wildcard */
DCDB::SensorId sidLow(std::string(topic).replace(wp, 1, std::string(wl, '0')));
DCDB::SensorId sidHi(std::string(topic).replace(wp, 1, std::string(wl, 'f')));
DCDB::SensorId sidMask(std::string(wp, 'f') + std::string(wl, '0') + std::string(topic.length()-wp-1, 'f'));
sidLow.setRsvd(0);
sidHi.setRsvd(0);
/* See whether there's a SensorId in the cache >= sidLow */
sensorCache_t::iterator it = sensorCache.lower_bound(sidLow);
sensorCache_t::iterator mostRecentSidIt = sensorCache.end();
bool foundOne = false;
/* Iterate over the cache until the current entry is > sidHi */
while ((it != sensorCache.end()) && (it->first <= sidHi)) {
if ((it->first & sidMask) == sidLow) {
foundOne = true;
/* We only return one value, even if multiple SensorIds would match.
* At least make sure it's the most recent value
*/
if (it->second.checkValid() && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.getLatest().timestamp < it->second.getLatest().timestamp)) {
mostRecentSidIt = it;
}
}
it++;
}
if (mostRecentSidIt == sensorCache.end()) {
/* Check whether we actually found at least an outdated entry */
if (foundOne) {
throw std::out_of_range("Sid outdated");
} else {
throw std::invalid_argument("Sid not found");
}
}
if (avg) {
return mostRecentSidIt->second.getAverage(avg);
} else {
return mostRecentSidIt->second.getLatest().value;
}
}
// Wildcards are not supported with string topics
//int64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
// topic.erase(std::remove(topic.begin(), topic.end(), '/'), topic.end());
//
// size_t wp = topic.find("*");
// if (wp == std::string::npos) {
// return getSensor(DCDB::SensorId(topic), avg);
// }
//
// int wl = 29 - topic.length();
//
// /* Create SensorIds with the lowest and highest values matching the wildcard */
// DCDB::SensorId sidLow(std::string(topic).replace(wp, 1, std::string(wl, '0')));
// DCDB::SensorId sidHi(std::string(topic).replace(wp, 1, std::string(wl, 'f')));
// DCDB::SensorId sidMask(std::string(wp, 'f') + std::string(wl, '0') + std::string(topic.length()-wp-1, 'f'));
// sidLow.setRsvd(0);
// sidHi.setRsvd(0);
//
// /* See whether there's a SensorId in the cache >= sidLow */
// sensorCache_t::iterator it = sensorCache.lower_bound(sidLow);
// sensorCache_t::iterator mostRecentSidIt = sensorCache.end();
//
// bool foundOne = false;
// /* Iterate over the cache until the current entry is > sidHi */
// while ((it != sensorCache.end()) && (it->first <= sidHi)) {
// if ((it->first & sidMask) == sidLow) {
// foundOne = true;
// /* We only return one value, even if multiple SensorIds would match.
// * At least make sure it's the most recent value
// */
// if (it->second.checkValid() && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.getLatest().timestamp < it->second.getLatest().timestamp)) {
// mostRecentSidIt = it;
// }
// }
// it++;
// }
//
// if (mostRecentSidIt == sensorCache.end()) {
// /* Check whether we actually found at least an outdated entry */
// if (foundOne) {
// throw std::out_of_range("Sid outdated");
// } else {
// throw std::invalid_argument("Sid not found");
// }
// }
//
// if (avg) {
// return mostRecentSidIt->second.getAverage(avg);
// } else {
// return mostRecentSidIt->second.getLatest().value;
// }
//}
void SensorCache::dump() {
std::cout << "SensorCache Dump:" << std::endl;
for (sensorCache_t::iterator sit = sensorCache.begin(); sit != sensorCache.end(); sit++) {
std::cout << " id=" << sit->first.toString() << " data=[";
std::cout << " id=" << sit->first.getId() << " data=[";
for (std::vector<reading_t>::const_iterator eit = sit->second.getRaw()->begin(); eit != sit->second.getRaw()->end(); eit++) {
if (eit != sit->second.getRaw()->begin()) {
std::cout << ",";
......
......@@ -61,7 +61,7 @@ public:
* @throws std::invalid_argument if the topic couldn't be found in the SensorCache.
* @throws std::out_of_range if the topic was found in the cache entry but is outdated.
**/
int64_t getSensor(std::string topic, uint64_t avg=0);
// int64_t getSensor(std::string topic, uint64_t avg=0);
/**
* @brief Dump the contents of the SensorCache to stdout.
......
......@@ -10,6 +10,7 @@
#include "logging.h"
#define MQTT_SEP '/'
#define NAME_SEP '.'
/**
* Class that manages constraint for MQTT topic formatting
......@@ -35,18 +36,32 @@ public:
}
/**
* @brief Replaces all characters in a MQTT topics matching a token and sanitizes the result
* @brief Converts a sensor name to its internal MQTT topic representation
*
* @param topic The topic or suffix to be processed
* @param tok The token character to be replaced with forward slashes
* @param name The sensor name to be processed
* @return The processed MQTT topic
*/
static std::string convertTopic(const std::string& topic, const char tok='.') {
if(topic.empty()) return topic;
std::string newTopic = topic;
std::replace(newTopic.begin(), newTopic.end(), tok, MQTT_SEP);
static std::string nameToTopic(const std::string& name) {
if(name.empty()) return name;
std::string newTopic = name;
std::replace(newTopic.begin(), newTopic.end(), NAME_SEP, MQTT_SEP);
if(!newTopic.empty() && newTopic[0] != MQTT_SEP) newTopic.insert(0, 1, MQTT_SEP);
return newTopic;
}
/**
* @brief Converts an MQTT topic to the name representation exposed to users
*
* @param topic The topic to be processed
* @return The processed sensor name
*/
static std::string topicToName(const std::string& topic) {
if(topic.empty()) return topic;
std::string newName = topic;
std::replace(newName.begin(), newName.end(), MQTT_SEP, NAME_SEP);
if(!newName.empty() && newName[0] == NAME_SEP) newName.erase(0, 1);
return newName;
}
/**
* @brief Sanitizes and formats a MQTT topic or suffix
......
......@@ -212,7 +212,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
goto error;
}
std::string sensor = MQTTChecker::convertTopic(pathStrs[1]);
std::string sensor = MQTTChecker::nameToTopic(pathStrs[1]);
std::string action = pathStrs[2];
uint64_t time = 0;
......
......@@ -14,6 +14,7 @@ OBJS = src/connection.o \
src/timestamp.o \
src/sensorconfig.o \
src/sensorid.o \
src/sensorid_num.o \
src/unitconv.o \
src/virtualsensor.o \
src/c_api.o \
......
......@@ -45,6 +45,8 @@
#ifndef DCDB_SENSORCONFIG_H
#define DCDB_SENSORCONFIG_H
#define MAX_PATTERN_LENGTH 64
namespace DCDB {
/* Forward-declaration of the implementation-internal classes */
......
......@@ -2,7 +2,7 @@
* sensorid.h
*
* Created on: May 18, 2015
* Author: Axel Auweter
* Author: Axel Auweter, Alessio Netti
*/
#include <cstdint>
......@@ -16,44 +16,6 @@ namespace DCDB {
/* Ensure that the unions and structs are created without padding */
#pragma pack(push,1)
/**
* @brief The DeviceLocation type describes the location of a device. A
* device is the smallest piece of hardware containing sensors.
*
* The location of a device is highly specific to the system architecture
* and thus, it is only treated as unsigned 64 bit integer internally.
* Since these bits, however, make up for the location of the data within
* the distributed database, it is recommended to assign a globally used
* schema in advance leaving the higher-order bits to higher level
* entities.
*
* Example:
* ----------------------------------------------
* | 8 Bits | 8 Bits | 16 Bits | 8 Bits | ...
* ----------------------------------------------
* | Data | | | |
* | Center | Cluster | Rack | Chassis | ...
* | ID | ID | ID | ID |
* ----------------------------------------------
*/
typedef uint64_t DeviceLocation;
/**
* @brief The DeviceSensorId type describes the tuple of the sensor
* number and a unique (location-independent) device id.
* In combination with a location specified by the DeviceLocation type,
* the sensor_number defines a sensor by location.
*
* The device_id member is - in return - used to uniquely identify
* certain components, even when their location changes. A suitable
* approach is, for example, to use MAC addresses as the device_id.
*/
typedef struct {
uint64_t rsvd : 16; /**< Reserved */
uint64_t sensor_number : 16; /**< The sensor_number of the sensor */
uint64_t device_id : 32; /**< The location-independent device_id */
} DeviceSensorId;
/**
* @brief The SensorId class packs the DeviceLocation and DeviceSensorId
* types into a single object.
......@@ -67,87 +29,53 @@ typedef struct {
* DeviceSensorId.
*/
class SensorId {
protected:
union {
uint64_t raw[2]; /**< The raw bit-field representation of a sensor */
struct {
DeviceLocation dl;
DeviceSensorId dsid;
};
} data;
public:
DeviceLocation getDeviceLocation();
void setDeviceLocation(DeviceLocation dl);
DeviceSensorId getDeviceSensorId();
void setDeviceSensorId(DeviceSensorId dsid);
uint16_t getSensorNumber();
void setSensorNumber(uint16_t sn);
uint16_t getRsvd();
void setRsvd(uint16_t rsvd);
uint32_t getDeviceId();
void setDeviceId(uint32_t did);
uint64_t* getRaw();
void setRaw(uint64_t* raw);
/**
* @brief This function populates the data field from a MQTT topic string.
* @param topic The string from which the SensorId object will be populated.
* @return Returns true if the topic string was valid and the data field of the object was populated.
*/
bool mqttTopicConvert(std::string mqttTopic);
/**
* @brief This function returns a "key" string which
* corresponds to the supplied SensorId object.
* @return Returns the serialized string that can be used by Cassandra as row key.
*/
std::string serialize() const;
/**
* @brief This function returns a hex string which corresponds to the
* supplied SensorId object.
* @return Returns a human-readable hex string of the SensorId.
*/
std::string toString() const;
/**
* @brief This function matches the sensor against a
* sensor pattern string.
* @return Returns true if the sensor name matches the pattern, false otherwise.
*/
bool patternMatch(std::string pattern);
inline bool operator == (const SensorId& rhs) const {
return (data.raw[0] == rhs.data.raw[0]) &&
(data.raw[1] == rhs.data.raw[1]);}
inline bool operator < (const SensorId& rhs) const {
if (data.raw[0] == rhs.data.raw[0])
return data.raw[1] < rhs.data.raw[1];
else
return data.raw[0] < rhs.data.raw[0];
}
inline bool operator <= (const SensorId& rhs) const {
if (data.raw[0] == rhs.data.raw[0])
return data.raw[1] <= rhs.data.raw[1];
protected:
std::string data;
uint16_t rsvd;
public:
void setId(std::string &d) { if (!mqttTopicConvert(d)) data = ""; }
void setRsvd(uint16_t rs) { rsvd = rs; }
std::string getId() const { return data; }
uint16_t getRsvd() const { return rsvd; }
/**
* @brief This function populates the data field from a MQTT topic string.
* @param topic The string from which the SensorId object will be populated.
* @return Returns true if the topic string was valid and the data field of the object was populated.
*/
bool mqttTopicConvert(std::string mqttTopic);
/**
* @brief This function matches the sensor against a
* sensor pattern string.
* @return Returns true if the sensor name matches the pattern, false otherwise.
*/
bool patternMatch(std::string pattern);
inline bool operator == (const SensorId& rhs) const {
return (data == rhs.data) && (rsvd == rhs.rsvd);
}
inline bool operator < (const SensorId& rhs) const {
if (data == rhs.data)
return rsvd < rhs.rsvd;
else
return data.raw[0] < rhs.data.raw[0];
}
inline SensorId operator & (const SensorId& rhs) const {
SensorId sid;
sid.data.raw[0] = (data.raw[0] & rhs.data.raw[0]);
sid.data.raw[1] = (data.raw[1] & rhs.data.raw[1]);
return sid;
}
return data < rhs.data;
}
SensorId();
SensorId(std::string mqttTopic);
virtual ~SensorId();
};
inline bool operator <= (const SensorId& rhs) const {
if (data == rhs.data)
return rsvd <= rhs.rsvd;
else
return data < rhs.data;
}
SensorId();
SensorId(std::string mqttTopic);
virtual ~SensorId();
};
#pragma pack(pop)
......
/*
* sensorid.h
*
* Created on: May 18, 2015
* Author: Axel Auweter
*/
#include <cstdint>
#include <string>
#ifndef DCDB_SENSORIDNUM_H
#define DCDB_SENSORIDNUM_H
namespace DCDB {
/* Ensure that the unions and structs are created without padding */
#pragma pack(push,1)
/**
* @brief The DeviceLocation type describes the location of a device. A
* device is the smallest piece of hardware containing sensors.
*
* The location of a device is highly specific to the system architecture
* and thus, it is only treated as unsigned 64 bit integer internally.
* Since these bits, however, make up for the location of the data within
* the distributed database, it is recommended to assign a globally used
* schema in advance leaving the higher-order bits to higher level
* entities.
*
* Example:
* ----------------------------------------------
* | 8 Bits | 8 Bits | 16 Bits | 8 Bits | ...
* ----------------------------------------------
* | Data | | | |
* | Center | Cluster | Rack | Chassis | ...
* | ID | ID | ID | ID |
* ----------------------------------------------
*/
typedef uint64_t DeviceLocation;
/**
* @brief The DeviceSensorId type describes the tuple of the sensor
* number and a unique (location-independent) device id.
* In combination with a location specified by the DeviceLocation type,
* the sensor_number defines a sensor by location.
*
* The device_id member is - in return - used to uniquely identify
* certain components, even when their location changes. A suitable
* approach is, for example, to use MAC addresses as the device_id.
*/
typedef struct {
uint64_t rsvd : 16; /**< Reserved */
uint64_t sensor_number : 16; /**< The sensor_number of the sensor */
uint64_t device_id : 32; /**< The location-independent device_id */
} DeviceSensorId;
/**
* @brief The SensorId class packs the DeviceLocation and DeviceSensorId
* types into a single object.
*
* In DCDB, a sensor is described by it's location, a sensor number at
* this location, and a unique part number identifier. This information
* is packed into a 128 bit wide bitfield.
*
* Access to this bitfield can be done through the raw member of through
* the two variables dl and dsid of the helper types DeviceLocation and
* DeviceSensorId.
*/
class SensorIdNumerical {
protected:
union {
uint64_t raw[2]; /**< The raw bit-field representation of a sensor */
struct {
DeviceLocation dl;
DeviceSensorId dsid;
};
} data;
public:
DeviceLocation getDeviceLocation();
void setDeviceLocation(DeviceLocation dl);
DeviceSensorId getDeviceSensorId();
void setDeviceSensorId(DeviceSensorId dsid);
uint16_t getSensorNumber();
void setSensorNumber(uint16_t sn);
uint16_t getRsvd();
void setRsvd(uint16_t rsvd);
uint32_t getDeviceId();
void setDeviceId(uint32_t did);
uint64_t* getRaw();
void setRaw(uint64_t* raw);
/**
* @brief This function populates the data field from a MQTT topic string.
* @param topic The string from which the SensorId object will be populated.
* @return Returns true if the topic string was valid and the data field of the object was populated.
*/
bool mqttTopicConvert(std::string mqttTopic);
/**
* @brief This function returns a "key" string which
* corresponds to the supplied SensorId object.
* @return Returns the serialized string that can be used by Cassandra as row key.
*/
std::string serialize() const;
/**
* @brief This function returns a hex string which corresponds to the
* supplied SensorId object.
* @return Returns a human-readable hex string of the SensorId.
*/
std::string toString() const;
/**
* @brief This function matches the sensor against a
* sensor pattern string.
* @return Returns true if the sensor name matches the pattern, false otherwise.
*/
bool patternMatch(std::string pattern);
inline bool operator == (const SensorIdNumerical& rhs) const {
return (data.raw[0] == rhs.data.raw[0]) &&
(data.raw[1] == rhs.data.raw[1]);}
inline bool operator < (const SensorIdNumerical& rhs) const {
if (data.raw[0] == rhs.data.raw[0])
return data.raw[1] < rhs.data.raw[1];
else
return data.raw[0] < rhs.data.raw[0];
}
inline bool operator <= (const SensorIdNumerical& rhs) const {
if (data.raw[0] == rhs.data.raw[0])
return data.raw[1] <= rhs.data.raw[1];
else
return data.raw[0] < rhs.data.raw[0];
}
inline SensorIdNumerical operator & (const SensorIdNumerical& rhs) const {
SensorIdNumerical sid;
sid.data.raw[0] = (data.raw[0] & rhs.data.raw[0]);
sid.data.raw[1] = (data.raw[1] & rhs.data.raw[1]);
return sid;
}
SensorIdNumerical();
SensorIdNumerical(std::string mqttTopic);
virtual ~SensorIdNumerical();
};
#pragma pack(pop)
} /* End of namespace DCDB */
#endif /* DCDB_SENSORIDNUM_H */
......@@ -488,16 +488,16 @@ bool ConnectionImpl::initSchema() {
if (!existsColumnFamily(CF_SENSORDATA)) {
std::cout << "Creating Column Family " CF_SENSORDATA "...\n";
createColumnFamily(CF_SENSORDATA,
"sid blob, ts bigint, value bigint",
"sid, ts",
"sid varchar, ws smallint, ts bigint, value bigint",
"sid, ws, ts",
"COMPACT STORAGE AND gc_grace_seconds = " SENSORDATA_GC_GRACE_SECONDS );
}