In January 2021 we will introduce a 10 GB quota for project repositories. Higher limits for individual projects will be available on request. Please see https://doku.lrz.de/display/PUBLIC/GitLab for more information.

Commit 88f3104f authored by Alessio Netti's avatar Alessio Netti

PrintConfig functionality added to Data Analytics Framework

- Added some more indentation to make the configurations more readable
- Fixed one more warning in dcdbquery
parent abf6af19
......@@ -11,6 +11,11 @@ AverageAnalyzer::~AverageAnalyzer() {
delete _buffer;
}
void AverageAnalyzer::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
AnalyzerTemplate<SensorBase>::printConfig(ll);
}
void AverageAnalyzer::compute(int unitID) {
long long max=0, sum=0, avg=0;
......
......@@ -15,6 +15,7 @@ public:
void setWindow(unsigned long long w) { _window = w; }
unsigned long long getWindow() { return _window; }
void printConfig(LOG_LEVEL ll) override;
private:
......
......@@ -82,6 +82,13 @@ public:
*/
virtual std::vector<AnalyzerPtr>& getAnalyzers() = 0;
/**
* @brief Prints the current plugin configuration
*
* @param ll Logging level at which the configuration is printed
*/
virtual void printConfig(LOG_LEVEL ll) = 0;
protected:
// Logger object
......
......@@ -105,6 +105,27 @@ public:
_cacheInterval = pluginSettings.cacheInterval;
}
/**
* @brief Print configuration as read in.
*
* @param ll Logging level to log with
*/
void printConfig(LOG_LEVEL ll) final {
LOG_VAR(ll) << " General: ";
LOG_VAR(ll) << " MQTT-Prefix: " << (_mqttPrefix != "" ? _mqttPrefix : "DEFAULT");
LOG_VAR(ll) << " Sensor Pattern: " << (_sensorPattern != "" ? _sensorPattern : "DEFAULT");
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
//prints plugin specific configurator attributes and entities if present
printConfiguratorConfig(ll);
LOG_VAR(ll) << " Analyzers: ";
for(auto a : _analyzers) {
LOG_VAR(ll) << " Analyzer: " << a->getName();
a->printConfig(ll);
}
}
/**
* @brief Read a config file and instantiate analyzers accordingly
*
......@@ -265,7 +286,16 @@ protected:
*/
virtual void global(CFG_VAL config) {}
/**
* @brief Print information about configurable configurator attributes.
*
* This method is virtual and can be overridden on a per-plugin basis.
*
* @param ll Severity level to log with
*/
virtual void printConfiguratorConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " No other plugin-specific general parameters defined";
}
/**
* @brief Store an analyzer in the internal vectors
......@@ -368,23 +398,19 @@ protected:
for (auto &u: *units) {
constructSensorNames(*u, an);
if (an.getStreaming()) {
LOG(debug) << " Unit \"" << u->getName() << "\"";
for (const auto &i : u->getInputs())
LOG(debug) << " Input " << i->getName();
for (const auto &o : u->getOutputs())
LOG(debug) << " Output " << o->getName() << " using MQTT-topic \"" << o->getMqtt() << "\"";
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
} else
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
an.addUnit(u);
}
} else {
if (unit(*u)) {
an.addToOndemandCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated!";
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
......
......@@ -193,6 +193,13 @@ public:
*/
virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;
/**
* @brief Prints the current analyzer configuration
*
* @param ll Logging level at which the configuration is printed
*/
virtual void printConfig(LOG_LEVEL ll) = 0;
// Getter methods
const string& getName() const { return _name; }
const string& getMqttPart() const { return _mqttPart; }
......
......@@ -102,6 +102,27 @@ public:
}
}
/**
* @brief Prints the current analyzer configuration
*
* @param ll Logging level at which the configuration is printed
*/
virtual void printConfig(LOG_LEVEL ll) override {
LOG_VAR(ll) << " MQTT part: " << (_mqttPart != "" ? _mqttPart : "DEFAULT");
LOG_VAR(ll) << " Sync readings: " << (_sync ? "enabled" : "disabled");
LOG_VAR(ll) << " Streaming mode: " << (_streaming ? "enabled" : "disabled");
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
LOG_VAR(ll) << " MinValues: " << _minValues;
LOG_VAR(ll) << " Interval: " << _interval;
LOG_VAR(ll) << " Start delay: " << _delayInterval;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
for (auto u : _units)
u->printConfig(ll, lg);
} else
LOG_VAR(ll) << " Units: none";
}
/**
* @brief Adds an unit to this analyzer
*
......
......@@ -76,6 +76,14 @@ public:
*/
virtual std::vector<SBasePtr>& getBaseOutputs() = 0;
/**
* @brief Prints the current unit configuration
*
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
*/
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) = 0;
};
//for better readability
......
......@@ -187,6 +187,23 @@ public:
*/
void addOutput(const S_Ptr output) { _outputs.push_back(output); _baseOutputs.push_back(output); }
/**
* @brief Prints the current unit configuration
*
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
*/
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) override {
LOG_VAR(ll) << " Unit: " << _name;
LOG_VAR(ll) << " Inputs: ";
for (const auto &i : _inputs)
LOG_VAR(ll) << " " << i->getName();
LOG_VAR(ll) << " Outputs: ";
for (const auto &o : _outputs)
o->printConfig(ll, lg, 20);
}
protected:
// Name corresponds to the output unit we are addressing
......
......@@ -257,22 +257,14 @@ public:
static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) {
LOG_VAR(ll) << " Sensor: " << _name;
LOG_VAR(ll) << " MQTT Topic: " << _mqtt;
LOG_VAR(ll) << " sink: " << getSinkPath();
LOG_VAR(ll) << " subSampling: " << getSubsampling();
if(_skipConstVal) {
LOG_VAR(ll) << " Skipping constant values";
} else {
LOG_VAR(ll) << " No skipping of constant values";
}
if(_delta) {
LOG_VAR(ll) << " Storing delta readings";
} else {
LOG_VAR(ll) << " Storing absolute readings";
}
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << "Sensor: " << _name;
LOG_VAR(ll) << leading << " MQTT Topic: " << _mqtt;
LOG_VAR(ll) << leading << " Sink: " << (getSinkPath() != "" ? getSinkPath() : "none");
LOG_VAR(ll) << leading << " SubSampling: " << getSubsampling();
LOG_VAR(ll) << leading << (_skipConstVal ? " Skipping constant values" : " No skipping of constant values");
LOG_VAR(ll) << leading << (_delta ? " Storing delta readings" : " Storing absolute readings");
}
protected:
......
......@@ -254,56 +254,92 @@ int main(int argc, char** argv) {
return 1;
}
_analyticsManager = new AnalyticsManager();
// Preparing the SensorNavigator
bool failedTree = false;
std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector<std::string> names, topics;
for(const auto& p : _configuration->getPlugins())
for(const auto& g : p.configurator->getSensorGroups())
for(const auto& s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
try {
navigator->buildTree(globalSettings.hierarchy, &names, &topics);
} catch(const std::invalid_argument& e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
failedTree = true;
}
if(!failedTree) {
_queryEngine.setNavigator(navigator);
_queryEngine.triggerUpdate();
_queryEngine.setQueryCallback(sensorQueryCallback);
if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager!";
return 1;
}
}
//print configuration to give some feedback
//config of plugins is only printed if the config shall be validated or to debug level otherwise
LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
if (globalSettings.validateConfig) {
vLogLevel = boost::log::trivial::info;
}
LOG_VAR(vLogLevel) << "----- Configuration: -----";
LOG_VAR(vLogLevel) << "----- Configuration -----";
//print global settings in either case
LOG(info) << "Global Settings:";
LOG(info) << " Broker: " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
LOG(info) << " Threads: " << globalSettings.threads;
LOG(info) << " Daemonize: " << (globalSettings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " MaxMsgNum: " << globalSettings.maxMsgNum;
LOG(info) << " MaxInflightMsgNum: " << globalSettings.maxInflightMsgNum;
LOG(info) << " MaxQueuedMsgNum: " << globalSettings.maxQueuedMsgNum;
LOG(info) << " MQTT-QoS: " << globalSettings.qosLevel;
LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
LOG(info) << " Hierarchy: " << globalSettings.hierarchy;
LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]";
LOG(info) << " Broker: " << globalSettings.brokerHost << ":" << globalSettings.brokerPort;
LOG(info) << " Threads: " << globalSettings.threads;
LOG(info) << " Daemonize: " << (globalSettings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " MaxMsgNum: " << globalSettings.maxMsgNum;
LOG(info) << " MaxInflightMsgNum: " << globalSettings.maxInflightMsgNum;
LOG(info) << " MaxQueuedMsgNum: " << globalSettings.maxQueuedMsgNum;
LOG(info) << " MQTT-QoS: " << globalSettings.qosLevel;
LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
LOG(info) << " Hierarchy: " << globalSettings.hierarchy;
LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]";
if(globalSettings.validateConfig) {
LOG(info) << " Only validating config files.";
LOG(info) << " Only validating config files.";
} else {
LOG(info) << " validateConfig: Disabled";
LOG(info) << " ValidateConfig: Disabled";
}
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
#ifdef DEBUG
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
LOG(info) << " DH params from: " << restAPISettings.dhFile;
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
LOG(info) << " DH params from: " << restAPISettings.dhFile;
#else
LOG(info) << " Certificate, private key and DH-param file not printed.";
LOG(info) << " Certificate, private key and DH-param file not printed.";
#endif
LOG_VAR(vLogLevel) << "----- Sampling Configuration -----";
for(auto& p : _configuration->getPlugins()) {
LOG_VAR(vLogLevel) << "Plugin \"" << p.id << "\"";
LOG_VAR(vLogLevel) << "Sampling Plugin \"" << p.id << "\"";
p.configurator->printConfig(vLogLevel);
}
LOG_VAR(vLogLevel) << "----- End Config -----";
LOG_VAR(vLogLevel) << "----- Analytics Configuration -----";
for(auto& p : _analyticsManager->getPlugins()) {
LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
p.configurator->printConfig(vLogLevel);
}
LOG_VAR(vLogLevel) << "----- End Configuration -----";
if (globalSettings.validateConfig) {
return 0;
}
//MQTTPusher and Https server get their own threads
_analyticsManager = new AnalyticsManager();
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
_configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
......@@ -328,34 +364,7 @@ int main(int argc, char** argv) {
}
}
// Preparing the SensorNavigator
bool failedTree = false;
std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector<std::string> names, topics;
for(const auto& p : _configuration->getPlugins())
for(const auto& g : p.configurator->getSensorGroups())
for(const auto& s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
try {
navigator->buildTree(globalSettings.hierarchy, &names, &topics);
} catch(const std::invalid_argument& e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree, data analytics manager will not be initialized!";
failedTree = true;
}
if(!failedTree) {
_queryEngine.setNavigator(navigator);
_queryEngine.triggerUpdate();
_queryEngine.setQueryCallback(sensorQueryCallback);
if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager!";
return 1;
}
if(!_queryEngine.updated.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
......
......@@ -316,25 +316,25 @@ public:
* @param ll Logging level to log with
*/
void printConfig(LOG_LEVEL ll) final {
LOG_VAR(ll) << " General: ";
LOG_VAR(ll) << " General: ";
if (_mqttPrefix != "") {
LOG_VAR(ll) << " MQTT-Prefix: " << _mqttPrefix;
LOG_VAR(ll) << " MQTT-Prefix: " << _mqttPrefix;
} else {
LOG_VAR(ll) << " MQTT-Prefix: DEFAULT";
LOG_VAR(ll) << " MQTT-Prefix: DEFAULT";
}
if (_sensorPattern != "") {
LOG_VAR(ll) << " Sensor Pattern: " << _sensorPattern;
LOG_VAR(ll) << " Sensor Pattern: " << _sensorPattern;
}
if (_cacheInterval != DEFAULT_CACHE_INTERVAL) {
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
} else {
LOG_VAR(ll) << " Cache interval: DEFAULT";
LOG_VAR(ll) << " Cache interval: DEFAULT";
}
//prints plugin specific configurator attributes and entities if present
printConfiguratorConfig(ll);
LOG_VAR(ll) << " Groups:";
LOG_VAR(ll) << " Groups:";
for(auto g : _sensorGroups) {
g->SensorGroupInterface::printConfig(ll);
g->printConfig(ll);
......@@ -722,7 +722,7 @@ protected:
*/
virtual void printConfiguratorConfig(LOG_LEVEL ll) {
//Overwrite if necessary
LOG_VAR(ll) << " No other plugin specific general parameters or entities defined";
LOG_VAR(ll) << " No other plugin-specific general parameters or entities defined";
}
/**
......
......@@ -81,19 +81,19 @@ public:
}
virtual void printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Sensor Group: " << _groupName;
LOG_VAR(ll) << " Sensor Group: " << _groupName;
if (_mqttPart != "") {
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
}
if (_sync) {
LOG_VAR(ll) << " Synchronized readings enabled";
LOG_VAR(ll) << " Synchronized readings enabled";
} else {
LOG_VAR(ll) << " Synchronized readings disabled";
LOG_VAR(ll) << " Synchronized readings disabled";
}
LOG_VAR(ll) << " minValues: " << _minValues;
LOG_VAR(ll) << " interval: " << _interval;
LOG_VAR(ll) << " minValues: " << _minValues;
LOG_VAR(ll) << " interval: " << _interval;
}
//have to be overwritten
......
......@@ -76,7 +76,7 @@ public:
}
virtual void printConfig(LOG_LEVEL ll) override {
LOG_VAR(ll) << " Sensors:";
LOG_VAR(ll) << " Sensors:";
for(auto s : _sensors) {
s->SensorBase::printConfig(ll, lg);
s->printConfig(ll, lg);
......
......@@ -57,12 +57,13 @@ public:
void setPropertyId(const std::string& property) { _propertyId = static_cast<BACNET_PROPERTY_ID>(stoul(property)); }
void setObjectIndex(int32_t objectIndex) { _objectIndex = objectIndex; }
void printConfig(LOG_LEVEL ll, LOGGER& lg) override {
LOG_VAR(ll) << " Factor: " << _factor;
LOG_VAR(ll) << " objectInstance: " << _objectInstance;
LOG_VAR(ll) << " objectType: " << _objectType;
LOG_VAR(ll) << " propertyId: " << _propertyId;
LOG_VAR(ll) << " objectIndex: " << _objectIndex;
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) override {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Factor: " << _factor;
LOG_VAR(ll) << leading << " objectInstance: " << _objectInstance;
LOG_VAR(ll) << leading << " objectType: " << _objectType;
LOG_VAR(ll) << leading << " propertyId: " << _propertyId;
LOG_VAR(ll) << leading << " objectIndex: " << _objectIndex;
}
protected:
......
......@@ -90,10 +90,10 @@ void BACnetSensorGroup::readAsync() {
}
void BACnetSensorGroup::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " deviceInstance: " << _deviceInstance;
LOG_VAR(ll) << " deviceInstance: " << _deviceInstance;
if (_bacClient) {
LOG_VAR(ll) << " BACClient set";
LOG_VAR(ll) << " BACClient set";
} else {
LOG_VAR(ll) << " No BACClient set!";
LOG_VAR(ll) << " No BACClient set!";
}
}
......@@ -47,7 +47,8 @@ public:
GpfsmonSensorBase(const GpfsmonSensorBase& )=default;
GpfsmonSensorBase& operator=(const GpfsmonSensorBase &)=default;
void printConfig(LOG_LEVEL ll, LOGGER& lg) {
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
std::string metric("unknown");
switch (_metric_type) {
case TIMESTAMP_GPFS:
......@@ -81,7 +82,7 @@ public:
metric = "SIZE";
break;
}
LOG_VAR(ll) << " Metric type: " << metric;
LOG_VAR(ll) << leading << " Metric type: " << metric;
}
protected:
......
......@@ -95,7 +95,7 @@ void GpfsmonSensorGroup::readAsync() {
}
void GpfsmonSensorGroup::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " No other specific attributes";
LOG_VAR(ll) << " No other specific attributes";
}
void GpfsmonSensorGroup::createTempFile(){
......
......@@ -79,12 +79,13 @@ public:
void setMsb(const std::string& msb) { _msb = stoi(msb); }
void setMsb(uint8_t msb) { _msb = msb; }
void printConfig(LOG_LEVEL ll, LOGGER& lg) {
LOG_VAR(ll) << " Record Id: " << _recordId;
LOG_VAR(ll) << " Factor: " << _factor;
LOG_VAR(ll) << " Raw Cmd: " << getRawCmdString();
LOG_VAR(ll) << " lsb: " << _lsb;
LOG_VAR(ll) << " msb: " << _msb;
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Record Id: " << _recordId;
LOG_VAR(ll) << leading << " Factor: " << _factor;
LOG_VAR(ll) << leading << " Raw Cmd: " << getRawCmdString();
LOG_VAR(ll) << leading << " lsb: " << _lsb;
LOG_VAR(ll) << leading << " msb: " << _msb;
}
protected:
......
......@@ -116,8 +116,8 @@ void IPMISensorGroup::readAsync() {
void IPMISensorGroup::printConfig(LOG_LEVEL ll) {
if (_host) {
LOG_VAR(ll) << " Host: " << _host->getHostName();
LOG_VAR(ll) << " Host: " << _host->getHostName();
} else {
LOG_VAR(ll) << " No Host set!";
LOG_VAR(ll) << " No Host set!";
}
}
......@@ -41,9 +41,10 @@ public:
_metric = metric;
}
void printConfig(LOG_LEVEL ll, LOGGER& lg) {
LOG_VAR(ll) << " CPU: " << _cpu;
LOG_VAR(ll) << " Metric: " << _metric;
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " CPU: " << _cpu;
LOG_VAR(ll) << leading << " Metric: " << _metric;
}
protected:
......
......@@ -193,5 +193,5 @@ void MSRSensorGroup::printConfig(LOG_LEVEL ll) {
separator = ", ";
}
LOG_VAR(ll) << " CPUs: " << ss.str();
LOG_VAR(ll) << " CPUs: " << ss.str();
}
......@@ -65,7 +65,8 @@ public:
void setCounterData(int counterData) { _counterData = static_cast<PORT_COUNTER_DATA>(counterData); }
void printConfig(LOG_LEVEL ll, LOGGER& lg) {
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
std::string cntData("unknown");
switch (_counterData) {
case portXmitData:
......@@ -150,7 +151,7 @@ public:
cntData = "uncorrectableErrors";
break;
}
LOG_VAR(ll) << " Counter data: " << cntData;
LOG_VAR(ll) << leading << " Counter data: " << cntData;
}
protected:
......
......@@ -199,6 +199,6 @@ void OpaSensorGroup::readAsync() {
}
void OpaSensorGroup::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " HFI Num: " << _hfiNum;
LOG_VAR(ll) << " Port Num: " << _portNum;
LOG_VAR(ll) << " HFI Num: " << _hfiNum;
LOG_VAR(ll) << " Port Num: " << _portNum;
}