Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 44cf05f2 authored by Alessio Netti's avatar Alessio Netti
Browse files

Completed integration of meta-data

- Collectagent uses published TTL values and caches received metadata
- READMEs updated accordingly
parent 672829be
......@@ -495,6 +495,8 @@ With enforceTopics:
/analytics/avg1/rack02/node03/cpu00/sum
```
Like ordinary sensors in DCDB Pusher, operator output sensors can also be published via the _auto-publish_ feature, and metadata can be specified for them. For more details, refer to the README of DCDB Pusher.
#### Pipelining Operators <a name="pipelining"></a>
The inputs and outputs of streaming operators can be chained so as to form a processing pipeline. To enable this, users
......
......@@ -46,7 +46,7 @@ void AnalyticsController::stop() {
_initialized = false;
}
bool AnalyticsController::initialize(Configuration& settings, const string& configPath, MetadataStore& metaStore) {
bool AnalyticsController::initialize(Configuration& settings, const string& configPath) {
_settings = settings;
_configPath = configPath;
_navigator = make_shared<SensorNavigator>();
......@@ -55,7 +55,7 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
QueryEngine &_queryEngine = QueryEngine::getInstance();
if(_manager->probe(_configPath, "collectagent.conf")) {
vector<string> topics;
for(const auto& kv : metaStore.getMap())
for(const auto& kv : _metadataStore->getMap())
topics.push_back(kv.second.pattern);
// Building the sensor navigator
......@@ -151,7 +151,7 @@ void AnalyticsController::run() {
_sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
}
_sensorCache->getSensorMap()[sid].updateBatchSize(op->getMinValues());
_dcdbStore->insertBatch(readings);
_dcdbStore->insertBatch(readings, _metadataStore->getTTL(s->getMqtt()));
_readingCtr += readings.size();
}
op->releaseUnits();
......@@ -178,6 +178,7 @@ bool AnalyticsController::publishSensors() {
if (s->getMetadata()) {
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(*s->getMetadata());
err = _dcdbCfg->publishSensor(ps);
_metadataStore->store(s->getMetadata()->pattern, *s->getMetadata());
} else {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
}
......
......@@ -34,7 +34,6 @@
#include <dcdb/sensorconfig.h>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/algorithm/string/trim.hpp>
#include "../analytics/OperatorManager.h"
#include "sensornavigator.h"
#include "sensorcache.h"
......@@ -72,6 +71,7 @@ public:
_manager = make_shared<OperatorManager>();
_navigator = nullptr;
_sensorCache = nullptr;
_metadataStore = nullptr;
_keepRunning = false;
_doHalt = false;
_halted = true;
......@@ -106,10 +106,9 @@ public:
*
* @param settings Settings class containing user-specified configuration parameters
* @param configPath Path to the configuration files for the data analytics framework
* @param metaStore MetadataStore object containing all public sensors to add in the sensor tree
* @return True if successful, false otherwise
*/
bool initialize(Configuration& settings, const string& configPath, MetadataStore& metaStore);
bool initialize(Configuration& settings, const string& configPath);
/**
* @brief Sets the cache to be used for sensors
......@@ -119,6 +118,13 @@ public:
* @param cache The SensorCache object to be used as cache
*/
void setCache(SensorCache* cache) {
    // Only the pointer is stored here.
    _sensorCache = cache;
}
/**
 * @brief Registers the MetadataStore object used internally to retrieve sensor information
 *
 * @param mStore A pointer to a valid MetadataStore object; only the pointer is stored
 */
void setMetadataStore(MetadataStore* mStore) {
    _metadataStore = mStore;
}
/**
* @brief Returns the status of the internal thread
......@@ -198,6 +204,8 @@ private:
DCDB::SensorDataStore *_dcdbStore;
// Global sensor cache object for the collectagent
SensorCache *_sensorCache;
// Global sensor metadata store object
MetadataStore *_metadataStore;
// Sensor navigator
shared_ptr<SensorNavigator> _navigator;
// Internal data operator manager object
......
......@@ -50,6 +50,7 @@
#include <string>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
......@@ -218,6 +219,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
}
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
err = mySensorConfig->publishSensor(ps);
metadataStore->store(sm.pattern, sm);
} else {
err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
}
......@@ -284,7 +286,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
}
mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
mySensorDataStore->insertBatch(readings);
mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
readingCtr+= readings.size();
//mySensorCache.dump();
......@@ -499,19 +501,23 @@ int main(int argc, char* const argv[]) {
list<DCDB::PublicSensor> publicSensors;
metadataStore = new MetadataStore();
mySensorConfig->getPublicSensorsVerbose(publicSensors);
std::string patternBuf;
SensorMetadata sBuf;
for (const auto &s : publicSensors)
if (!s.is_virtual)
metadataStore->store(s.pattern, Configuration::publicSensorToMetadata(s));
if (!s.is_virtual) {
sBuf = Configuration::publicSensorToMetadata(s);
boost::algorithm::trim(sBuf.pattern);
metadataStore->store(sBuf.pattern, sBuf);
}
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
analyticsController->setCache(&mySensorCache);
analyticsController->setMetadataStore(metadataStore);
queryEngine.setFilter(analyticsSettings.filter);
queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback);
if(!analyticsController->initialize(settings, argv[argc - 1], *metadataStore))
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
......
......@@ -305,6 +305,21 @@ public:
throw invalid_argument("MetadataStore: key " + key + " does not exist!");
return _metadata[key];
}
/**
 * @brief Looks up the TTL of the sensorMetadata_t object stored under a given key.
 *
 * The internal map is searched once for the key. When no entry is found, -1 is
 * returned instead of throwing; otherwise the entry's ttl field is divided by
 * 1000000000 (integer scaling) before being returned. The method is meant as a
 * (slightly) faster look-up path for the CollectAgent, which needs TTL values
 * whenever it performs database inserts.
 *
 * @param key   Sensor key to be queried
 * @return      TTL value in seconds, or -1 if the key is not in the map
 */
int64_t getTTL(const string& key) {
    auto entry = _metadata.find(key);
    // Unknown sensor: signal "no TTL available" to the caller.
    if (entry == _metadata.end())
        return -1;
    return entry->second.ttl / 1000000000;
}
/**
* @brief Returns a sensorMetadata_t object from the internal map, converted into JSON format.
......
......@@ -21,7 +21,8 @@
1. [counterData](#opaCounterData)
8. [ProcFS](#procfs)
9. [Caliper](#caliper)
10. [Writing own plugins](#writingOwnPlugins)
10. [Metadata Management](#metadataManagement)
11. [Writing own plugins](#writingOwnPlugins)
## Introduction <a name="introduction"></a>
DCDB (DataCenter DataBase) is a database to collect various (sensor-)values of a datacenter for further analysis.
......@@ -273,7 +274,7 @@ PUT https://localhost:8000/stop?plugin=bacnet
## MQTT topic <a name="mqttTopic"></a>
For communication between the different DCDB-components (database, dcdbpusher) the [MQTT protocol](https://mqtt.org/) is used. In order to identify each sensor, everyone has to have a unique MQTT topic assigned. A MQTT topic for DCDB consists of exactly 112 bits (= 28 hex characters), not including '/' separators. The topic for a sensor is built by appending up to 4 parts:
For communication between the different DCDB-components (database, dcdbpusher) the [MQTT protocol](https://mqtt.org/) is used. In order to identify each sensor, each one must be assigned a unique MQTT topic. The topic for a sensor is built by appending up to 4 parts:
1. mqttprefix (e.g. /mysystem)
2. mqttpart of entity (if supported by plugin, e.g. /host0)
3. mqttpart of group (e.g. /eth0)
......@@ -281,6 +282,10 @@ For communication between the different DCDB-components (database, dcdbpusher) t
Then the topic for the sensor is /mysystem/host0/eth0/xmitdata.
Additionally, sensors can be published automatically to the Storage Backend under their specified MQTT topics by using the _auto-publish_ feature. This feature is enabled via the _-a_ switch to DCDB Pusher. This way, the metadata tables in the Storage Backend will be populated with the information of all instantiated sensors, and these will become visible for queries.
# Plugins <a name ="plugins"></a>
The core of dcdbpusher is responsible for collecting all the values read by the sensors and sending them to the database. However, the main functionality of the sensors comes from the various plugins. Every plugin corresponds to a special sensor functionality.
......@@ -697,6 +702,43 @@ Explanation of the values specific for this plugin:
|:----- |:----------- |
| timeout | Number of read cycles after which a Caliper application is assumed to be terminated if no new values have been received. The connection (shared memory) is torn down on timeout. |
## Metadata Management <a name="metadataManagement"></a>
Sensor metadata can be included in Pusher configurations, and will be published to the Storage Backend if the _auto-publish_ feature is enabled. A metadata block looks like the following:
```
...
group g2 {
sensor pw {
mqttsuffix /power
...
metadata {
unit Watt
scale 1000
ttl 3600000
operations avg5,min,max
}
}
}
...
```
Available fields that can be published as metadata are the following:
| Value | Explanation |
|:----- |:----------- |
| unit | String containing the unit of measure for the sensor, if any. |
| scale | Scaling factor (as a floating point value) to be applied to readings of the sensor upon queries. |
| ttl | Time to live for the readings of this sensor in milliseconds, after which they are automatically deleted from the Storage Backend. |
| monotonic | Boolean flag specifying whether the sensor is monotonic or not. |
| integrable | Boolean flag specifying whether the sensor's time series can be integrated or not. |
| interval | Sampling interval in milliseconds of the sensor. |
| operations | Comma-separated list of operations available for the sensor, whose values can be retrieved by appending their names to the sensor name. |
## Writing own plugins <a name="writingOwnPlugins"></a>
First make sure you read the [plugins](#plugins) section.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment