11.3.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

OperatorConfiguratorTemplate.h 31.4 KB
Newer Older
1
//================================================================================
2
// Name        : OperatorConfiguratorTemplate.h
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Template implementing a standard OperatorConfiguratorInterface.
7 8 9 10 11 12 13 14 15 16
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18 19 20 21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23 24 25 26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28 29
#ifndef PROJECT_OPERATORCONFIGURATORTEMPLATE_H
#define PROJECT_OPERATORCONFIGURATORTEMPLATE_H
Alessio Netti's avatar
Alessio Netti committed
30 31 32 33 34 35 36 37

#include <map>
#include <set>

#include <boost/foreach.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
38 39
#include "OperatorTemplate.h"
#include "OperatorConfiguratorInterface.h"
40
#include "sensorbase.h"
Alessio Netti's avatar
Alessio Netti committed
41 42 43 44 45 46 47 48

#include <iostream>
#include <sstream>
#include <iomanip>

#define CFG_VAL	boost::property_tree::iptree&

/**
49
 * @brief Template that implements a standard OperatorConfiguratorInterface.
Alessio Netti's avatar
Alessio Netti committed
50
 *
51 52 53
 * @details Users should employ this template whenever possible, and create
 *          their own configurators only when strictly necessary.
 *
54
 * @ingroup operator
Alessio Netti's avatar
Alessio Netti committed
55
 */
56 57
template <class Operator, class SBase = SensorBase>
class OperatorConfiguratorTemplate : public OperatorConfiguratorInterface {
Alessio Netti's avatar
Alessio Netti committed
58 59 60

    // Verifying the types of input classes
    static_assert(std::is_base_of<SensorBase, SBase>::value, "SBase must derive from SensorBase!");
61
    static_assert(std::is_base_of<OperatorInterface, Operator>::value, "Operator must derive from OperatorInterface!");
Alessio Netti's avatar
Alessio Netti committed
62 63 64 65

protected:

    // For readability
66
    using O_Ptr = std::shared_ptr<Operator>;
Alessio Netti's avatar
Alessio Netti committed
67 68 69 70 71 72 73 74

    // Some wildcard characters
    const char COMMA = ',';
    const char OPEN_SQBRKET = '[';
    const char CLOSE_SQBRKET = ']';
    const char DASH = '-';

    // Keywords used to identify input and output sensor blocks
75 76 77 78 79
    const string INPUT_BLOCK_LEGACY = "input";
    const string OUTPUT_BLOCK_LEGACY = "output";
    const string INPUT_BLOCK = "unitInput";
    const string OUTPUT_BLOCK = "unitOutput";
    const string GLOBAL_OUTPUT_BLOCK = "globalOutput";
80 81
    const string ALL_CLAUSE = "all";
    const string ALL_REC_CLAUSE = "all-recursive";
Alessio Netti's avatar
Alessio Netti committed
82
    
Alessio Netti's avatar
Alessio Netti committed
83 84 85 86 87
public:

    /**
    * @brief            Class constructor
    */
88
    OperatorConfiguratorTemplate() :
89
            _queryEngine(QueryEngine::getInstance()),
90
            _operatorName("INVALID"),
Alessio Netti's avatar
Alessio Netti committed
91 92 93 94 95 96 97 98
            _baseName("INVALID"),
            _cfgPath(""),
            _mqttPrefix(""),
            _cacheInterval(900000) {}

    /**
    * @brief            Copy constructor is not available
    */
99
    OperatorConfiguratorTemplate(const OperatorConfiguratorTemplate&) = delete;
Alessio Netti's avatar
Alessio Netti committed
100 101 102 103

    /**
    * @brief            Assignment operator is not available
    */
104
    OperatorConfiguratorTemplate& operator=(const OperatorConfiguratorTemplate&) = delete;
Alessio Netti's avatar
Alessio Netti committed
105 106 107 108

    /**
    * @brief            Class destructor
    */
109 110
    virtual ~OperatorConfiguratorTemplate() {
        for (auto ta : _templateOperators)
Alessio Netti's avatar
Alessio Netti committed
111
            delete ta.second;
Alessio Netti's avatar
Alessio Netti committed
112 113
        for (auto ts : _templateSensors)
            delete ts.second;
114
        _templateOperators.clear();
Alessio Netti's avatar
Alessio Netti committed
115 116
        _templateSensors.clear();
        _templateProtoInputs.clear();
117 118
        _operatorInterfaces.clear();
        _operators.clear();
Alessio Netti's avatar
Alessio Netti committed
119 120 121
    }

