Commit ee0e92b9 authored by Alessio Netti's avatar Alessio Netti
Browse files

Optimization

- Round-robin thread-to-connection assignment policy (speedup on
small instantiations)
- Sensor cache entries are pre-allocated to avoid reallocations, and
shrunk to the correct size later
- Avoiding heap allocations during message processing and recycling buffers
parent 6646c409
......@@ -137,7 +137,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
// We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
// sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
// We use strncmp as it is the most efficient way to do it
if (strlen(topic) > DCDB_MAP_LEN && strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
if ((len = msg->getPayloadLength()) == 0) {
LOG(error) << "Empty topic-to-name mapping message received";
return 1;
......
......@@ -31,6 +31,7 @@ void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
sensorReading_t s = { val, ts };
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
//TODO: Check for thread-safety
if(sensorCache.find(sid) == sensorCache.end())
sensorCache[sid] = CacheEntry(_maxHistory);
sensorCache[sid].store(s);
......@@ -81,7 +82,7 @@ int64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
sensorCache_t::iterator mostRecentSidIt = sensorCache.end();
bool foundOne = false;
// TODO: check also this
// TODO: Remote sensorIDs and switch to simple topics
/* Iterate over the cache until the current entry is > sidHi */
while ((it != sensorCache.end()) && (it->first <= sidHi)) {
if ((it->first & sidMask) == sidLow) {
......@@ -134,13 +135,15 @@ CacheEntry::CacheEntry(uint64_t maxHistory) {
_maxHistory = maxHistory;
_stable = false;
_cacheIndex = 0;
//We pre-allocate the cache to a initial guess of 600 elements - 10 minutes at 1s sampling period
_cache.reserve(600);
}
CacheEntry::~CacheEntry() {
_cache.clear();
}
const std::vector<sensorReading_t>& CacheEntry::getCache() { return _cache; }
std::vector<sensorReading_t>& CacheEntry::getCache() { return _cache; }
sensorReading_t CacheEntry::getLatest() { return _cache[_cacheIndex]; }
......@@ -158,8 +161,11 @@ void CacheEntry::store(sensorReading_t reading) {
_cacheIndex = _stable ? (_cacheIndex + 1) % _cache.size() : (_cacheIndex + 1);
if(!_stable) {
_cache.push_back(reading);
if(_cache.front().timestamp + _maxHistory <= reading.timestamp)
if(_cache.front().timestamp + _maxHistory <= reading.timestamp) {
_stable = true;
//We shrink the cache capacity, if necessary, to its actual size
_cache.shrink_to_fit();
}
} else
_cache[_cacheIndex] = reading;
}
......@@ -193,9 +199,12 @@ int64_t CacheEntry::getAverage(uint64_t avg) {
TimeStamp ts;
if (_cache.size() > 0) {
// TODO: check this
if (ts.getRaw() - getOldest().timestamp < avg * NS_PER_S) {
throw std::out_of_range("Sid outdated");
throw std::out_of_range("Not enough data");
return 0;
}
else if (ts.getRaw() - getLatest().timestamp > avg * NS_PER_S) {
throw std::out_of_range("SID outdated");
return 0;
}
......@@ -225,7 +234,6 @@ int64_t CacheEntry::getAverage(uint64_t avg) {
}
int64_t CacheEntry::searchTimestamp(uint64_t t, bool relative) {
//TODO: Check for thread-safety
// Cache is empty or has only one element
if(_cache.size()<2)
return -1;
......
......@@ -32,7 +32,7 @@ typedef struct {
/**
* @brief Returns a constant reference to the internal cache vector.
**/
const std::vector<sensorReading_t>& getCache();
std::vector<sensorReading_t>& getCache();
/**
* @brief Stores a sensor reading in the cache.
......
......@@ -44,6 +44,7 @@ SimpleMQTTMessage::SimpleMQTTMessage()
bytesProcessed = 0;
remainingRaw = NULL;
remainingLength = 0;
bufferLength = 0;
fixedHeaderLength = 0;
msgId = 0;
payloadLength = 0;
......@@ -59,6 +60,17 @@ SimpleMQTTMessage::~SimpleMQTTMessage()
free(remainingRaw);
}
void SimpleMQTTMessage::clear() {
//We reset all variables except for the internal buffer, which is recycled
state = Empty;
bytesProcessed = 0;
remainingLength = 0;
fixedHeaderLength = 0;
msgId = 0;
payloadLength = 0;
payloadPtr = NULL;
}
ssize_t SimpleMQTTMessage::decodeFixedHeader(void* buf, size_t len)
{
/*
......@@ -139,8 +151,10 @@ ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
* If we are in this function for the first time,
* we need to allocate the buffer.
*/
if (!remainingRaw) {
if (!remainingRaw || remainingLength > bufferLength) {
if(remainingRaw) free(remainingRaw);
remainingRaw = malloc(remainingLength);
bufferLength = remainingLength;
if (!remainingRaw) {
throw new boost::system::system_error(errno, boost::system::system_category(), "Error in SimpleMQTTMessage::receiveMessage().");
}
......
......@@ -80,6 +80,7 @@ protected:
uint16_t msgId;
std::string topic;
void *remainingRaw;
size_t bufferLength;
size_t payloadLength;
void *payloadPtr;
......@@ -101,6 +102,7 @@ public:
SimpleMQTTMessage();
virtual ~SimpleMQTTMessage();
void clear();
};
#endif /* SIMPLEMQTTMESSAGE_H_ */
......@@ -176,41 +176,36 @@ void SimpleMQTTServerAcceptThread::run()
close(newsock);
} else {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Successfully set socket " << newsock << " non-blocking in thread (" << this << ")...\n";
coutMtx.unlock();
coutMtx.lock();
cout << "Successfully set socket " << newsock << " non-blocking in thread (" << this << ")...\n";
coutMtx.unlock();
#endif
/*
* Find a free message thread to take over
* the next incoming connection.
*/
boost::ptr_list<SimpleMQTTServerMessageThread>::iterator mt = messageThreads.begin();
while ((mt != messageThreads.end()) && mt->queueConnection(newsock)) {
mt++;
}
if (mt == messageThreads.end()) {
/*
* In case no free message thread was found,
* try to create a new one as long as we do
* not exceed the maximum.
*/
if (messageThreads.size() >= this->_maxThreads) {
LOG(warning) << "Socket " << socket << " cannot accept more connections.";
// FIXME: There must be nicer ways to handle such situations...
close(newsock);
break;
if (messageThreads.size() < this->_maxThreads) {
// Spawning new threads, if not exceeding maximum thread pool size
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback, this->_maxConnPerThread));
messageThreads.back()->queueConnection(newsock);
}
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback, this->_maxConnPerThread));
messageThreads.back().queueConnection(newsock);
}
else {
// If thread pool is full, we cycle through it to find any available threads to connect to
unsigned int ctr=0;
do {
// Rotating the thread queue to ensure round-robin scheduling
threadCtr = (threadCtr + 1) % messageThreads.size();
} while(ctr++ < messageThreads.size() && messageThreads[threadCtr]->queueConnection(newsock));
if(ctr > messageThreads.size()) {
LOG(warning) << "Socket " << socket << " cannot accept more connections.";
// FIXME: There must be nicer ways to handle such situations...
close(newsock);
}
#ifdef SimpleMQTTVerbose
else {
coutMtx.lock();
cout << "Found a message thread with capacity: " << &*mt << "\n";
coutMtx.unlock();
}
else {
coutMtx.lock();
cout << "Found a message thread with capacity: " << messageThreads.front() << "\n";
coutMtx.unlock();
}
#endif
}
}
}
#ifdef SimpleMQTTVerbose
......@@ -231,8 +226,8 @@ void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback
* MQTT message and propagate to all message threads.
*/
messageCallback = callback;
for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
(*i).setMessageCallback(callback);
for (std::vector<SimpleMQTTServerMessageThread*>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
(*i)->setMessageCallback(callback);
}
}
......@@ -245,6 +240,7 @@ SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, Simpl
this->_maxConnPerThread = maxConnPerThread;
socket = listenSock;
messageCallback = callback;
threadCtr = 0;
/*
* Release the lock to indicate that the constructor has
......@@ -253,8 +249,16 @@ SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, Simpl
launchMtx.unlock();
}
SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread() {
terminate = true;
cleanupMtx.lock();
cleanupMtx.unlock();
//De-allocating and destroying running message threads
for(auto t : messageThreads)
delete t;
messageThreads.clear();
}
int SimpleMQTTServerMessageThread::sendAck(int connectionId) {
......@@ -430,8 +434,7 @@ void SimpleMQTTServerMessageThread::run()
break;
}
}
delete msg[connectionId];
msg[connectionId] = NULL;
msg[connectionId]->clear();
}
}
}
......@@ -584,11 +587,14 @@ SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
* order to avoid fdsMtx to be accessed after destruction.
*/
terminate = true;
delete[] fdQueue;
delete[] msg;
delete[] fds;
cleanupMtx.lock();
cleanupMtx.unlock();
delete[] fdQueue;
for(unsigned i=0; i<_maxConnPerThread; i++)
delete msg[i];
delete[] msg;
delete[] fds;
}
......@@ -28,6 +28,7 @@
#define SIMPLEMQTTSERVERTHREAD_H_
#include "logging.h"
#include <vector>
class SimpleMQTTServerThread
{
......@@ -82,7 +83,8 @@ protected:
uint64_t _maxThreads;
uint64_t _maxConnPerThread;
int socket;
boost::ptr_list<SimpleMQTTServerMessageThread> messageThreads;
unsigned int threadCtr;
std::vector<SimpleMQTTServerMessageThread*> messageThreads;
SimpleMQTTMessageCallback messageCallback;
void run();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment