16.12.2021, 09:00 - 11:00: Due to updates, GitLab may be unavailable for a few minutes between 09:00 and 11:00.

Commit dda07f02 authored by Michael Ott
Browse files

Merge branch 'development'

parents ab3f9666 923d8efd
......@@ -82,7 +82,7 @@ void CSOperator::execOnInit() {
_trainingUnit = _units[0]->getName();
_actualBlocks = _units[0]->getInputs().size() < _numBlocks ? _units[0]->getInputs().size() : _numBlocks;
if(_actualBlocks!=_numBlocks)
LOG(warning) << "Operator " << _name << ": cannot enforce " << _numBlocks << ", using " << _actualBlocks << " instead.";
LOG(warning) << "Operator " << _name << ": cannot enforce " << _numBlocks << " blocks, using " << _actualBlocks << " instead.";
} else {
_actualBlocks = _numBlocks;
}
......@@ -164,9 +164,9 @@ bool CSOperator::dumpToFile(std::string &path) {
// Saving CS data in terms of permutation index, minimum and maximum for each input sensor
for(size_t idx=0; idx<_permVector.size(); idx++) {
boost::property_tree::ptree group;
group.push_back(boost::property_tree::ptree::value_type("idx", std::to_string(_permVector[idx])));
group.push_back(boost::property_tree::ptree::value_type("min", std::to_string(_min[idx])));
group.push_back(boost::property_tree::ptree::value_type("max", std::to_string(_max[idx])));
group.push_back(boost::property_tree::ptree::value_type("idx", boost::property_tree::ptree(std::to_string(_permVector[idx]))));
group.push_back(boost::property_tree::ptree::value_type("min", boost::property_tree::ptree(std::to_string(_min[idx]))));
group.push_back(boost::property_tree::ptree::value_type("max", boost::property_tree::ptree(std::to_string(_max[idx]))));
blocks.add_child(std::to_string(idx), group);
}
root.add_child(std::to_string(_permVector.size()), blocks);
......@@ -324,7 +324,7 @@ void CSOperator::computeMinMax(std::vector<std::vector<reading_t>>& v) {
for (const auto &s : v[idx]) {
if (s.value > max)
max = s.value;
else if (s.value < min)
if (s.value < min)
min = s.value;
}
} else {
......
......@@ -200,10 +200,10 @@ bool MariaDB::getDBJobID(const std::string & job_id_string, std::string& job_db_
}
bool MariaDB::getCurrentSuffixAggregateTable(std::string & suffix){
if(_end_aggregate_timestamp){
auto now_uts = getTimestamp();
if(_end_aggregate_timestamp){
auto now_uts = getTimestamp();
if(now_uts < _end_aggregate_timestamp) { //suffix found, don't do anything
suffix = _current_table_suffix;
suffix = _current_table_suffix;
return true;
}
}
......@@ -212,7 +212,7 @@ bool MariaDB::getCurrentSuffixAggregateTable(std::string & suffix){
std::replace( date_time.begin(), date_time.end(), 'T', ' ');
std::stringstream build_query;
build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp <= \'";
build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
auto query = build_query.str();
LOG(debug) << query;
......
......@@ -26,6 +26,7 @@
//================================================================================
#include "CARestAPI.h"
#include <boost/beast/http/field.hpp>
#define stdBind(fun) std::bind(&CARestAPI::fun, \
this, \
......@@ -34,16 +35,27 @@
std::placeholders::_3)
CARestAPI::CARestAPI(serverSettings_t settings,
std::map<std::string, influx_t>& influxMap,
SensorCache* sensorCache,
AnalyticsController* analyticsController) :
SensorDataStore* sensorDataStore,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer) :
RESTHttpsServer(settings),
_influxMap(influxMap),
_sensorCache(sensorCache),
_analyticsController(analyticsController) {
_sensorDataStore(sensorDataStore),
_analyticsController(analyticsController),
_mqttServer(mqttServer) {
addEndpoint("/help", {http::verb::get, stdBind(GET_help)});
addEndpoint("/hosts", {http::verb::get, stdBind(GET_hosts)});
addEndpoint("/average", {http::verb::get, stdBind(GET_average)});
addEndpoint("/quit", {http::verb::put, stdBind(PUT_quit)});
addEndpoint("/ping", {http::verb::get, stdBind(GET_ping)});
addEndpoint("/query", {http::verb::post, stdBind(POST_query)});
addEndpoint("/write", {http::verb::post, stdBind(POST_write)});
_analyticsController->getManager()->addRestEndpoints(this);
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
......@@ -57,6 +69,22 @@ void CARestAPI::GET_help(endpointArgs) {
res.result(http::status::ok);
}
void CARestAPI::GET_hosts(endpointArgs) {
if (!_mqttServer) {
res.body() = "The MQTT server is not initialized!";
res.result(http::status::internal_server_error);
return;
}
std::ostringstream data;
data << "address,clientID,lastSeen" << std::endl;
std::map<std::string, hostInfo_t> hostsVec = _mqttServer->collectLastSeen();
for(auto &el : hostsVec) {
data << el.second.address << "," << el.second.clientId << "," << std::to_string(el.second.lastSeen) << std::endl;
}
res.body() = data.str();
res.result(http::status::ok);
}
void CARestAPI::GET_average(endpointArgs) {
const std::string sensor = getQuery("sensor", queries);
const std::string interval = getQuery("interval", queries);
......@@ -103,6 +131,98 @@ void CARestAPI::GET_average(endpointArgs) {
}
}
/**
 * GET "/ping"
 *
 * Always answers with status OK and an empty body.
 * NOTE(review): presumably mirrors the InfluxDB "/ping" health check so that
 * InfluxDB clients can talk to this server -- confirm.
 */
void CARestAPI::GET_ping(endpointArgs) {
    res.result(http::status::ok);
    res.body() = "";
}
/**
 * POST "/query"
 *
 * Does not evaluate the query; always answers with status OK and a fixed
 * JSON document describing a single, empty statement result.
 *
 * The body is announced as application/json, so it has to be well-formed
 * JSON: object keys must be quoted strings.
 */
void CARestAPI::POST_query(endpointArgs) {
    res.set(http::field::content_type, "application/json");
    res.body() = "{\"results\":[{\"statement_id\":0}]}";
    res.result(http::status::ok);
}
/**
 * POST "/write"
 *
 * Parses the request body line by line. Each line is expected to have the
 * form "measurement[,tag=value...] field=value[,field=value...] [timestamp]".
 * For every line whose measurement is configured in _influxMap:
 *   - the value of the configured tag is looked up (lines without it are
 *     skipped) and optionally filtered/rewritten via the configured regex,
 *   - every accepted field is converted to an unsigned 64-bit value and
 *     stored in the sensor cache and the sensor data store under the MQTT
 *     topic "<mqttPrefix>/<tag value>/<field name>".
 *
 * Lines or fields that cannot be processed are skipped individually; they do
 * not abort processing of the remaining request. The response is always
 * "no content" with an empty body.
 */
void CARestAPI::POST_write(endpointArgs) {
    // Both expressions are loop-invariant, so they are compiled once per request.
    // Splits a line into measurement, tags, fields, timestamp
    const boost::regex lineRegex("^([^,]*)(,[^ ]*)? ([^ ]*)( .*)?$", boost::regex::extended);
    // Splits comma-separated tags and fields into individual key=value entries
    const boost::regex keyValueRegex(",?([^,=]*)=([^,]*)", boost::regex::extended);

    std::istringstream body(req.body());
    std::string line;
    while (std::getline(body, line)) {
        boost::smatch lineMatch, kvMatch;
        if (!boost::regex_search(line, lineMatch, lineRegex)) {
            continue;
        }

        const std::string measurement = lineMatch[1].str();
        auto m = _influxMap.find(measurement);
        if (m == _influxMap.end()) {
            LOG(debug) << "influx: unknown measurement " << measurement;
            continue;
        }
        // Bind by reference: influx_t holds a regex and a set, no need to copy per line.
        const influx_t &influx = m->second;

        // Parse tags into a map
        std::map<std::string, std::string> tags;
        std::string tagList = lineMatch[2].str();
        while (boost::regex_search(tagList, kvMatch, keyValueRegex)) {
            tags[kvMatch[1].str()] = kvMatch[2].str();
            tagList = kvMatch.suffix().str();
        }

        auto t = tags.find(influx.tag);
        if (t == tags.end()) {
            // The configured tag is not present on this line
            continue;
        }

        std::string tagName = t->second;
        // Perform pattern filter or substitution via regex on tag
        if (!influx.tagRegex.empty()) {
            const std::string input(tagName);
            tagName.clear();
            boost::regex_replace(std::back_inserter(tagName), input.begin(), input.end(), influx.tagRegex, influx.tagSubstitution.c_str(), boost::regex_constants::format_sed | boost::regex_constants::format_no_copy);
            if (tagName.empty()) {
                // There was no match: skip this line only, keep processing the rest
                continue;
            }
        }

        // Parse fields into a map
        std::map<std::string, std::string> fields;
        std::string fieldList = lineMatch[3].str();
        while (boost::regex_search(fieldList, kvMatch, keyValueRegex)) {
            fields[kvMatch[1].str()] = kvMatch[2].str();
            fieldList = kvMatch.suffix().str();
        }

        DCDB::TimeStamp ts;
        try {
            ts = TimeStamp(lineMatch[4].str());
        } catch (...) {
            // The timestamp is optional (the regex group may be empty); on any
            // conversion failure the default-constructed ts is kept on purpose.
        }

        for (auto &f : fields) {
            // If no fields were defined, we take any field
            if (!influx.fields.empty() && (influx.fields.find(f.first) == influx.fields.end())) {
                continue;
            }

            std::string mqttTopic = influx.mqttPrefix + "/" + tagName + "/" + f.first;
            uint64_t value = 0;
            try {
                value = std::stoull(f.second);
            } catch (...) {
                // Not convertible to an unsigned integer: skip this field only
                continue;
            }

            DCDB::SensorId sid;
            if (sid.mqttTopicConvert(mqttTopic)) {
                _sensorCache->storeSensor(sid, ts.getRaw(), value);
                _sensorDataStore->insert(&sid, ts.getRaw(), value, 0); //Fixme: TTL
            }
#ifdef DEBUG
            LOG(debug) << "influx insert: " << mqttTopic << " " << ts.getRaw() << " " << value;
#endif
        }
    }
    res.body() = "";
    res.result(http::status::no_content);
}
void CARestAPI::PUT_quit(endpointArgs) {
int retCode = getQuery("code", queries)=="" ? 0 : std::stoi(getQuery("code", queries));
if(retCode<0 || retCode>255)
......
......@@ -33,6 +33,7 @@
#include "analyticscontroller.h"
#include "mqttchecker.h"
#include "configuration.h"
#include "simplemqttserver.h"
#include <signal.h>
/**
......@@ -43,8 +44,11 @@
class CARestAPI : public RESTHttpsServer {
public:
CARestAPI(serverSettings_t settings,
std::map<std::string, influx_t>& influxMap,
SensorCache* sensorCache,
AnalyticsController* analyticsController);
SensorDataStore* sensorDataStore,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer);
virtual ~CARestAPI() {}
......@@ -53,6 +57,8 @@ public:
" -GET: /help This help message.\n"
" /analytics/help\n"
" An help message for data analytics commands.\n"
" /hosts\n"
" Prints the list of connected hosts.\n"
" /average?sensor;[interval]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
......@@ -76,6 +82,18 @@ private:
*/
void GET_help(endpointArgs);
/**
* GET "/hosts"
*
* @brief Returns a CSV list of connected hosts and their "last seen" timestamps.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
*/
void GET_hosts(endpointArgs);
/**
* GET "/average"
*
......@@ -104,6 +122,10 @@ private:
*/
void PUT_quit(endpointArgs);
void GET_ping(endpointArgs);
void POST_query(endpointArgs);
void POST_write(endpointArgs);
/**
* PUT "/analytics/reload"
*
......@@ -166,7 +188,10 @@ private:
void PUT_analytics_navigator(endpointArgs);
SensorCache* _sensorCache;
SensorDataStore* _sensorDataStore;
AnalyticsController* _analyticsController;
SimpleMQTTServer* _mqttServer;
std::map<std::string, influx_t> _influxMap;
};
#endif /* COLLECTAGENT_CARESTAPI_H_ */
......@@ -458,19 +458,19 @@ int mqttCallback(SimpleMQTTMessage *msg)
if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
//starting job data
if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
LOG(error) << "Job data insert failed!";
LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
return 1;
}
} else {
//ending job data
DCDB::JobData tmp;
if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
LOG(error) << "Could not retrieve job to be updated!";
LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
return 1;
}
if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
LOG(error) << "Could not update end time of job!";
LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
return 1;
}
}
......@@ -806,6 +806,27 @@ int main(int argc, char* const argv[]) {
LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
if (config.influxMap.size() > 0) {
LOG(info) << "InfluxDB Settings:";
for (auto &m: config.influxMap) {
LOG(info) << " Measurement: " << m.first;
LOG(info) << " Tag: " << m.second.tag;
if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0))
if (m.second.tagSubstitution != "&") {
LOG(info) << " TagFilter: s/" << m.second.tagRegex.str() << "/" << m.second.tagSubstitution << "/";
} else {
LOG(info) << " TagFilter: " << m.second.tagRegex.str();
}
if (m.second.fields.size() > 0) {
stringstream ss;
copy(m.second.fields.begin(), m.second.fields.end(), ostream_iterator<std::string>(ss, ","));
string fields = ss.str();
fields.pop_back();
LOG(info) << " Fields: " << fields;
}
}
}
}
LOG_VAR(vLogLevel) << "----- Analytics Configuration -----";
for(auto& p : analyticsController->getManager()->getPlugins()) {
......@@ -833,7 +854,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API
*/
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
httpsServer = new CARestAPI(restAPISettings, config.influxMap, &mySensorCache, mySensorDataStore, analyticsController, &ms);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
......@@ -843,22 +864,22 @@ int main(int argc, char* const argv[]) {
* Run (hopefully) forever...
*/
keepRunning = 1;
timeval start, end;
uint64_t start, end;
double elapsed;
msgCtr = 0;
pmsgCtr = 0;
readingCtr = 0;
gettimeofday(&start, NULL);
uint64_t lastCleanup = start.tv_sec;
start = getTimestamp();
uint64_t lastCleanup = start;
uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
LOG(info) << "Collect Agent running...";
while(keepRunning) {
gettimeofday(&start, NULL);
if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
lastCleanup = start.tv_sec;
start = getTimestamp();
if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
lastCleanup = start;
if(purged > 0)
LOG(info) << "Cache: purged " << purged << " obsolete entries";
}
......@@ -867,12 +888,19 @@ int main(int argc, char* const argv[]) {
if((settings.statisticsInterval > 0) && keepRunning) {
/* not really thread safe but will do the job */
gettimeofday(&end, NULL);
elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
end = getTimestamp();
elapsed = NS_TO_S(((double) end - (double) start));
float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
LOG(info) << "Performance: " << (readingCtr/elapsed) << " inserts/s, " << (msgCtr/elapsed) << " messages/s (" << publish << "% PUBLISH)";
LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed) << " inserts/s ";
std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
uint64_t connectedHosts = 0;
for (auto h: lastSeen) {
if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
connectedHosts++;
}
}
LOG(info) << "Connected hosts: " << connectedHosts;
msgCtr = 0;
pmsgCtr = 0;
readingCtr = 0;
......
......@@ -72,4 +72,42 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
}
}
}
// ----- READING INFLUXDB LINE PROTOCOL SETTINGS -----
if (cfg.find("influx") != cfg.not_found()) {
BOOST_FOREACH(boost::property_tree::iptree::value_type & global, cfg.get_child("influx")) {
if (boost::iequals(global.first, "measurement")) {
influx_t influx;
BOOST_FOREACH(boost::property_tree::iptree::value_type &m, global.second) {
if (boost::iequals(m.first, "tag")) {
influx.tag = m.second.data();
} else if (boost::iequals(m.first, "tagfilter")) {
//check if input has sed format of "s/.../.../" for substitution
boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
boost::smatch matchResults;
if (regex_match(m.second.data(), matchResults, checkSubstitute)) {
//input has substitute format
influx.tagRegex = boost::regex(matchResults[2].str(), boost::regex_constants::extended);
influx.tagSubstitution = matchResults[3].str();
} else {
//input is only a regex
influx.tagRegex = boost::regex(m.second.data(), boost::regex_constants::extended);
influx.tagSubstitution = "&";
}
} else if (boost::iequals(m.first, "mqttprefix")) {
influx.mqttPrefix = m.second.data();
} else if (boost::iequals(m.first, "fields")) {
std::stringstream ss(m.second.data());
while (ss.good()) {
std::string s;
getline(ss, s, ',');
influx.fields.insert(s);
}
}
}
influxMap[global.second.data()] = influx;
}
}
}
}
......@@ -31,6 +31,8 @@
#include <string>
#include <unistd.h>
#include <boost/algorithm/string/trim.hpp>
#include <boost/regex.hpp>
#include "logging.h"
#include "globalconfiguration.h"
......@@ -63,6 +65,16 @@ public:
bool debugLog = false;
};
/**
 * @brief Settings of one "measurement" entry of the "influx" configuration block.
 *
 * Plain data holder; all members are public and default-constructed empty.
 */