    /**
122
    * @brief                    Sets default global settings for operators
Alessio Netti's avatar
Alessio Netti committed
123 124 125 126 127 128 129 130 131 132 133 134
    *
    *                           This method should be called once after constructing a configurator and before reading
    *                           the configuration, so that it has access to the default global settings (which can
    *                           be overridden.
    *
    * @param pluginSettings	    struct with global default settings for the plugins.
    */
    virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
        _mqttPrefix = pluginSettings.mqttPrefix;
        _cacheInterval = pluginSettings.cacheInterval;
    }

135 136 137 138 139 140 141
    /**
    * @brief                    Print configuration as read in.
    *
    * @param ll                 Logging level to log with
    */
    void printConfig(LOG_LEVEL ll) final {
        LOG_VAR(ll) << "    General: ";
Michael Ott's avatar
Michael Ott committed
142
        LOG_VAR(ll) << "          MQTT-Prefix:    " << (_mqttPrefix != "" ? _mqttPrefix : std::string("DEFAULT"));
143
        LOG_VAR(ll) << "          CacheInterval: " << _cacheInterval/1000 << " [s]";
144 145 146 147

        //prints plugin specific configurator attributes and entities if present
        printConfiguratorConfig(ll);

148 149 150
        LOG_VAR(ll) << "    Operators: ";
        for(auto a : _operators) {
            LOG_VAR(ll) << "        Operator: " << a->getName();
151 152 153 154
            a->printConfig(ll);
        }
    }

Alessio Netti's avatar
Alessio Netti committed
155
    /**
156
    * @brief            Read a config file and instantiate operators accordingly
Alessio Netti's avatar
Alessio Netti committed
157
    *
158
    *                   This method supplies standard algorithms to instantiate operators and parse units from config
Alessio Netti's avatar
Alessio Netti committed
159 160 161 162 163 164 165 166
    *                   files accordingly. It should be overridden only if strictly necessary, which generally should
    *                   not happen.
    *
    * @param cfgPath    Path to the config file
    * @return	        True if successful, false otherwise
    */
    bool readConfig(std::string cfgPath) {
        _cfgPath = cfgPath;
Alessio Netti's avatar
Alessio Netti committed
167
        _unitGen.setNavigator(_queryEngine.getNavigator());
Alessio Netti's avatar
Alessio Netti committed
168 169 170 171 172 173 174

        boost::property_tree::iptree cfg;
        boost::property_tree::read_info(cfgPath, cfg);

        // Read global variables (if present overwrite those from global.conf)
        readGlobal(cfg);

175
        // Reading operators and template operators
Alessio Netti's avatar
Alessio Netti committed
176 177
        BOOST_FOREACH(boost::property_tree::iptree::value_type &val, cfg) {
            // In this block templates are parsed and read
178 179
            if (boost::iequals(val.first, "template_" + _operatorName)) {
                LOG(debug) << "Template " << _operatorName << " \"" << val.second.data() << "\"";
Alessio Netti's avatar
Alessio Netti committed
180
                if (!val.second.empty()) {
181 182 183 184 185
                    Operator* op = new Operator(val.second.data());
                    op->setTemplate(true);
                    if (!readOperator(*op, val.second)) {
                        LOG(warning) << "Template " << _operatorName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
                        delete op;
Alessio Netti's avatar
Alessio Netti committed
186 187
                    }
                }
Alessio Netti's avatar
Alessio Netti committed
188 189 190 191 192 193 194 195 196 197
            // Sensor templates are read
            } else if (boost::iequals(val.first, "template_" + _baseName)) {
                LOG(debug) << "Template " << _baseName << " \"" << val.second.data() << "\"";
                if (!val.second.empty()) {
                    SBase* base = new SBase(val.second.data());
                    if (!readSensorBase(*base, val.second, true)) {
                        LOG(warning) << "Template " << _baseName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
                        delete base;
                    }
                }
198 199 200
            // Here we read and instantiate operators
            } else if (boost::iequals(val.first, _operatorName)) {
                LOG(debug) << _operatorName << " \"" << val.second.data() << "\"";
Alessio Netti's avatar
Alessio Netti committed
201
                if (!val.second.empty()) {
202 203 204
                    O_Ptr op = std::make_shared<Operator>(val.second.data());
                    if (readOperator(*op, val.second)) {
                        // If the operator must be duplicated for each compute unit, we copy-construct identical
Alessio Netti's avatar
Alessio Netti committed
205
                        // instances that have different unit IDs
206 207 208
                        unsigned numUnits = op->getUnits().size();
                        op->releaseUnits();
                        if(op->getDuplicate() && numUnits>1) {
Alessio Netti's avatar
Alessio Netti committed
209
                            for(unsigned int i=0; i < numUnits; i++) {
210 211
                                O_Ptr opCopy = std::make_shared<Operator>(*op);
                                opCopy->setUnitID(i);
Alessio Netti's avatar
Alessio Netti committed
212
                                //opCopy->setName(opCopy->getName() + "@" + op->getUnits()[i]->getName());
213 214
                                opCopy->collapseUnits();
                                storeOperator(opCopy);
Alessio Netti's avatar
Alessio Netti committed
215 216
                            }
                        } else
217
                            storeOperator(op);
Alessio Netti's avatar
Alessio Netti committed
218
                    } else {
219
                        LOG(warning) << _operatorName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
Alessio Netti's avatar
Alessio Netti committed
220 221
                    }
                }
222 223 224
            } else if( !boost::iequals(val.first, "global") ) {
                LOG(error) << "\"" << val.first << "\": unknown construct!";
                return false;
Alessio Netti's avatar
Alessio Netti committed
225 226 227 228 229 230
            }
        }
        return true;
    }

    /**
Alessio Netti's avatar
Alessio Netti committed
231
    * @brief            Clears the plugin configuration
Alessio Netti's avatar
Alessio Netti committed
232
    *
233
    *                   This will stop any operators that have been created, destroy them and return the plugin to
Alessio Netti's avatar
Alessio Netti committed
234
    *                   its uninitialized state.
Alessio Netti's avatar
Alessio Netti committed
235 236
    *
    */
