Commit 44ca1b2e authored by Daniele Tafani's avatar Daniele Tafani

Stable version of Grafana Server (Works with Grafana 6.2.2)

parent 52eaf253
include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/cxxopts/src -DASIO_HEADER_ONLY -DBOOST_TEST_DYN_LINK
OBJS = collectagent.o \
sensorcache.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../common/include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
OBJS = ../common/src/logging.o \
sensorcache.o \
collectagent.o \
configuration.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -L../lib -ldcdb -pthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time -lboost_log_setup -lboost_log -lboost_regex -lcppnetlib-server-parsers -lcppnetlib-uri
TARGET = collectagent
.PHONY : clean install
......@@ -25,5 +27,8 @@ clean:
rm -f $(TARGET)
rm -f $(OBJS)
install: $(TARGET)
install_conf: config/collectagent.conf
install -m 644 $^ $(DCDBDEPLOYPATH)/etc/
install: $(TARGET) install_conf
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
This diff is collapsed.
......@@ -6,118 +6,51 @@
*/
#include "sensorcache.h"
#include <dcdb/timestamp.h>
#include <exception>
#include <iostream>
#include <iomanip>
#include <algorithm>
namespace DCDB {
SensorCache::SensorCache() {
// TODO Auto-generated constructor stub
SensorCache::SensorCache(uint64_t maxHistory) {
this->_maxHistory = maxHistory;
}
SensorCache::~SensorCache() {
// TODO Auto-generated destructor stub
sensorCache.clear();
}
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
sensorReading_t s = { val, ts };
const sensorCache_t& SensorCache::getSensorMap() {
return sensorCache;
}
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
reading_t s = { val, ts };
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid);
if (it != sensorCache.end()) {
if (it->second.size() && (it->second.front().timestamp+MAX_HISTORY_NS <= ts)) {
it->second.pop_front();
}
it->second.push_back(s);
} else {
sensorCache[sid].push_back(s);
}
}
bool SensorCache::checkValid(cacheEntry_t &entry) {
if (entry.size() > 2) {
TimeStamp ts;
uint64_t avg = 0;
cacheEntry_t::iterator it = entry.begin();
uint64_t prev = it->timestamp;
for (it++; it != entry.end(); it++) {
avg+= it->timestamp - prev;
prev = it->timestamp;
}
avg/= (entry.size()-1);
/*
* A SID is outdated if it's older than 5x the average sampling period.
*/
if ((ts.getRaw() - entry.back().timestamp) > 5 * avg) {
return false;
}
}
return true;
//TODO: Check for thread-safety
if(sensorCache.find(sid) == sensorCache.end())
sensorCache[sid] = CacheEntry(_maxHistory);
sensorCache[sid].store(s);
}
int64_t SensorCache::getAverage(cacheEntry_t &entry, uint64_t avg) {
TimeStamp ts;
if (entry.size() > 0) {
if (ts.getRaw() - entry.front().timestamp < avg * NS_PER_S) {
throw std::out_of_range("Sid outdated");
return 0;
}
double sum = 0;
cacheEntry_t::reverse_iterator it, prev;
it = prev = entry.rbegin();
it++;
while ((it != entry.rend()) && ((ts.getRaw() - it->timestamp) <= avg * NS_PER_S)) {
uint64_t deltaT = (prev->timestamp - it->timestamp);
sum+= ((it->val + prev->val) / 2) * deltaT;
//std::cout << "SensorCache::getAverage sum=" << sum << " deltaT=" <<deltaT << " it=(" << it->timestamp << "," <<it->val <<") prev=(" << prev->timestamp << "," << prev->val <<") " << (ts.getRaw() - it->timestamp) << std::endl;
prev = it++;
}
//std::cout << "SensorCache::getAverage (" << prev->timestamp << "," <<prev->val <<") (" << entry.back().timestamp << "," << entry.back().val << ") sum=" << sum << " deltaT=" << entry.back().timestamp - prev->timestamp << std::endl;
if (prev == entry.rbegin()) {
return entry.back().val;
} else {
return sum/(entry.back().timestamp - prev->timestamp);
}
}
throw std::invalid_argument("Sid not found");
return 0;
}
uint64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid);
if (it == sensorCache.end()) {
throw std::invalid_argument("Sid not found");
return 0;
}
if (!checkValid(it->second))
if (!it->second.checkValid())
{
throw std::out_of_range("Sid outdated");
return 0;
}
if (avg) {
return getAverage(it->second, avg);
return it->second.getAverage(avg);
} else {
return it->second.back().val;
return it->second.getLatest().value;
}
}
uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
int64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
topic.erase(std::remove(topic.begin(), topic.end(), '/'), topic.end());
size_t wp = topic.find("*");
......@@ -125,7 +58,7 @@ uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
return getSensor(DCDB::SensorId(topic), avg);
}
int wl = 33 - topic.length();
int wl = 29 - topic.length();
/* Create SensorIds with the lowest and highest values matching the wildcard */
DCDB::SensorId sidLow(std::string(topic).replace(wp, 1, std::string(wl, '0')));
......@@ -146,7 +79,7 @@ uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
/* We only return one value, even if multiple SensorIds would match.
* At least make sure it's the most recent value
*/
if (checkValid(it->second) && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.back().timestamp < it->second.back().timestamp)) {
if (it->second.checkValid() && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.getLatest().timestamp < it->second.getLatest().timestamp)) {
mostRecentSidIt = it;
}
}
......@@ -163,9 +96,9 @@ uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
}
if (avg) {
return getAverage(mostRecentSidIt->second, avg);
return mostRecentSidIt->second.getAverage(avg);
} else {
return mostRecentSidIt->second.back().val;
return mostRecentSidIt->second.getLatest().value;
}
}
......@@ -173,16 +106,26 @@ void SensorCache::dump() {
std::cout << "SensorCache Dump:" << std::endl;
for (sensorCache_t::iterator sit = sensorCache.begin(); sit != sensorCache.end(); sit++) {
std::cout << " id=" << sit->first.toString() << " data=[";
for (cacheEntry_t::iterator eit = sit->second.begin(); eit != sit->second.end(); eit++) {
if (eit != sit->second.begin()) {
for (std::vector<reading_t>::const_iterator eit = sit->second.getRaw()->begin(); eit != sit->second.getRaw()->end(); eit++) {
if (eit != sit->second.getRaw()->begin()) {
std::cout << ",";
}
std::cout << "(" << eit->val << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
std::cout << "(" << eit->value << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
}
std::cout << "]" << std::endl;
}
}
} /* namespace DCDB */
uint64_t SensorCache::clean(uint64_t t) {
uint64_t thresh = getTimestamp() - t;
uint64_t ctr = 0;
for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
uint64_t latestTs = it->second.getLatest().timestamp;
if (latestTs!=0 && latestTs < thresh) {
it = sensorCache.erase(it);
ctr++;
} else
++it;
}
return ctr;
}
......@@ -9,65 +9,96 @@
#define COLLECTAGENT_SENSORCACHE_H_
#include <map>
#include <list>
#include <utility>
#include <dcdb/sensorid.h>
#include <dcdb/timestamp.h>
#include "cacheentry.h"
#define MAX_HISTORY_NS 60000000000 // Store max 60s of historic data
using namespace DCDB;
namespace DCDB {
typedef struct {
int64_t val;
uint64_t timestamp;
} sensorReading_t;
typedef std::list<sensorReading_t> cacheEntry_t;
typedef std::map<SensorId, cacheEntry_t> sensorCache_t;
typedef std::map<SensorId, CacheEntry> sensorCache_t;
class SensorCache {
public:
SensorCache();
SensorCache(uint64_t maxHistory=60000000000);
virtual ~SensorCache();
/**
* @brief Store a sensor reading in the SensorCache.
* @param sid The SensorId of the sensor to be cached.
* @param ts The timestamp of the sensor reading.
* @param val The actual sensor reading.
* @return Returns true if the topic string was valid and the data field of the object was populated.
*/
* @brief Returns a constant reference to the internal sensor cache map.
**/
const sensorCache_t& getSensorMap();
/**
* @brief Store a sensor reading in the SensorCache.
*
* @param sid The SensorId of the sensor to be cached.
* @param ts The timestamp of the sensor reading.
* @param val The actual sensor reading.
* @return Returns true if the topic string was valid and the data field of the object was populated.
**/
void storeSensor(SensorId sid, uint64_t ts, int64_t val);
/**
* @brief Return a sensor reading from the SensorCache.
* @param sid The SensorId of the sensor to be looked up in the cache.
* @return The sensor reading of the corresponding cache entry.
* @throws std::invalid_argument if the SensorId doesn't exist in the SensorCache.
* @throws std::out_of_range if the sid was found in the cache entry but is outdated.
*/
uint64_t getSensor(SensorId sid, uint64_t avg=0);
/**
* @brief Return a sensor reading from the SensorCache.
* @param topic The topic of the sensor to be looked up in the cache. May contain wildcards.
* @return The sensor reading of the corresponding cache entry.
* @throws std::invalid_argument if the topic couldn't be found in the SensorCache.
* @throws std::out_of_range if the topic was found in the cache entry but is outdated.
*/
uint64_t getSensor(std::string topic, uint64_t avg=0);
/**
* @brief Dump the contents of the SensorCache to stdout.
*/
/**
* @brief Return a sensor reading or the average of the last readings
* from the SensorCache.
*
* @param sid The SensorId of the sensor to be looked up in the cache.
* @param avg If avg > 0: denotes the length of the average aggregation window in nanoseconds.
* @return If avg == 0 :The sensor reading of the corresponding cache entry.
* If avg > 0 the average of the last readings is returned.
* @throws std::invalid_argument if the SensorId doesn't exist in the SensorCache.
* @throws std::out_of_range if the sid was found in the cache entry but is outdated.
**/
int64_t getSensor(SensorId sid, uint64_t avg=0);
/**
* @brief Return a sensor reading or the average of the last readings
* from the SensorCache.
*
* @param topic The topic of the sensor to be looked up in the cache. May contain wildcards.
* @param avg If avg > 0: denotes the length of the average aggregation window in nanoseconds.
* @return If avg == 0 :The sensor reading of the corresponding cache entry.
* If avg > 0 the average of the last readings is returned.
* @throws std::invalid_argument if the topic couldn't be found in the SensorCache.
* @throws std::out_of_range if the topic was found in the cache entry but is outdated.
**/
int64_t getSensor(std::string topic, uint64_t avg=0);
/**
* @brief Dump the contents of the SensorCache to stdout.
**/
void dump();
/**
* @brief Removes all obsolete entries from the cache
*
* All entries in the cache whose latest sensor reading is older than "now" - t nanoseconds are
* removed.
*
* @param t The threshold in nanoseconds for entries that must be removed
* @return The number of purged cache entries
**/
uint64_t clean(uint64_t t);
/**
* @brief Set a new maximum cache length.
*
* @param maxHistory: new sensor cache length value.
**/
void setMaxHistory(uint64_t maxHistory) { this->_maxHistory = maxHistory; }
/**
* @brief Returns the current maximum sensor cache length
*
* @return Current maximum sensor cache length
*/
uint64_t getMaxHistory() { return this->_maxHistory; }
private:
bool checkValid(cacheEntry_t &entry);
int64_t getAverage(cacheEntry_t &entry, uint64_t avg);
// Map containing the single sensor caches
sensorCache_t sensorCache;
// Global maximum allowed time frame for the sensor caches
uint64_t _maxHistory;
};
} /* namespace DCDB */
#endif /* COLLECTAGENT_SENSORCACHE_H_ */
......@@ -104,12 +104,12 @@ void SimpleMQTTServer::initSockets(void)
* Bind and listen on socket.
*/
if (::bind(sock, ainfo_cur->ai_addr, ainfo_cur->ai_addrlen) == -1) {
cout << "Warning: could not bind to socket, ignoring socket.\n";
LOG(warning) << "Could not bind to socket, ignoring socket.";
close(sock);
continue;
}
if (listen(sock, SimpleMQTTMaxBacklog) == -1) {
cout << "Warning: could not listen on socket, ignoring socket.\n";
LOG(warning) << "Could not listen on socket, ignoring socket.";
close(sock);
continue;
}
......@@ -137,7 +137,7 @@ void SimpleMQTTServer::start()
* Start one accept thread per socket.
*/
for (unsigned int i=0; i<listenSockets.size(); i++)
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback));
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback, this->_maxThreads, this->_maxConnPerThread));
}
void SimpleMQTTServer::stop()
......@@ -191,11 +191,13 @@ SimpleMQTTServer::SimpleMQTTServer()
init("localhost", "1883");
}
SimpleMQTTServer::SimpleMQTTServer(std::string addr, std::string port)
SimpleMQTTServer::SimpleMQTTServer(std::string addr, std::string port, uint64_t maxThreads, uint64_t maxConnPerThread)
{
/*
* Initialize server to listen on specified address and port.
*/
this->_maxThreads = maxThreads;
this->_maxConnPerThread = maxConnPerThread;
init(addr, port);
}
......
......@@ -42,30 +42,17 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include "logging.h"
#ifndef SIMPLEMQTTSERVER_H_
#define SIMPLEMQTTSERVER_H_
//#define SimpleMQTTVerbose 100
/*
* Define the number of connections that are handled by each thread.
* If the number of clients exceeds the specified value, a new thread will be spawned.
*/
#ifndef SimpleMQTTConnectionsPerThread
#define SimpleMQTTConnectionsPerThread 16
#endif
#ifndef SimpleMQTTConnectionsQueueLength
#define SimpleMQTTConnectionsQueueLength 4
#endif
/*
* Define the maximum number of threads that will be spawned per socket.
*/
#ifndef SimpleMQTTMaxThreadsPerSocket
#define SimpleMQTTMaxThreadsPerSocket 128
#endif
/*
* Define the maximum backlog size for listen().
*/
......@@ -114,12 +101,16 @@ typedef int (*SimpleMQTTMessageCallback)(SimpleMQTTMessage*);
class SimpleMQTTServer
{
protected:
uint64_t _maxThreads;
uint64_t _maxConnPerThread;
std::string listenAddress;
std::string listenPort;
boost::ptr_vector<int> listenSockets;
boost::ptr_list<SimpleMQTTServerAcceptThread> acceptThreads;
SimpleMQTTMessageCallback messageCallback;
logger_t lg;
void init(std::string addr, std::string port);
void initSockets(void);
......@@ -129,7 +120,7 @@ public:
void setMessageCallback(SimpleMQTTMessageCallback callback);
SimpleMQTTServer();
SimpleMQTTServer(std::string addr, std::string port);
SimpleMQTTServer(std::string addr, std::string port, uint64_t maxThreads=128, uint64_t maxConnPerThread=16);
virtual
~SimpleMQTTServer();
};
......
......@@ -44,6 +44,7 @@ SimpleMQTTMessage::SimpleMQTTMessage()
bytesProcessed = 0;
remainingRaw = NULL;
remainingLength = 0;
bufferLength = 0;
fixedHeaderLength = 0;
msgId = 0;
payloadLength = 0;
......@@ -59,6 +60,17 @@ SimpleMQTTMessage::~SimpleMQTTMessage()
free(remainingRaw);
}
void SimpleMQTTMessage::clear() {
//We reset all variables except for the internal buffer, which is recycled
state = Empty;
bytesProcessed = 0;
remainingLength = 0;
fixedHeaderLength = 0;
msgId = 0;
payloadLength = 0;
payloadPtr = NULL;
}
ssize_t SimpleMQTTMessage::decodeFixedHeader(void* buf, size_t len)
{
/*
......@@ -139,8 +151,10 @@ ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
* If we are in this function for the first time,
* we need to allocate the buffer.
*/
if (!remainingRaw) {
if (!remainingRaw || remainingLength > bufferLength) {
if(remainingRaw) free(remainingRaw);
remainingRaw = malloc(remainingLength);
bufferLength = remainingLength;
if (!remainingRaw) {
throw new boost::system::system_error(errno, boost::system::system_category(), "Error in SimpleMQTTMessage::receiveMessage().");
}
......@@ -171,7 +185,7 @@ ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
*/
ssize_t topicLen = ntohs(((uint16_t*) data)[0]);
data+= 2;
topic = string(data, topicLen);
data+= topicLen;
......
......@@ -43,6 +43,9 @@
#define MQTT_PINGRESP 0xd
#define MQTT_DISCONNECT 0xe
#define DCDB_MAP "/DCDB_MAP/"
#define DCDB_MAP_LEN 10
#pragma pack(push,1)
typedef union {
......@@ -77,6 +80,7 @@ protected:
uint16_t msgId;
std::string topic;
void *remainingRaw;
size_t bufferLength;
size_t payloadLength;
void *payloadPtr;
......@@ -98,6 +102,7 @@ public:
SimpleMQTTMessage();
virtual ~SimpleMQTTMessage();
void clear();
};
#endif /* SIMPLEMQTTMESSAGE_H_ */
This diff is collapsed.