class influx_t {
public:
    influx_t() = default;

    /// Leading component of the MQTT topic built for incoming readings.
    std::string mqttPrefix;
    /// Name of the tag whose value becomes part of the MQTT topic.
    std::string tag;
    /// Optional filter/substitution pattern applied to the tag value.
    boost::regex tagRegex;
    /// Replacement text used together with tagRegex ("&" when only filtering).
    std::string tagSubstitution;
    /// Field names to accept; an empty set means every field is accepted.
    std::set<std::string> fields;
};
/**
* @brief Class responsible for reading collect agent specific configuration.
*
......@@ -93,6 +105,7 @@ public:
uint64_t messageThreads = 128;
uint64_t messageSlots = 16;
cassandra_t cassandraSettings;
std::map<std::string, influx_t> influxMap;
protected:
void readAdditionalBlocks(boost::property_tree::iptree& cfg) override;
......
......@@ -184,6 +184,24 @@ void SimpleMQTTServer::init(string addr, string port)
initSockets();
}
std::map<std::string, hostInfo_t> SimpleMQTTServer::collectLastSeen() {
std::map<std::string, hostInfo_t> hosts;
for(auto &t : acceptThreads) {
std::vector<hostInfo_t> tempHosts = t.collectLastSeen();
for(auto &h1 : tempHosts) {
auto h2 = hosts.find(h1.clientId);
if (h2 != hosts.end()) {
if (h2->second.lastSeen < h1.lastSeen) {
h2->second = h1;
}
} else {
hosts[h1.clientId] = h1;
}
}
}
return hosts;
}
SimpleMQTTServer::SimpleMQTTServer()
{
/*
......
......@@ -140,6 +140,7 @@ public:
void start();
void stop();
void setMessageCallback(SimpleMQTTMessageCallback callback);
std::map<std::string, hostInfo_t> collectLastSeen();
SimpleMQTTServer();
SimpleMQTTServer(std::string addr, std::string port, uint64_t maxThreads=128, uint64_t maxConnPerThread=16);
......
......@@ -201,6 +201,32 @@ ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
payloadLength = remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
break;
}
case MQTT_CONNECT: {
    // The first 10 bytes compose the CONNECT message's variable header; the
    // payload follows and starts with a 2-byte client ID length. Anything
    // shorter than 12 bytes is therefore malformed.
    if (remainingLength < 12) {
        state = Error;
        break;
    }

    char* data = (char*) remainingRaw + 10;
    const size_t idLen = ntohs(((uint16_t *) data)[0]);
    data += 2;

    // The length is supplied by the peer: make sure the announced client ID
    // fits into what was received before reading it, otherwise we would read
    // past the end of the message buffer.
    if (idLen > static_cast<size_t>(remainingLength) - 12) {
        state = Error;
        break;
    }

    // Leveraging the topic field to store also the client ID on CONNECT messages
    if (idLen > 0) {
        topic = string(data, idLen);
        data += idLen;
    } else {
        topic = "";
    }

    // We store the rest of the CONNECT payload in its raw form
    payloadPtr = (void *) data;
    payloadLength = remainingLength - ((uint8_t *) payloadPtr - (uint8_t *) remainingRaw);
    break;
}
case MQTT_PUBREL: {
msgId = ntohs(((uint16_t*) remainingRaw)[0]);
break;
......
......@@ -165,7 +165,9 @@ void SimpleMQTTServerAcceptThread::run()
cout << "Thread (" << this << ") waiting in accept()...\n";
coutMtx.unlock();
#endif
newsock = accept(socket, NULL, 0);
struct sockaddr_in addr;
socklen_t socklen = sizeof(addr);
newsock = accept(socket, (struct sockaddr*)&addr, &socklen);
if (newsock != -1) {
int opt = fcntl(newsock, F_GETFL, 0);
if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
......@@ -184,7 +186,7 @@ void SimpleMQTTServerAcceptThread::run()
if (messageThreads.size() < this->_maxThreads) {
// Spawning new threads, if not exceeding maximum thread pool size
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback, this->_maxConnPerThread));
messageThreads.back()->queueConnection(newsock);
messageThreads.back()->queueConnection(newsock, std::string(inet_ntoa(addr.sin_addr)) + ":" + std::to_string(addr.sin_port));
}
else {
// If thread pool is full, we cycle through it to find any available threads to connect to
......@@ -192,7 +194,7 @@ void SimpleMQTTServerAcceptThread::run()
do {
// Rotating the thread queue to ensure round-robin scheduling
threadCtr = (threadCtr + 1) % messageThreads.size();
} while(ctr++ < messageThreads.size() && messageThreads[threadCtr]->queueConnection(newsock));
} while(ctr++ < messageThreads.size() && messageThreads[threadCtr]->queueConnection(newsock, inet_ntoa(addr.sin_addr)));
if(ctr > messageThreads.size()) {
LOG(warning) << "Socket " << socket << " cannot accept more connections.";
......@@ -232,6 +234,21 @@ void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback
}
}
std::vector<hostInfo_t> SimpleMQTTServerAcceptThread::collectLastSeen() {
std::vector<hostInfo_t> hosts;
for(const auto &m : messageThreads) {
hostInfo_t *lastSeenVec = m->getLastSeen();
if(lastSeenVec) {
for(size_t idx=0; idx<this->_maxConnPerThread; idx++) {
if(lastSeenVec[idx].lastSeen != 0) {
hosts.push_back(lastSeenVec[idx]);
}
}
}
}
return hosts;
}
SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback, uint64_t maxThreads, uint64_t maxConnPerThread)
{