Alessio Netti's avatar
Alessio Netti committed
237
    void clearConfig() final {
238 239 240
        // Stop all operators
        for(auto op : _operators)
            op->stop();
Alessio Netti's avatar
Alessio Netti committed
241

242 243 244
        // Wait for all operators to finish
        for(auto op : _operators)
            op->wait();
Alessio Netti's avatar
Alessio Netti committed
245

246 247 248
        // First of all, delete all template operators and sensors
        for (auto to : _templateOperators)
            delete to.second;
Alessio Netti's avatar
Alessio Netti committed
249 250
        for (auto ts : _templateSensors)
            delete ts.second;
Alessio Netti's avatar
Alessio Netti committed
251

252 253 254 255
        // Clear all operators
        _operatorInterfaces.clear();
        _operators.clear();
        _templateOperators.clear();
Alessio Netti's avatar
Alessio Netti committed
256 257
        _templateSensors.clear();
        _templateProtoInputs.clear();
Alessio Netti's avatar
Alessio Netti committed
258 259 260
    }

    /**
261
    * @brief            Clear all instantiated operators and read the configuration again
Alessio Netti's avatar
Alessio Netti committed
262
    *
263
    *                   This will stop any operators that have been created, destroy them and finally create new ones
Alessio Netti's avatar
Alessio Netti committed
264 265 266 267 268 269
    *                   from a new configuration read pass.
    *
    * @return	        True if successful, false otherwise
    */
    bool reReadConfig() final {
        clearConfig();
Alessio Netti's avatar
Alessio Netti committed
270 271 272 273 274 275

        // Reading the configuration once again
        return readConfig(_cfgPath);
    }

    /**
276
    * @brief                   Return all instantiated operators
Alessio Netti's avatar
Alessio Netti committed
277
    *
278
    * @return	               Vector containing pointers to all operator interfaces of this plugin
Alessio Netti's avatar
Alessio Netti committed
279
    */
280 281
    std::vector<OperatorPtr>& getOperators() final {
        return _operatorInterfaces;
Alessio Netti's avatar
Alessio Netti committed
282 283 284 285 286
    }

protected:

    /**
287
    * @brief           Reads any derived operator attributes
Alessio Netti's avatar
Alessio Netti committed
288
    *
289
    *                  Pure virtual interface method, responsible for reading plugin-specific operator attributes.
Alessio Netti's avatar
Alessio Netti committed
290
    *
291
    * @param op		   The operator for which derived attributes must be set
Alessio Netti's avatar
Alessio Netti committed
292 293
    * @param config	   A Boost property (sub-)tree containing the config attributes
    */
