Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 28944306 authored by Alessio Netti's avatar Alessio Netti
Browse files

CollectAgent: added "hosts" REST API command to fetch list of connected MQTT clients

parent 215fda96
......@@ -35,12 +35,15 @@
CARestAPI::CARestAPI(serverSettings_t settings,
SensorCache* sensorCache,
AnalyticsController* analyticsController) :
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer) :
RESTHttpsServer(settings),
_sensorCache(sensorCache),
_analyticsController(analyticsController) {
_analyticsController(analyticsController),
_mqttServer(mqttServer) {
addEndpoint("/help", {http::verb::get, stdBind(GET_help)});
addEndpoint("/hosts", {http::verb::get, stdBind(GET_hosts)});
addEndpoint("/average", {http::verb::get, stdBind(GET_average)});
addEndpoint("/quit", {http::verb::put, stdBind(PUT_quit)});
......@@ -57,6 +60,22 @@ void CARestAPI::GET_help(endpointArgs) {
res.result(http::status::ok);
}
void CARestAPI::GET_hosts(endpointArgs) {
if (!_mqttServer) {
res.body() = "The MQTT server is not initialized!";
res.result(http::status::internal_server_error);
return;
}
std::ostringstream data;
data << "address,clientID,lastSeen" << std::endl;
std::vector<hostInfo_t> hostsVec = _mqttServer->collectLastSeen();
for(auto &el : hostsVec) {
data << el.address << "," << el.clientId << "," << std::to_string(el.lastSeen) << std::endl;
}
res.body() = data.str();
res.result(http::status::ok);
}
void CARestAPI::GET_average(endpointArgs) {
const std::string sensor = getQuery("sensor", queries);
const std::string interval = getQuery("interval", queries);
......
......@@ -33,6 +33,7 @@
#include "analyticscontroller.h"
#include "mqttchecker.h"
#include "configuration.h"
#include "simplemqttserver.h"
#include <signal.h>
/**
......@@ -44,7 +45,8 @@ class CARestAPI : public RESTHttpsServer {
public:
CARestAPI(serverSettings_t settings,
SensorCache* sensorCache,
AnalyticsController* analyticsController);
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer);
virtual ~CARestAPI() {}
......@@ -53,6 +55,8 @@ public:
" -GET: /help This help message.\n"
" /analytics/help\n"
" An help message for data analytics commands.\n"
" /hosts\n"
" Prints the list of connected hosts.\n"
" /average?sensor;[interval]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
......@@ -76,6 +80,18 @@ private:
*/
void GET_help(endpointArgs);
/**
* GET "/hosts"
*
* @brief Returns a CSV list of connected hosts and their "last seen" timestamps.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
*/
void GET_hosts(endpointArgs);
/**
* GET "/average"
*
......@@ -167,6 +183,7 @@ private:
SensorCache* _sensorCache;
AnalyticsController* _analyticsController;
SimpleMQTTServer* _mqttServer;
};
#endif /* COLLECTAGENT_CARESTAPI_H_ */
......@@ -833,7 +833,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API
*/
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController, &ms);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
......
......@@ -184,6 +184,15 @@ void SimpleMQTTServer::init(string addr, string port)
initSockets();
}
std::vector<hostInfo_t> SimpleMQTTServer::collectLastSeen() {
std::vector<hostInfo_t> hosts;
for(auto &t : acceptThreads) {
std::vector<hostInfo_t> tempHosts = t.collectLastSeen();
hosts.insert(hosts.end(), tempHosts.begin(), tempHosts.end());
}
return hosts;
}
SimpleMQTTServer::SimpleMQTTServer()
{
/*
......
......@@ -140,6 +140,7 @@ public:
void start();
void stop();
void setMessageCallback(SimpleMQTTMessageCallback callback);
std::vector<hostInfo_t> collectLastSeen();
SimpleMQTTServer();
SimpleMQTTServer(std::string addr, std::string port, uint64_t maxThreads=128, uint64_t maxConnPerThread=16);
......
......@@ -201,6 +201,28 @@ ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
payloadLength = remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
break;
}
case MQTT_CONNECT: {
char* data = (char*) remainingRaw;
// First 10 bytes compose the CONNECT message's variable header
data+= 10;
// Message is malformed, break out
if(remainingLength < 12) {
break;
}
ssize_t idLen = ntohs(((uint16_t*) data)[0]);
data+= 2;
// Leveraging the topic field to store also the client ID on CONNECT messages
topic = string(data, idLen);
data+= idLen;
// We store the rest of the CONNECT payload in its raw form
payloadPtr = (void*) data;
payloadLength = remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
break;
}
case MQTT_PUBREL: {
msgId = ntohs(((uint16_t*) remainingRaw)[0]);
break;
......
......@@ -165,7 +165,9 @@ void SimpleMQTTServerAcceptThread::run()
cout << "Thread (" << this << ") waiting in accept()...\n";
coutMtx.unlock();
#endif
newsock = accept(socket, NULL, 0);
struct sockaddr_in addr;
socklen_t socklen = sizeof(addr);
newsock = accept(socket, (struct sockaddr*)&addr, &socklen);
if (newsock != -1) {
int opt = fcntl(newsock, F_GETFL, 0);
if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
......@@ -184,7 +186,7 @@ void SimpleMQTTServerAcceptThread::run()
if (messageThreads.size() < this->_maxThreads) {
// Spawning new threads, if not exceeding maximum thread pool size
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback, this->_maxConnPerThread));
messageThreads.back()->queueConnection(newsock);
messageThreads.back()->queueConnection(newsock, std::string(inet_ntoa(addr.sin_addr)) + ":" + std::to_string(addr.sin_port));
}
else {
// If thread pool is full, we cycle through it to find any available threads to connect to
......@@ -192,7 +194,7 @@ void SimpleMQTTServerAcceptThread::run()
do {
// Rotating the thread queue to ensure round-robin scheduling
threadCtr = (threadCtr + 1) % messageThreads.size();
} while(ctr++ < messageThreads.size() && messageThreads[threadCtr]->queueConnection(newsock));
} while(ctr++ < messageThreads.size() && messageThreads[threadCtr]->queueConnection(newsock, inet_ntoa(addr.sin_addr)));
if(ctr > messageThreads.size()) {
LOG(warning) << "Socket " << socket << " cannot accept more connections.";
......@@ -232,6 +234,21 @@ void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback
}
}
std::vector<hostInfo_t> SimpleMQTTServerAcceptThread::collectLastSeen() {
std::vector<hostInfo_t> hosts;
for(const auto &m : messageThreads) {
hostInfo_t *lastSeenVec = m->getLastSeen();
if(lastSeenVec) {
for(size_t idx=0; idx<this->_maxConnPerThread; idx++) {
if(lastSeenVec[idx].lastSeen != 0) {
hosts.push_back(lastSeenVec[idx]);
}
}
}
}
return hosts;
}
SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback, uint64_t maxThreads, uint64_t maxConnPerThread)
{
/*
......@@ -362,6 +379,8 @@ void SimpleMQTTServerMessageThread::run()
if (rbytes == 0) {
releaseConnection(connectionId);
}
lastSeen[connectionId].lastSeen = getTimestamp();
while (lbytes > 0) {
/*
......@@ -404,6 +423,7 @@ void SimpleMQTTServerMessageThread::run()
#endif
switch(msg[connectionId]->getType()) {
case MQTT_CONNECT: {
lastSeen[connectionId].clientId = msg[connectionId]->getTopic();
sendAck(connectionId);
break;
}
......@@ -446,7 +466,7 @@ void SimpleMQTTServerMessageThread::run()
}
}
int SimpleMQTTServerMessageThread::queueConnection(int newsock) {
int SimpleMQTTServerMessageThread::queueConnection(int newsock, std::string addr) {
if (numConnections >= this->_maxConnPerThread) {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
......@@ -471,7 +491,8 @@ int SimpleMQTTServerMessageThread::queueConnection(int newsock) {
cout << "Queued new connection (" << this << ", " << newsock << ", " << fdQueueWritePos << ")...\n";
coutMtx.unlock();
#endif
fdQueue[fdQueueWritePos] = newsock;
fdQueue[fdQueueWritePos].first = newsock;
fdQueue[fdQueueWritePos].second = addr;
fdQueueWritePos = nextWritePos;
return 0;
}
......@@ -491,7 +512,9 @@ void SimpleMQTTServerMessageThread::assignConnections()
if (fds[i].fd == -1) {
fds[i].events = POLLIN | POLLPRI | POLLHUP;
fds[i].revents = 0;
fds[i].fd = fdQueue[fdQueueReadPos];
fds[i].fd = fdQueue[fdQueueReadPos].first;
lastSeen[i].lastSeen = getTimestamp();
lastSeen[i].address = fdQueue[fdQueueReadPos].second;
numConnections++;
fdQueueReadPos = (fdQueueReadPos + 1) % SimpleMQTTConnectionsQueueLength;
......@@ -516,6 +539,10 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
fds[connectionId].fd = -1;
fds[connectionId].events = 0;
fds[connectionId].revents = 0;
lastSeen[connectionId].lastSeen = 0;
lastSeen[connectionId].address = "";
lastSeen[connectionId].clientId = "";
if (msg[connectionId]) {
delete msg[connectionId];
msg[connectionId] = NULL;
......@@ -564,11 +591,19 @@ SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCa
*/
numConnections = 0;
messageCallback = callback;
// Initializing the list of address-value pairs for connected hosts
this->lastSeen = new hostInfo_t[this->_maxConnPerThread];
for(size_t idx=0; idx<this->_maxConnPerThread; idx++) {
this->lastSeen[idx].lastSeen = 0;
this->lastSeen[idx].address = "";
this->lastSeen[idx].clientId = "";
}
/*
* Initialize the fd queue for new connections
*/
fdQueue = new(int[SimpleMQTTConnectionsQueueLength]);
fdQueue = new(std::pair<int, std::string>[SimpleMQTTConnectionsQueueLength]);
fdQueueReadPos = 0;
fdQueueWritePos = 0;
......@@ -598,5 +633,6 @@ SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
delete msg[i];
delete[] msg;
delete[] fds;
delete[] lastSeen;
}
......@@ -29,8 +29,15 @@
#define SIMPLEMQTTSERVERTHREAD_H_
#include "logging.h"
#include "timestamp.h"
#include <vector>
typedef struct {
uint64_t lastSeen;
std::string address;
std::string clientId;
} hostInfo_t;
/**
* @brief Simple MQTT server thread.
*
......@@ -68,7 +75,8 @@ protected:
unsigned numConnections;
struct pollfd* fds;
SimpleMQTTMessage** msg;
int *fdQueue;
std::pair<int, std::string> *fdQueue;
hostInfo_t *lastSeen;
int fdQueueReadPos;
int fdQueueWritePos;
SimpleMQTTMessageCallback messageCallback;
......@@ -77,11 +85,12 @@ protected:
int sendAck(int connectionId);
public:
int queueConnection(int newsock);
int queueConnection(int newsock, std::string addr);
void assignConnections();
void releaseConnection(int connectionId);
bool hasCapacity();
void setMessageCallback(SimpleMQTTMessageCallback callback);
hostInfo_t *getLastSeen() { return lastSeen; }
SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback, uint64_t maxConnPerThread=16);
virtual
......@@ -107,10 +116,10 @@ protected:
public:
void setMessageCallback(SimpleMQTTMessageCallback callback);
std::vector<hostInfo_t> collectLastSeen();
SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback, uint64_t maxThreads=128, uint64_t maxConnPerThread=16);
virtual
~SimpleMQTTServerAcceptThread();
virtual~SimpleMQTTServerAcceptThread();
};
#endif /* SIMPLEMQTTSERVERTHREAD_H_ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment