Commit 6623dc9f authored by Alessio Netti

Merge remote-tracking branch 'remotes/origin/development'

parents f11f8bb0 0e0ac9a1
......@@ -6,11 +6,11 @@ The *Data Center* *Data Base* (DCDB) is a modular, continuous and holistic monit
* _Collect_ _Agent_: The collect agent functions as an intermediary between one storage backend and one or multiple pushers.
* _Pusher_: The main component for collecting data is the pusher framework. It allows running arbitrary data collection plugins and pushes the collected data to the collect agent via MQTT messages.
-Other features included by DCDB:
-* libdcdb: a library to access the storage backend with external tools
-* a set of tools leveraging libdcdb (e.g. a command line tool to query the storage backend)
-* built in data analytics framework
-* pusher plugins for a variety of typical HPC data sources
+Other features included in DCDB:
+* libdcdb: a library to access the storage backend with external tools.
+* A set of tools leveraging libdcdb (e.g. a command line tool to query the storage backend).
+* Built-in data analytics framework.
+* Pusher plugins for a variety of typical HPC data sources.
## Build, install and run
......@@ -33,7 +33,7 @@ $ make depsinstall install
$ make depsinstall install doc
```
-To run DCDB locally with the included default config files:
+To run DCDB locally with the included default configuration files:
```bash
# Change to the DCDB directory you created during installation
......@@ -60,7 +60,7 @@ DCDB was created at Leibniz Supercomputing Centre (LRZ).
For questions and/or suggestions, please contact info@dcdb.it
-Copyright (C) 2011-2019 Leibniz Supercomputing Centre
+Copyright (C) 2011-2020 Leibniz Supercomputing Centre
DCDB (except for the libdcdb parts) is licensed under the terms
and conditions of the GNU GENERAL PUBLIC LICENSE (Version 2 or later).
......
......@@ -57,6 +57,9 @@ libdcdboperator_classifier.$(LIBEXT): operators/regressor/RegressorOperator.o op
libdcdboperator_clustering.$(LIBEXT): operators/clustering/ClusteringOperator.o operators/clustering/ClusteringConfigurator.o ../common/src/sensornavigator.o
	$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
+libdcdboperator_cssignatures.$(LIBEXT): operators/cssignatures/CSOperator.o operators/cssignatures/CSConfigurator.o ../common/src/sensornavigator.o
+	$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core
libdcdboperator_job_aggregator.$(LIBEXT): operators/aggregator/AggregatorOperator.o operators/aggregator/JobAggregatorOperator.o operators/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
	$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......
......@@ -354,8 +354,8 @@ void OperatorManager::addRestEndpoints(RESTHttpsServer* restServer) {
restServer->addEndpoint("/analytics/units", {http::verb::get, stdBind(GET_analytics_units)});
restServer->addEndpoint("/analytics/operators", {http::verb::get, stdBind(GET_analytics_operators)});
restServer->addEndpoint("/analytics/start", {http::verb::put, stdBind(PUT_analytics_start)});
restServer->addEndpoint("/analytics/stop", {http::verb::put, stdBind(PUT_analytics_stop)});
restServer->addEndpoint("/analytics/start", {http::verb::post, stdBind(POST_analytics_start)});
restServer->addEndpoint("/analytics/stop", {http::verb::post, stdBind(POST_analytics_stop)});
restServer->addEndpoint("/analytics/compute", {http::verb::put, stdBind(PUT_analytics_compute)});
restServer->addEndpoint("/analytics/operator", {http::verb::put, stdBind(PUT_analytics_operator)});
}
......@@ -543,7 +543,7 @@ void OperatorManager::GET_analytics_operators(endpointArgs) {
}
-void OperatorManager::PUT_analytics_start(endpointArgs) {
+void OperatorManager::POST_analytics_start(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
......@@ -560,7 +560,7 @@ void OperatorManager::PUT_analytics_start(endpointArgs) {
}
}
-void OperatorManager::PUT_analytics_stop(endpointArgs) {
+void OperatorManager::POST_analytics_stop(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
......
......@@ -228,13 +228,7 @@ public:
" /units?plugin;[operator];[json]\n"
" D List of units to which sensors are associated in\n"
" the specified data analytics plugin (and operator).\n"
" -PUT: /start?[plugin];[operator]\n"
" Start all or only a specific analytics plugin or\n"
" start only a specific operator within a plugin.\n"
" /stop?[plugin];[operator]\n"
" Stop all or only a specific analytics plugin or\n"
" stop only a specific operator within a plugin.\n"
" /reload?[plugin] Reload all or only a specific analytics plugin.\n"
" -PUT /reload?[plugin] Reload all or only a specific analytics plugin.\n"
" /load?plugin;[path];[config]\n"
" Load a new plugin. Optionally specify path to the\n"
" shared library and/or the config file for the \n"
......@@ -248,6 +242,12 @@ public:
" selected operator within a plugin (refer to plugin\n"
" documentation).\n"
" /navigator Reloads the sensor navigator.\n"
" -POST:/start?[plugin];[operator]\n"
" Start all or only a specific analytics plugin or\n"
" start only a specific operator within a plugin.\n"
" /stop?[plugin];[operator]\n"
" Stop all or only a specific analytics plugin or\n"
" stop only a specific operator within a plugin.\n"
"\n"
"D = Discovery method\n"
"All resources have to be prepended by host:port.\n"
......@@ -362,7 +362,7 @@ private:
void GET_analytics_operators(endpointArgs);
/**
* PUT "/analytics/start"
* POST "/analytics/start"
*
* @brief Start all or only a specific plugin. Or only start a specific
* streaming operator within a specific plugin.
......@@ -377,10 +377,10 @@ private:
* | | | to be specified. Limited to
* | | | streaming operators.
*/
-void PUT_analytics_start(endpointArgs);
+void POST_analytics_start(endpointArgs);
/**
* PUT "/analytics/stop"
* POST "/analytics/stop"
*
* @brief Stop all or only a specific plugin. Or only stop a specific
* streaming operator within a plugin.
......@@ -395,7 +395,7 @@ private:
* | | | to be specified. Limited to
* | | | streaming operators.
*/
-void PUT_analytics_stop(endpointArgs);
+void POST_analytics_stop(endpointArgs);
/**
* This endpoint must either be overwritten (by adding a custom
......
template_signature def1 {
interval 1000
minValues 1
streaming true
window 30000
}
signature sig1 {
default def1
numBlocks 20
input {
; Supposing that we target a compute node, we pick all of its available sensors
all
}
output {
; There are at most two outputs, one for the real blocks and one for the imag blocks
; These will be duplicated automatically according to the desired number of blocks
sensor "<bottomup 1>cs-sig-real" {
mqttsuffix /cs-sig-real
}
sensor "<bottomup 1>cs-sig-imag" {
mqttsuffix /cs-sig-imag
imag true
}
}
}
-smoother smt1 {
+;smoother smt1 {
+; interval 10000
+; minValues 10
+; duplicate false
+; separator "-"
+; exclude "/cpu[0-9]*/"
+;
+; output {
+;
+; sensor avg300 {
+; mqttsuffix /avg300
+; range 300000
+; }
+;
+; sensor avg3600 {
+; mqttsuffix /avg3600
+; range 3600000
+; }
+; }
+;}
+;smoother grafanaFine {
+; interval 1000
+; minValues 10
+; duplicate false
+; separator "-"
+; exclude "/cpu[0-9]*/"
+;
+; output {
+;
+; sensor avg10 {
+; mqttsuffix /avg10
+; range 10000
+; }
+;
+; }
+;}
+smoother grafanaCoarse {
interval 10000
minValues 10
duplicate false
......@@ -8,9 +46,9 @@ smoother smt1 {
output {
sensor avg300 {
mqttsuffix /avg300
range 300000
}
sensor avg3600 {
mqttsuffix /avg3600
......
......@@ -97,8 +97,8 @@ void computePercentiles(std::vector<reading_t> &data, const std::vector<size_t>
// Sorting the sensor reading buffer to extract quantiles
std::sort(data.begin(), data.end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
for(const auto& q : percentilePositions) {
-idx = (data.size() * q) / 100;
-mod = (data.size() * q) % 100;
+idx = ((data.size()-1) * q) / 100;
+mod = ((data.size()-1) * q) % 100;
percentiles.push_back((mod==0 || idx==data.size()-1) ? data[idx].value : (data[idx].value + data[idx+1].value)/2);
}
}
......
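The fix above removes an off-by-one in the percentile index: for n sorted values, the p-th percentile position is (n-1)*p/100, while the old n*p/100 overshoots and biases results high. A small self-contained check with illustrative values:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    // Five sorted readings; the 50th percentile should be the middle value, 30.
    std::vector<int64_t> data = {10, 20, 30, 40, 50};
    const size_t q = 50;
    // Old formula: idx = (5*50)/100 = 2, mod = 50 -> interpolates (30+40)/2 = 35.
    size_t oldIdx = (data.size() * q) / 100;
    size_t oldMod = (data.size() * q) % 100;
    // New formula: idx = (4*50)/100 = 2, mod = 0 -> returns data[2] = 30 exactly.
    size_t newIdx = ((data.size() - 1) * q) / 100;
    size_t newMod = ((data.size() - 1) * q) % 100;
    int64_t oldVal = (oldMod == 0 || oldIdx == data.size() - 1)
                         ? data[oldIdx] : (data[oldIdx] + data[oldIdx + 1]) / 2;
    int64_t newVal = (newMod == 0 || newIdx == data.size() - 1)
                         ? data[newIdx] : (data[newIdx] + data[newIdx + 1]) / 2;
    std::cout << "old: " << oldVal << ", new: " << newVal << std::endl;  // old: 35, new: 30
    return 0;
}
```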
......@@ -64,7 +64,9 @@ public:
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
+this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
+this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
}
/**
......@@ -78,7 +80,9 @@ public:
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
+this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
+this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
}
/**
......@@ -90,7 +94,9 @@ public:
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
+this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
+this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
return *this;
}
......@@ -225,7 +231,7 @@ protected:
// The job unit is generated as a hierarchical unit
jobUnit = unitGen.generateFromTemplate(uTemplate, jobTopic, jobData.nodes, this->_mqttPart, this->_enforceTopics, this->_relaxed);
// Initializing sensors if necessary
-jobUnit->init(this->_interval);
+jobUnit->init(this->_interval, this->_queueSize);
this->addToUnitCache(jobUnit);
}
return jobUnit;
......@@ -250,6 +256,10 @@ protected:
for(auto& nodeName : jobData.nodes)
nodeName = MQTTChecker::formatTopic(nodeName) + std::string(1, MQTT_SEP);
+// First, we apply the job ID filter, if configured
+if(_jobIdFilterStr!="" && !boost::regex_search(jobData.jobId.c_str(), _match, _jobIdFilter))
+    return false;
// No filter was set - every job is accepted
if(_jobFilterStr=="" || _jobMatchStr=="")
return true;
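The added check can be tried in isolation. A minimal sketch with a hypothetical filter pattern and job IDs (only the regex logic is taken from the change above):

```cpp
#include <boost/regex.hpp>
#include <iostream>
#include <string>
#include <vector>

int main() {
    // Hypothetical job ID filter: only accept jobs whose ID starts with "slurm-".
    std::string jobIdFilterStr = "^slurm-";
    boost::regex jobIdFilter(jobIdFilterStr);
    boost::cmatch match;
    std::vector<std::string> jobIds = {"slurm-1234", "pbs-99", "slurm-5678"};
    for (const auto &id : jobIds) {
        // Mirrors the check above: jobs whose ID does not match are rejected.
        bool accepted = jobIdFilterStr.empty() ||
                        boost::regex_search(id.c_str(), match, jobIdFilter);
        std::cout << id << (accepted ? ": accepted" : ": filtered out") << std::endl;
    }
    return 0;
}
```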
......@@ -289,14 +299,16 @@ protected:
virtual void computeAsync() override {
try {
_jobDataVec.clear();
-if(this->_queryEngine.queryJob("", this->_interval * 1000000, 0, _jobDataVec, true, true)) {
+uint64_t queryTsEnd = !this->_scheduledTime ? getTimestamp() : this->_scheduledTime;
+uint64_t queryTsStart = queryTsEnd - (this->_interval * 1000000);
+if(this->_queryEngine.queryJob("", queryTsStart, queryTsEnd, _jobDataVec, false, true)) {
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(auto& job : _jobDataVec) {
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) {
-LOG(error) << e2.what();
+LOG(debug) << e2.what();
_tempUnits.push_back(nullptr);
continue; }
}
......@@ -322,14 +334,15 @@ protected:
_tempUnits.clear();
}
else
LOG(error) << "Operator " + this->_name + ": cannot retrieve job data!";
LOG(debug) << "Operator " + this->_name + ": cannot retrieve job data!";
} catch(const exception& e) {
LOG(error) << "Operator " + this->_name + ": internal error " + e.what() + " during computation!";
_unitAccess.store(false);
}
if (this->_timer && this->_keepRunning) {
-this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
+this->_scheduledTime = this->nextReadingTime();
+this->_timer->expires_at(timestamp2ptime(this->_scheduledTime));
this->_pendingTasks++;
this->_timer->async_wait(bind(&JobOperatorTemplate::computeAsync, this));
}
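The purpose of the new _scheduledTime member used above: when a timer callback fires late, the job query window stays anchored to the nominal schedule instead of drifting with wall-clock time. A sketch of the window computation, with illustrative timestamps:

```cpp
#include <cstdint>
#include <iostream>

int main() {
    const uint64_t intervalMs = 1000;                     // operator interval in ms
    const uint64_t intervalNs = intervalMs * 1000000ULL;  // converted to ns, as above
    uint64_t scheduledTime = 5000000000ULL;               // nominal wake-up time (ns)
    uint64_t now = 5030000000ULL;                         // callback fired 30 ms late

    // Same pattern as computeAsync(): prefer the scheduled time when available.
    uint64_t queryTsEnd = scheduledTime == 0 ? now : scheduledTime;
    uint64_t queryTsStart = queryTsEnd - intervalNs;
    std::cout << "query window: [" << queryTsStart << ", " << queryTsEnd << "] ns"
              << std::endl;
    return 0;
}
```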
......@@ -347,6 +360,9 @@ protected:
string _jobMatchStr;
boost::regex _jobFilter;
boost::cmatch _match;
+// Filters for jobs based on their IDs
+string _jobIdFilterStr;
+boost::regex _jobIdFilter;
// Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -382,6 +382,8 @@ protected:
{
if (boost::iequals(val.first, "interval")) {
op.setInterval(stoull(val.second.data()));
} else if (boost::iequals(val.first, "queueSize")) {
op.setQueueSize(stoull(val.second.data()));
} else if (boost::iequals(val.first, "minValues")) {
op.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "mqttPart")) {
......
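The configurators are built on boost::property_tree's INFO parser. Below is a self-contained sketch of how a queueSize entry would be read; the operator block and surrounding key names are illustrative, following the plugin configuration convention shown earlier, and may differ per plugin:

```cpp
#include <boost/algorithm/string/predicate.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <iostream>
#include <sstream>
#include <string>

int main() {
    // Hypothetical operator block in DCDB's INFO-style configuration format.
    std::istringstream cfg(
        "operator avg1 {\n"
        "    interval 1000\n"
        "    queueSize 2048\n"
        "    minValues 1\n"
        "}\n");
    boost::property_tree::ptree root;
    boost::property_tree::read_info(cfg, root);
    for (const auto &op : root)
        for (const auto &val : op.second)
            // Same case-insensitive key matching pattern as the configurator above.
            if (boost::iequals(val.first, "queueSize"))
                std::cout << "queueSize = " << std::stoull(val.second.data()) << std::endl;
    return 0;
}
```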
......@@ -90,6 +90,7 @@ public:
_keepRunning(0),
_minValues(1),
_interval(1000),
+_queueSize(1024),
_cacheInterval(900000),
_unitCacheLimit(1000),
_cacheSize(1),
......@@ -116,6 +117,7 @@ public:
_keepRunning(other._keepRunning),
_minValues(other._minValues),
_interval(other._interval),
+_queueSize(other._queueSize),
_cacheInterval(other._cacheInterval),
_unitCacheLimit(other._unitCacheLimit),
_cacheSize(other._cacheSize),
......@@ -148,6 +150,7 @@ public:
_minValues = other._minValues;
_interval = other._interval;
_cacheInterval = other._cacheInterval;
+_queueSize = other._queueSize;
_unitCacheLimit = other._unitCacheLimit;
_cacheSize = other._cacheSize;
_delayInterval = other._delayInterval;
......@@ -265,6 +268,7 @@ public:
bool getStreaming() const { return _streaming; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
+unsigned getQueueSize() const { return _queueSize; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getUnitCacheLimit() const { return _unitCacheLimit; }
unsigned getDelayInterval() const { return _delayInterval; }
......@@ -285,6 +289,7 @@ public:
void setDisabled(bool disabled) { _disabled = disabled; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
+void setQueueSize(unsigned queueSize) { _queueSize = queueSize; }
void setUnitCacheLimit(unsigned uc) { _unitCacheLimit = uc+1; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setDelayInterval(unsigned delayInterval) { _delayInterval = delayInterval; }
......@@ -358,6 +363,8 @@ protected:
unsigned int _minValues;
// Sampling period regulating compute batches
unsigned int _interval;
+// readingQueue size
+unsigned int _queueSize;
// Size of the cache in time for the output sensors in this operator
unsigned int _cacheInterval;
// Maximum number of units that can be contained in the unit cache
......
......@@ -72,7 +72,8 @@ public:
OperatorInterface(name),
_unitCache(nullptr),
_insertionLUT(nullptr),
-_queryEngine(QueryEngine::getInstance()) {}
+_queryEngine(QueryEngine::getInstance()),
+_scheduledTime(0) {}
/**
* @brief Copy constructor
......@@ -86,7 +87,8 @@ public:
OperatorInterface(other),
_unitCache(nullptr),
_insertionLUT(nullptr),
-_queryEngine(QueryEngine::getInstance()) {
+_queryEngine(QueryEngine::getInstance()),
+_scheduledTime(0) {
for(auto u : other._units) {
_units.push_back(u);
......@@ -103,6 +105,7 @@ public:
OperatorTemplate& operator=(const OperatorTemplate& other) {
OperatorInterface::operator=(other);
_units.clear();
+_scheduledTime = 0;
for(auto u : other._units) {
_units.push_back(u);
......@@ -144,6 +147,7 @@ public:
LOG_VAR(ll) << " MinValues: " << _minValues;
LOG_VAR(ll) << " Interval: " << _interval;
LOG_VAR(ll) << " Interval Delay: " << _delayInterval;
LOG_VAR(ll) << " QueueSize: " << _queueSize;
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
......@@ -234,7 +238,7 @@ public:
OperatorInterface::init(io);
for(const auto u : _units)
-u->init(_interval);
+u->init(_interval, _queueSize);
this->execOnInit();
}
......@@ -353,7 +357,7 @@ public:
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
-tempUnit->init(_interval);
+tempUnit->init(_interval, _queueSize);
compute(tempUnit);
retrieveAndFlush(outMap, tempUnit);
} catch(const exception& e) {
......@@ -397,8 +401,9 @@ public:
}
if(_unitCache->size() >= _unitCacheLimit) {
-U_Ptr oldest = _insertionLUT->begin()->second;
-_unitCache->erase(oldest->getName());
+auto oldest = _insertionLUT->begin();
+_unitCache->erase(oldest->second->getName());
+_insertionLUT->erase(oldest->first);
}
_unitCache->insert(make_pair(unit->getName(), unit));
// The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
......@@ -508,7 +513,8 @@ protected:
}
if (_timer && _keepRunning && !_disabled) {
-_timer->expires_at(timestamp2ptime(nextReadingTime()));
+_scheduledTime = nextReadingTime();
+_timer->expires_at(timestamp2ptime(_scheduledTime));
_pendingTasks++;
_timer->async_wait(bind(&OperatorTemplate::computeAsync, this));
}
......@@ -536,6 +542,8 @@ protected:
vector<UnitPtr> _baseUnits;
// Instance of a QueryEngine object to get sensor data
QueryEngine& _queryEngine;
+// Internal timestamp to keep track of the time at which an operator will be invoked
+uint64_t _scheduledTime;
};
#endif //PROJECT_OPERATORTEMPLATE_H
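The unit cache fix above can be illustrated standalone: the cache and the insertion-ordered lookup table must be evicted together, otherwise stale LUT entries accumulate and later evictions point at units that were already dropped. A minimal sketch; the names and types are simplified stand-ins for the actual unit classes:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>

struct Unit { std::string name; };
using UnitPtr = std::shared_ptr<Unit>;

int main() {
    std::unordered_map<std::string, UnitPtr> unitCache;
    std::map<uint64_t, UnitPtr> insertionLUT;  // ordered by insertion timestamp
    const size_t cacheLimit = 2;

    auto insert = [&](uint64_t ts, const std::string &name) {
        if (unitCache.size() >= cacheLimit) {
            auto oldest = insertionLUT.begin();
            unitCache.erase(oldest->second->name);  // evict from the cache...
            insertionLUT.erase(oldest->first);      // ...and from the LUT (the fix)
        }
        auto u = std::make_shared<Unit>(Unit{name});
        unitCache.emplace(name, u);
        insertionLUT.emplace(ts, u);
    };

    insert(1, "job1"); insert(2, "job2"); insert(3, "job3");  // job1 gets evicted
    for (const auto &kv : insertionLUT)
        std::cout << kv.first << " -> " << kv.second->name << std::endl;
    return 0;
}
```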
......@@ -44,9 +44,9 @@ struct qeJobData {
};
//Typedef for the callback used to retrieve sensors
-typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
+typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
//Typedef for the callback used to retrieve sensors
-typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
+typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
//Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
//Typedef for the metadata retrieval callback
......@@ -130,6 +130,17 @@ public:
*/
void setJobFilter(const string& jfilter) { _jobFilter = jfilter; }
+/**
+* @brief Set the current job ID filter
+*
+* This method sets the internal filter string used by job operators to identify
+* the set of jobs for which they are responsible, based on their ID. All jobs whose ID does not
+* match this filter are excluded.
+*
+* @param jidfilter String containing the new job ID filter
+*/
+void setJobIDFilter(const string& jidfilter) { _jobIdFilter = jidfilter; }
/**
* @brief Set the current job match string
*
......@@ -223,6 +234,13 @@ public:
*/
const string& getJobFilter() { return _jobFilter; }
+/**
+* @brief Returns the current job ID filter
+*
+* @return String containing the current job ID filter
+*/
+const string& getJobIdFilter() { return _jobIdFilter; }
/**
* @brief Returns the current job match string
*
......@@ -256,14 +274,15 @@ public:
* @param endTs End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
* @param buffer Reference to a vector in which readings must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
+* @param tol Tolerance (in ns) for returned timestamps. Does not affect Cassandra range queries.
* @return True if successful, false otherwise
*/
-bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
+bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true, const uint64_t tol=3600000000000) {
if(!_callback)
throw runtime_error("Query Engine: callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
throw invalid_argument("Query Engine: invalid time range!");
-return _callback(name, startTs, endTs, buffer, rel);
+return _callback(name, startTs, endTs, buffer, rel, tol);
}
/**
......@@ -274,12 +293,12 @@ public:
*
* @return True if successful, false otherwise
*/
-bool querySensor(const vector<string>& names, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
+bool querySensor(const vector<string>& names, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true, const uint64_t tol=3600000000000) {
if(!_gCallback)
throw runtime_error("Query Engine: callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
throw invalid_argument("Query Engine: invalid time range!");
-return _gCallback(names, startTs, endTs, buffer, rel);
+return _gCallback(names, startTs, endTs, buffer, rel, tol);
}
/**
......@@ -404,6 +423,8 @@ private:
string _jobFilter;
// String storing the matching string resulting from the job filter for a job to be processed
string _jobMatch;
+// String storing the job ID filter to be used by job operators
+string _jobIdFilter;
};
#endif //PROJECT_QUERYENGINE_H
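The new tol parameter (defaulting to one hour in nanoseconds) bounds how far returned timestamps may deviate from the requested range; per the comment above, it does not affect Cassandra range queries. As a conceptual illustration only, and not the actual callback implementation, a sketch assuming readings outside [startTs - tol, endTs + tol] are discarded:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

struct Reading { uint64_t ts; int64_t value; };

// Minimal tolerance-based filter: keep readings whose timestamps fall within
// [startTs - tol, endTs + tol] (all values in nanoseconds).
std::vector<Reading> filterByTolerance(const std::vector<Reading> &in,
                                       uint64_t startTs, uint64_t endTs,
                                       uint64_t tol) {
    std::vector<Reading> out;
    for (const auto &r : in)
        if (r.ts + tol >= startTs && r.ts <= endTs + tol)
            out.push_back(r);
    return out;
}

int main() {
    std::vector<Reading> readings = {{900, 1}, {1500, 2}, {2600, 3}};
    // Window [1000, 2000] ns with a 100 ns tolerance keeps the first two readings.
    for (const auto &r : filterByTolerance(readings, 1000, 2000, 100))
        std::cout << r.ts << " -> " << r.value << std::endl;
    return 0;
}
```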
......@@ -340,12 +340,15 @@ public:
else {
units = new set<string>();
for(const auto &u : uNames) {
-units->insert(u);
// The unit specified as input must belong to the domain of the outputs
-if (!outputs.empty() && !nodeBelongsToPattern(u, outputs[0]->getName())) {
-delete units;
-throw invalid_argument("UnitGenerator: Node " + u + " does not belong to this unit domain!");
-}
+if (!outputs.empty() && !nodeBelongsToPattern(u, outputs[0]->getName()))
+LOG(debug) << "UnitGenerator: Node " + u + " does not belong to this unit domain!";
+else
+units->insert(u);
}
+if(units->empty()) {
+delete units;
+throw invalid_argument("UnitGenerator: All input nodes do not belong to this unit domain!");
+}
}
......@@ -357,8 +360,8 @@ public:
unitObjects->push_back(_generateUnit(u, inputs, outputs, inputMode, mqttPrefix, enforceTopics, relaxed));
} catch(const exception& e) {
if(units->size()>1) {
-LOG(error) << e.what();
-LOG(error) << "UnitGenerator: cannot build unit " << u << "!";
+LOG(debug) << e.what();
+LOG(debug) << "UnitGenerator: cannot build unit " << u << "!";
continue;
} else {
delete units;
......@@ -438,8 +441,8 @@ public:
} catch (const invalid_argument &e) {
topUnit->clear();
if(uNames.size()>1) {
-LOG(error) << e.what();
-LOG(error) << "HierarchicalUnitGenerator: cannot build unit " << u << "!";
+LOG(debug) << e.what();
+LOG(debug) << "HierarchicalUnitGenerator: cannot build unit " << u << "!";
continue;
} else {
delete unitObjects;
...