294
    virtual void operatorAttributes(Operator& op, CFG_VAL config) = 0;
Alessio Netti's avatar
Alessio Netti committed
295 296 297 298 299 300 301 302 303 304 305

    /**
    * @brief           Reads any derived sensor attributes
    *
    *                  Pure virtual interface method, responsible for reading plugin-specific sensor attributes.
    *
    * @param s		   The sensor for which derived attributes must be set
    * @param config	   A Boost property (sub-)tree containing the config attributes
    */
    virtual void sensorBase(SBase& s, CFG_VAL config) = 0;

306 307 308 309 310 311
    /**
    * @brief           Performs additional checks on instantiated units
    *
    *                  Pure virtual interface method, responsible for performing user-specified checks on units.
    *
    * @param u		   The unit that has been created
Alessio Netti's avatar
Alessio Netti committed
312
    * @return          True if the unit is valid, False otherwise
313
    */
Alessio Netti's avatar
Alessio Netti committed
314
    virtual bool unit(UnitTemplate<SBase>& u) = 0;
315

Alessio Netti's avatar
Alessio Netti committed
316 317 318 319 320 321 322 323 324
    /**
    * @brief           Reads additional global attributes on top of the default ones
    *
    *                  Virtual interface method, responsible for reading plugin-specific global attributes.
    *
    * @param config	   A Boost property (sub-)tree containing the global values
    */
    virtual void global(CFG_VAL config) {}

325 326 327 328 329 330 331 332
    /**
    * @brief            Print information about configurable configurator attributes.
    *
    *                   This method is virtual and can be overridden on a per-plugin basis.
    *
    * @param ll         Severity level to log with
    */
    virtual void printConfiguratorConfig(LOG_LEVEL ll) {
Alessio Netti's avatar
Alessio Netti committed
333
        LOG_VAR(ll) << "          No other plugin-specific general parameters defined";
334
    }
Alessio Netti's avatar
Alessio Netti committed
335 336

    /**
337
    * @brief                   Store an operator in the internal vectors
Alessio Netti's avatar
Alessio Netti committed
338
    *
339
    * @param op                Shared pointer to a OperatorInterface object
Alessio Netti's avatar
Alessio Netti committed
340
    */
341 342 343
    void storeOperator(O_Ptr op) {
        _operators.push_back(op);
        _operatorInterfaces.push_back(op);
Alessio Netti's avatar
Alessio Netti committed
344 345 346
    }

    /**
347
    * @brief                   Reads a single operator configuration block
Alessio Netti's avatar
Alessio Netti committed
348 349
    *
    *                          Non-virtual interface method for class-internal use only. This will configure an
350 351
    *                          Operator object, and instantiate all units associated to it. All derived attributes
    *                          and additional configuration must be performed in the operatorAttributes() virtual method.
Alessio Netti's avatar
Alessio Netti committed
352
    *
353 354
    * @param op	               The operator that must be configured
    * @param config	           A boost property (sub-)tree containing the operator values
Alessio Netti's avatar
Alessio Netti committed
355 356
    * @return	               True if successful, false otherwise
    */
357
    bool readOperator(Operator& op, CFG_VAL config) {
Alessio Netti's avatar
Alessio Netti committed
358
        // Vectors containing "prototype" inputs and outputs to be modified with the actual compute units
359
        std::vector<shared_ptr<SBase>> protoInputs, protoOutputs, protoGlobalOutputs;
360
        inputMode_t inputMode = SELECTIVE;
361
        // Check for the existence of a template definition to initialize the operator
Alessio Netti's avatar
Alessio Netti committed
362 363 364
        boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
        if(def) {
            LOG(debug) << "  Using \"" << def.get().data() << "\" as default.";
365 366 367 368 369 370
            auto it = _templateOperators.find(def.get().data());
            if(it != _templateOperators.end()) {
                op = *(it->second);
                op.setName(config.data());
                op.setTemplate(false);
                // Operators instantiated from templates DO NOT share the same units and output sensors. 
Alessio Netti's avatar
Alessio Netti committed
371
                // This would lead to too much naming ambiguity and is generally just not needed
372
                op.clearUnits();
Alessio Netti's avatar
Alessio Netti committed
373 374 375
                // The input sensors defined in the template are on the other hand preserved; this is meant as a 
                // workaround to shorten certain configurations 
                protoInputs = _templateProtoInputs[def.get().data()];
Alessio Netti's avatar
Alessio Netti committed
376
            } else {
377
                LOG(warning) << "Template " << _operatorName << "\"" << def.get().data() << "\" not found! Using standard values.";
Alessio Netti's avatar
Alessio Netti committed
378 379
            }
        }
380
        // Reading attributes associated to OperatorInterface
Alessio Netti's avatar
Alessio Netti committed
381 382 383
        BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
        {
            if (boost::iequals(val.first, "interval")) {
384
                op.setInterval(stoull(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
385
            } else if (boost::iequals(val.first, "minValues")) {
386
                op.setMinValues(stoull(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
387
            } else if (boost::iequals(val.first, "mqttPart")) {
388
                op.setMqttPart(val.second.data());
389 390
            } else if (boost::iequals(val.first, "enforceTopics")) {
                op.setEnforceTopics(to_bool(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
391
            } else if (boost::iequals(val.first, "sync")) {
392
                op.setSync(to_bool(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
393 394
            } else if (boost::iequals(val.first, "disabled")) {
                op.setDisabled(to_bool(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
395
            } else if (boost::iequals(val.first, "delay")) {
396
                op.setDelayInterval(stoull(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
397
            } else if (boost::iequals(val.first, "duplicate")) {
398
                op.setDuplicate(to_bool(val.second.data()));
399
            } else if (boost::iequals(val.first, "relaxed")) {
400
                op.setRelaxed(to_bool(val.second.data()));
401
            } else if (boost::iequals(val.first, "unitCacheLimit")) {
402
                op.setUnitCacheLimit(stoull(val.second.data()));
Alessio Netti's avatar
Alessio Netti committed
403
            } else if (boost::iequals(val.first, "streaming")) {
404
                op.setStreaming(to_bool(val.second.data()));
405 406
            } else if (isInputBlock(val.first) || isOutputBlock(val.first) || isGlobalOutputBlock(val.first)) {
                // Instantiating all sensors contained within the "unitInput", "unitOutput" or "globalOutput" block
407
                BOOST_FOREACH(boost::property_tree::iptree::value_type &valInner, val.second)
Alessio Netti's avatar
Alessio Netti committed
408 409
                {
                    if (boost::iequals(valInner.first, _baseName)) {
410
                        LOG(debug) << "    I/O " << _baseName << " " << valInner.second.data();
411
                        SBase sensor(valInner.second.data());
Alessio Netti's avatar
Alessio Netti committed
412
                        if (readSensorBase(sensor, valInner.second, false)) {
Alessio Netti's avatar
Alessio Netti committed
413
                            shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
414 415 416 417 418 419
                            if(isInputBlock(val.first))
                                protoInputs.push_back(sensorPtr);
                            else if(isOutputBlock(val.first))
                                protoOutputs.push_back(sensorPtr);
                            else
                                protoGlobalOutputs.push_back(sensorPtr);
Alessio Netti's avatar
Alessio Netti committed
420
                        } else {
421
                            LOG(warning) << "I/O " << _baseName << " " << op.getName() << "::" << sensor.getName() << " could not be read! Omitting";
Alessio Netti's avatar
Alessio Netti committed
422
                        }
423 424 425
                    // An "all" or "all-recursive" statement in the input block causes all sensors related to the specific
                    // unit to be picked
                    } else if (isInputBlock(val.first) && (boost::iequals(valInner.first, ALL_CLAUSE) || boost::iequals(valInner.first, ALL_REC_CLAUSE))) {
426 427 428 429
                        inputMode = boost::iequals(valInner.first, ALL_CLAUSE) ? ALL : ALL_RECURSIVE;
                    } else {
                        LOG(error) << "\"" << valInner.first << "\": unknown I/O construct!";
                        return false;
Alessio Netti's avatar
Alessio Netti committed
430 431 432 433
                    }
                }
            }
        }
434
        
Alessio Netti's avatar
Alessio Netti committed
435
        // Reading all derived attributes, if any
436
        operatorAttributes(op, config);
437
        // Instantiating units
438 439
        if(!op.getTemplate()) {
            op.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(op.getMqttPart()));
440
            return readUnits(op, protoInputs, protoOutputs, protoGlobalOutputs, inputMode);
Alessio Netti's avatar
Alessio Netti committed
441
        } else {
442 443
            // If the operator is a template, we add it to the related map
            auto ret = _templateOperators.insert(std::pair<std::string, Operator*>(op.getName(), &op));
Alessio Netti's avatar
Alessio Netti committed
444
            if(!ret.second) {
445
                LOG(warning) << "Template " << _operatorName << " " << op.getName() << " already exists! Omitting...";
Alessio Netti's avatar
Alessio Netti committed
446 447
                return false;
            }
448
            _templateProtoInputs.insert(std::pair<std::string, std::vector<shared_ptr<SBase>>>(op.getName(), protoInputs));
Alessio Netti's avatar
Alessio Netti committed
449
        }
450
        
Alessio Netti's avatar
Alessio Netti committed
451 452 453 454 455 456 457 458 459 460 461 462
        return true;
    }

    /**
    * @brief                   Reads a single sensor configuration block
    *
    *                          Non-virtual interface method for class-internal use only. This will configure a
    *                          sensor object. All derived attributes and additional configuration must be performed
    *                          in the sensorBase() virtual method.
    *
    * @param sBase	           The sensor that must be configured
    * @param config	           A boost property (sub-)tree containing the sensor values
Micha Müller's avatar
Micha Müller committed
463
    * @param isTemplate        Do we read a template sensor?
Alessio Netti's avatar
Alessio Netti committed
464 465
    * @return	               True if successful, false otherwise
    */
Alessio Netti's avatar
Alessio Netti committed
466
    bool readSensorBase(SBase& sBase, CFG_VAL config, bool isTemplate=false) {
Alessio Netti's avatar
Alessio Netti committed
467
        sBase.setCacheInterval(_cacheInterval);
Alessio Netti's avatar
Alessio Netti committed
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
        if (!isTemplate) {
            // Copying parameters from the template (if defined)
            boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
            if(def) {
                LOG(debug) << "  Using \"" << def.get().data() << "\" as default.";
                auto it = _templateSensors.find(def.get().data());
                if(it != _templateSensors.end()) {
                    sBase = *(it->second);
                    sBase.setName(config.data());
                } else {
                    LOG(warning) << "Template " << _baseName << "\" " << def.get().data() << "\" not found! Using standard values.";
                }
            }
        }
        // Reading other sensor parameters
Alessio Netti's avatar
Alessio Netti committed
483 484 485
        BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
            if (boost::iequals(val.first, "mqttsuffix")) {
                sBase.setMqtt(val.second.data());
486
            } else if (boost::iequals(val.first, "skipConstVal")) {
Alessio Netti's avatar
Alessio Netti committed
487
                sBase.setSkipConstVal(to_bool(val.second.data()));
488
            } else if (boost::iequals(val.first, "delta")) {
Alessio Netti's avatar
Alessio Netti committed
489
                sBase.setDelta(to_bool(val.second.data()));
490
            } else if (boost::iequals(val.first, "subSampling")) {
Alessio Netti's avatar
Alessio Netti committed
491
                sBase.setSubsampling(std::stoi(val.second.data()));
492 493
            } else if (boost::iequals(val.first, "publish")) {
                sBase.setPublish(to_bool(val.second.data()));
494 495
            } else if (boost::iequals(val.first, "metadata")) {
                SensorMetadata sm;
Alessio Netti's avatar
Alessio Netti committed
496 497
                if(sBase.getMetadata())
                    sm = *sBase.getMetadata();
498 499 500 501 502 503
                try {
                    sm.parsePTREE(val.second);
                    sBase.setMetadata(sm);
                } catch(const std::exception& e) {
                    LOG(error) << "  Metadata parsing failed for sensor " << sBase.getName() << "!" << std::endl;
                }
Alessio Netti's avatar
Alessio Netti committed
504 505 506
            }
        }
        sensorBase(sBase, config);
Alessio Netti's avatar
Alessio Netti committed
507 508 509 510 511 512 513 514 515
        
        if(isTemplate) {
            auto ret = _templateSensors.insert(std::pair<std::string, SBase*>(sBase.getName(), &sBase));
            if(!ret.second) {
                LOG(warning) << "Template " << _baseName << " " << sBase.getName() << " already exists! Omitting...";
                return false;
            }
        }
        
Alessio Netti's avatar
Alessio Netti committed
516 517
        return true;
    }
518 519
    
    /**
520
     * @brief               Instantiates all necessary units for a single operator
521
     * 
522
     *                      This method will create and assign all unit objects for a single operator, given a set
523
     *                      of prototype input sensors, prototype output sensors and an input mode. This method is
524 525
     *                      virtual such as to allow for flexibility in case specific operators require different
     *                      assignment policies (such as job operators).
526
     * 
527 528 529 530 531 532
     * @param op                  The operator whose units must be created
     * @param protoInputs         The vector of prototype input sensors
     * @param protoOutputs        The vector of prototype output sensors
     * @param protoGlobalOutputs  The vector of prototype global output sensors, if any
     * @param inputMode           Input mode to be used (selective, all or all_recursive)
     * @return                    True if successful, false otherwise
533
     */
534 535
    virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs,
            std::vector<shared_ptr<SBase>>& protoGlobalOutputs, inputMode_t inputMode) {
536
        vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
Alessio Netti's avatar
Alessio Netti committed
537 538
        if(protoOutputs.empty())
            LOG(debug) << "    No output specified, generating sink unit.";
539 540 541
        // If we employ a hierarchical unit (which will be the root unit) we disable duplication 
        if(!protoGlobalOutputs.empty())
            op.setDuplicate(false);
Alessio Netti's avatar
Alessio Netti committed
542
        
543
        try {
544 545
            units = _unitGen.generateAutoUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs, 
                    protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed());
546 547
        }
        catch (const std::exception &e) {
548
            LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
549 550 551 552 553
            delete units;
            return false;
        }

        for (auto &u: *units) {
554 555 556
            if (op.getStreaming()) {
                if(!constructSensorTopics(*u, op)) {
                    op.clearUnits();
557 558 559
                    delete units;
                    return false;
                }
560 561
                if (!unit(*u)) {
                    LOG(error) << "    Unit " << u->getName() << " did not pass the final check!";
562
                    op.clearUnits();
563 564 565 566
                    delete units;
                    return false;
                } else {
                    LOG(debug) << "    Unit " << u->getName() << " generated.";
567
                    op.addUnit(u);
568 569 570
                }
            } else {
                if (unit(*u)) {
571
                    op.addToUnitCache(u);
572 573 574
                    LOG(debug) << "    Template unit for on-demand operation " + u->getName() + " generated.";
                } else {
                    LOG(error) << "    Template unit " << u->getName() << " did not pass the final check!";
575
                    op.clearUnits();
576 577 578 579 580 581 582 583
                    delete units;
                    return false;
                }
            }
        }
        delete units;
        return true;
    }
Alessio Netti's avatar
Alessio Netti committed
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605

    /**
    * @brief                   Reads the global configuration block
    *
    *                          Non-virtual interface method for class-internal use only. This will read the "global"
    *                          configuration block in a file, overwriting any default settings on a per-plugin base.
    *                          Any derived or additional attributes must be added through the global() virtual method.
    *
    * @param config	           A Boost property (sub-)tree containing the global block
    * @return	               True if successful, false otherwise
    */
    bool readGlobal(CFG_VAL config) {
        boost::optional<boost::property_tree::iptree&> globalVals = config.get_child_optional("global");
        if (globalVals) {
            BOOST_FOREACH(boost::property_tree::iptree::value_type &global, config.get_child("global")) {
                if (boost::iequals(global.first, "mqttprefix")) {
                    _mqttPrefix = global.second.data();
                    LOG(debug) << "  Using own MQTT-Prefix " << _mqttPrefix;
                } else if (boost::iequals(global.first, "cacheInterval")) {
                    _cacheInterval = stoul(global.second.data());
                    LOG(debug) << "  Using own caching interval " << _cacheInterval << " [s]";
                    _cacheInterval *= 1000;
Alessio Netti's avatar
Alessio Netti committed
606
                } 
Alessio Netti's avatar
Alessio Netti committed
607 608 609 610 611 612
            }
            global(config.get_child("global"));
        }
        return true;
    }

Alessio Netti's avatar
Alessio Netti committed
613
    /**
Alessio Netti's avatar
Alessio Netti committed
614
    * @brief                   Adjusts the topics and names of the sensors
Alessio Netti's avatar
Alessio Netti committed
615
    *
Alessio Netti's avatar
Alessio Netti committed
616
    *                          Names are set according to the corresponding topic.
Alessio Netti's avatar
Alessio Netti committed
617 618
    *
    * @return                  true if successful, false otherwise
Alessio Netti's avatar
Alessio Netti committed
619
    */
620
    bool constructSensorTopics(UnitTemplate<SBase>& u, Operator& op) {
Alessio Netti's avatar
Alessio Netti committed
621
        // Performing name construction
622
        for(auto& s: u.getOutputs()) {
623
            adjustSensor(s, op, u);
624
        }
625
        for(auto& subUnit: u.getSubUnits())
626
            for(auto& s : subUnit->getOutputs()) {
627
                adjustSensor(s, op, u);
628
            }
Alessio Netti's avatar
Alessio Netti committed
629
        return true;
Alessio Netti's avatar
Alessio Netti committed
630
    }
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659

    /**
    * @brief                   Adjusts a single sensor
    *
    */
    void adjustSensor(std::shared_ptr<SBase> s, Operator& op, UnitTemplate<SBase>& u) {
        s->setName(s->getMqtt());
        SensorMetadata* sm = s->getMetadata();
        if(sm) {
            if(sm->isOperation) {
                s->clearMetadata();
                if(u.getInputs().size() != 1) {
                    LOG(error) << _operatorName << " " << op.getName() << ": Ambiguous operation field for sensor " << s->getName();
                    return;
                }
                // Replacing the metadata to publish the sensor as an operation of its corresponding input
                SensorMetadata smNew;
                smNew.publicName = u.getInputs()[0]->getMqtt();
                smNew.addOperation(s->getMqtt());
                s->setMetadata(smNew);
            } else {
                sm->publicName = s->getMqtt();
                sm->pattern = s->getMqtt();
                sm->isVirtual = false;
                if (sm->interval == 0)
                    sm->interval = (unsigned long long) op.getInterval() * 1000000;
            }
        }
    }
660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689
    
    /**
    * @brief                   Returns true if the input string describes an input block
    * 
    * @param s                 The string to be checked 
    * @return                  True if s is a input block, false otherwise
    */
    bool isInputBlock(const std::string& s ) {
        return boost::iequals(s, INPUT_BLOCK) || boost::iequals(s, INPUT_BLOCK_LEGACY);
    }

    /**
    * @brief                   Returns true if the input string describes an output block
    * 
    * @param s                 The string to be checked 
    * @return                  True if s is a output block, false otherwise
    */
    bool isOutputBlock(const std::string& s ) {
        return boost::iequals(s, OUTPUT_BLOCK) || boost::iequals(s, OUTPUT_BLOCK_LEGACY);
    }

    /**
    * @brief                   Returns true if the input string describes a global output block
    * 
    * @param s                 The string to be checked 
    * @return                  True if s is a global output block, false otherwise
    */
    bool isGlobalOutputBlock(const std::string& s ) {
        return boost::iequals(s, GLOBAL_OUTPUT_BLOCK);
    }
Alessio Netti's avatar
Alessio Netti committed
690

Alessio Netti's avatar
Alessio Netti committed
691
    // Instance of a QueryEngine object
692
    QueryEngine&     _queryEngine;
Alessio Netti's avatar
Alessio Netti committed
693 694
    // UnitGenerator object used to create units
    UnitGenerator<SBase>    _unitGen;
Alessio Netti's avatar
Alessio Netti committed
695

696 697
    // Keyword used to identify operator blocks in config files
    std::string		_operatorName;
Alessio Netti's avatar
Alessio Netti committed
698 699 700 701 702 703 704 705 706
    // Keyword used to identify sensors in config files
    std::string     _baseName;

    // Path of the configuration file that must be used
    std::string 	_cfgPath;
    // Default MQTT prefix to be used when creating output sensors
    std::string		_mqttPrefix;
    // Interval in seconds for the cache of each sensor
    unsigned int	_cacheInterval;
707 708 709 710 711 712
    // The vector of operators, in the form of pointers to OperatorInterface objects
    std::vector<OperatorPtr> 	_operatorInterfaces;
    // Like the above, but containing the operators in their actual types
    std::vector<O_Ptr>		_operators;
    // Map of the template operators that were defined in the config file - used for easy retrieval and instantiation
    std::map<std::string, Operator*> _templateOperators;
Alessio Netti's avatar
Alessio Netti committed
713 714
    // Map of the template sensors that were defined
    std::map<std::string, SBase*> _templateSensors;
715
    // Map of the protoinputs belonging to template operators
Alessio Netti's avatar
Alessio Netti committed
716
    std::map<std::string, std::vector<shared_ptr<SBase>>> _templateProtoInputs;
Alessio Netti's avatar
Alessio Netti committed
717 718
};

719
#endif //PROJECT_OPERATORCONFIGURATORTEMPLATE_H