OperatorConfiguratorTemplate.h 31.6 KB
Newer Older
1
//================================================================================
2
// Name        : OperatorConfiguratorTemplate.h
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Template implementing a standard OperatorConfiguratorInterface.
7
8
9
10
11
12
13
14
15
16
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
29
#ifndef PROJECT_OPERATORCONFIGURATORTEMPLATE_H
#define PROJECT_OPERATORCONFIGURATORTEMPLATE_H
Alessio Netti's avatar
Alessio Netti committed
30
31
32
33
34
35
36
37

#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <sstream>

#include <boost/foreach.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>

#include "OperatorTemplate.h"
#include "OperatorConfiguratorInterface.h"
#include "sensorbase.h"

// Shorthand for the parameter type of the config-reading methods below: a mutable reference
// to a Boost property tree node of type iptree.
#define CFG_VAL	boost::property_tree::iptree&

/**
49
 * @brief Template that implements a standard OperatorConfiguratorInterface.
Alessio Netti's avatar
Alessio Netti committed
50
 *
51
52
53
 * @details Users should employ this template whenever possible, and create
 *          their own configurators only when strictly necessary.
 *
54
 * @ingroup operator
Alessio Netti's avatar
Alessio Netti committed
55
 */
56
57
template <class Operator, class SBase = SensorBase>
class OperatorConfiguratorTemplate : public OperatorConfiguratorInterface {
Alessio Netti's avatar
Alessio Netti committed
58
59
60

    // Verifying the types of input classes
    // Compile-time guard: the sensor type handled by this configurator must be SensorBase or a subclass
    static_assert(std::is_base_of<SensorBase, SBase>::value, "SBase must derive from SensorBase!");
61
    // Compile-time guard: the operator type handled by this configurator must implement OperatorInterface
    static_assert(std::is_base_of<OperatorInterface, Operator>::value, "Operator must derive from OperatorInterface!");
Alessio Netti's avatar
Alessio Netti committed
62
63
64
65

protected:

    // For readability
66
    // Shared-ownership handle to a concrete operator; used for the operators stored by storeOperator()
    using O_Ptr = std::shared_ptr<Operator>;
Alessio Netti's avatar
Alessio Netti committed
67
68
69
70
71
72
73
74

    // Some wildcard characters
    // Declared as non-static const members, so each configurator instance carries its own copy.
    // NOTE(review): none of these is referenced in the visible part of the file - presumably
    // they are used by pattern-parsing helpers further down; confirm before changing them.
    const char COMMA = ',';
    const char OPEN_SQBRKET = '[';
    const char CLOSE_SQBRKET = ']';
    const char DASH = '-';

    // Keywords used to identify input and output sensor blocks
75
76
77
78
79
    // Names of the configuration sub-blocks that hold sensors of an operator.
    // NOTE(review): presumably matched by isInputBlock() / isOutputBlock() / isGlobalOutputBlock(),
    // with the *_LEGACY spellings kept for older configuration files - those helpers are defined
    // outside this view, verify there.
    const string INPUT_BLOCK_LEGACY = "input";
    const string OUTPUT_BLOCK_LEGACY = "output";
    const string INPUT_BLOCK = "unitInput";
    const string OUTPUT_BLOCK = "unitOutput";
    const string GLOBAL_OUTPUT_BLOCK = "globalOutput";
80
81
    // Keywords accepted inside an input block by readOperator(): "all" switches the input mode
    // to ALL, "all-recursive" switches it to ALL_RECURSIVE (both compared case-insensitively)
    const string ALL_CLAUSE = "all";
    const string ALL_REC_CLAUSE = "all-recursive";
Alessio Netti's avatar
Alessio Netti committed
82
    
Alessio Netti's avatar
Alessio Netti committed
83
84
85
86
87
public:

    /**
    * @brief            Class constructor
    */
88
    OperatorConfiguratorTemplate() :
89
            _queryEngine(QueryEngine::getInstance()),
90
            _operatorName("INVALID"),
Alessio Netti's avatar
Alessio Netti committed
91
92
93
94
95
96
97
98
            _baseName("INVALID"),
            _cfgPath(""),
            _mqttPrefix(""),
            _cacheInterval(900000) {}

    /**
    * @brief            Copy constructor is not available
    */
99
    // Copying is disabled. The destructor deletes the raw pointers held in _templateOperators and
    // _templateSensors, so a member-wise copy would end up deleting the same objects twice.
    OperatorConfiguratorTemplate(const OperatorConfiguratorTemplate&) = delete;
Alessio Netti's avatar
Alessio Netti committed
100
101
102
103

    /**
    * @brief            Assignment operator is not available
    */
104
    // Assignment is disabled for the same reason as copy construction: the template maps hold
    // raw pointers that this class deletes itself.
    OperatorConfiguratorTemplate& operator=(const OperatorConfiguratorTemplate&) = delete;
Alessio Netti's avatar
Alessio Netti committed
105
106
107
108

    /**
    * @brief            Class destructor
    */
109
110
    virtual ~OperatorConfiguratorTemplate() {
        // Template operators and template sensors are stored as raw pointers and must be freed by hand
        for (const auto& operatorEntry : _templateOperators) {
            delete operatorEntry.second;
        }
        for (const auto& sensorEntry : _templateSensors) {
            delete sensorEntry.second;
        }

        // Drop the (now dangling) template entries, then the prototype inputs and the operators
        _templateOperators.clear();
        _templateSensors.clear();
        _templateProtoInputs.clear();
        _operatorInterfaces.clear();
        _operators.clear();
    }

    /**
122
    * @brief                    Sets default global settings for operators
Alessio Netti's avatar
Alessio Netti committed
123
124
125
126
127
128
129
130
131
132
133
134
    *
    *                           This method should be called once after constructing a configurator and before reading
    *                           the configuration, so that it has access to the default global settings (which can
    *                           be overridden).
    *
    * @param pluginSettings	    struct with global default settings for the plugins.
    */
    virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
        // Take over the two global defaults this configurator cares about
        _cacheInterval = pluginSettings.cacheInterval;
        _mqttPrefix = pluginSettings.mqttPrefix;
    }

135
136
137
138
139
140
141
    /**
    * @brief                    Print configuration as read in.
    *
    * @param ll                 Logging level to log with
    */
    void printConfig(LOG_LEVEL ll) final {
        // An empty prefix is reported as "DEFAULT"
        const std::string shownPrefix = (_mqttPrefix != "") ? _mqttPrefix : std::string("DEFAULT");

        LOG_VAR(ll) << "    General: ";
        LOG_VAR(ll) << "          MQTT-Prefix:    " << shownPrefix;
        LOG_VAR(ll) << "          CacheInterval: " << _cacheInterval/1000 << " [s]";

        // Plugin-specific configurator attributes and entities, if any
        printConfiguratorConfig(ll);

        // Finally every stored operator prints its own configuration
        LOG_VAR(ll) << "    Operators: ";
        for (const auto& currentOp : _operators) {
            LOG_VAR(ll) << "        Operator: " << currentOp->getName();
            currentOp->printConfig(ll);
        }
    }

Alessio Netti's avatar
Alessio Netti committed
155
    /**
156
    * @brief            Read a config file and instantiate operators accordingly
Alessio Netti's avatar
Alessio Netti committed
157
    *
158
    *                   This method supplies standard algorithms to instantiate operators and parse units from config
Alessio Netti's avatar
Alessio Netti committed
159
160
161
162
163
164
165
166
    *                   files accordingly. It should be overridden only if strictly necessary, which generally should
    *                   not happen.
    *
    * @param cfgPath    Path to the config file
    * @return	        True if successful, false otherwise
    */
    bool readConfig(std::string cfgPath) {
        _cfgPath = cfgPath;
Alessio Netti's avatar
Alessio Netti committed
167
        _unitGen.setNavigator(_queryEngine.getNavigator());
Alessio Netti's avatar
Alessio Netti committed
168
169
170
171
172
173
174

        boost::property_tree::iptree cfg;
        boost::property_tree::read_info(cfgPath, cfg);

        // Read global variables (if present overwrite those from global.conf)
        readGlobal(cfg);

175
        // Reading operators and template operators
Alessio Netti's avatar
Alessio Netti committed
176
177
        BOOST_FOREACH(boost::property_tree::iptree::value_type &val, cfg) {
            // In this block templates are parsed and read
178
179
            if (boost::iequals(val.first, "template_" + _operatorName)) {
                LOG(debug) << "Template " << _operatorName << " \"" << val.second.data() << "\"";
Alessio Netti's avatar
Alessio Netti committed
180
                if (!val.second.empty()) {
181
182
183
184
185
                    Operator* op = new Operator(val.second.data());
                    op->setTemplate(true);
                    if (!readOperator(*op, val.second)) {
                        LOG(warning) << "Template " << _operatorName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
                        delete op;
Alessio Netti's avatar
Alessio Netti committed
186
187
                    }
                }
Alessio Netti's avatar
Alessio Netti committed
188
189
190
191
192
193
194
195
196
197
            // Sensor templates are read
            } else if (boost::iequals(val.first, "template_" + _baseName)) {
                LOG(debug) << "Template " << _baseName << " \"" << val.second.data() << "\"";
                if (!val.second.empty()) {
                    SBase* base = new SBase(val.second.data());
                    if (!readSensorBase(*base, val.second, true)) {
                        LOG(warning) << "Template " << _baseName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
                        delete base;
                    }
                }
198
199
200
            // Here we read and instantiate operators
            } else if (boost::iequals(val.first, _operatorName)) {
                LOG(debug) << _operatorName << " \"" << val.second.data() << "\"";
Alessio Netti's avatar
Alessio Netti committed
201
                if (!val.second.empty()) {
202
203
204
                    O_Ptr op = std::make_shared<Operator>(val.second.data());
                    if (readOperator(*op, val.second)) {
                        // If the operator must be duplicated for each compute unit, we copy-construct identical
Alessio Netti's avatar
Alessio Netti committed
205
                        // instances that have different unit IDs
206
207
208
                        unsigned numUnits = op->getUnits().size();
                        op->releaseUnits();
                        if(op->getDuplicate() && numUnits>1) {
Alessio Netti's avatar
Alessio Netti committed
209
                            for(unsigned int i=0; i < numUnits; i++) {
210
211
                                O_Ptr opCopy = std::make_shared<Operator>(*op);
                                opCopy->setUnitID(i);
Alessio Netti's avatar
Alessio Netti committed
212
                                //opCopy->setName(opCopy->getName() + "@" + op->getUnits()[i]->getName());
213
214
                                opCopy->collapseUnits();
                                storeOperator(opCopy);
Alessio Netti's avatar
Alessio Netti committed
215
216
                            }
                        } else
217
                            storeOperator(op);
Alessio Netti's avatar
Alessio Netti committed
218
                    } else {
219
                        LOG(warning) << _operatorName << " \"" << val.second.data() << "\" has bad values! Ignoring...";
Alessio Netti's avatar
Alessio Netti committed
220
221
                    }
                }
222
223
224
            } else if( !boost::iequals(val.first, "global") ) {
                LOG(error) << "\"" << val.first << "\": unknown construct!";
                return false;
Alessio Netti's avatar
Alessio Netti committed
225
226
227
228
229
230
            }
        }
        return true;
    }

    /**
Alessio Netti's avatar
Alessio Netti committed
231
    * @brief            Clears the plugin configuration
Alessio Netti's avatar
Alessio Netti committed
232
    *
233
    *                   This will stop any operators that have been created, destroy them and return the plugin to
Alessio Netti's avatar
Alessio Netti committed
234
    *                   its uninitialized state.
Alessio Netti's avatar
Alessio Netti committed
235
236
    *
    */
Alessio Netti's avatar
Alessio Netti committed
237
    void clearConfig() final {
238
239
240
        // Stop all operators
        for(auto op : _operators)
            op->stop();
Alessio Netti's avatar
Alessio Netti committed
241

242
243
244
        // Wait for all operators to finish
        for(auto op : _operators)
            op->wait();
Alessio Netti's avatar
Alessio Netti committed
245

246
247
248
        // First of all, delete all template operators and sensors
        for (auto to : _templateOperators)
            delete to.second;
Alessio Netti's avatar
Alessio Netti committed
249
250
        for (auto ts : _templateSensors)
            delete ts.second;
Alessio Netti's avatar
Alessio Netti committed
251

252
253
254
255
        // Clear all operators
        _operatorInterfaces.clear();
        _operators.clear();
        _templateOperators.clear();
Alessio Netti's avatar
Alessio Netti committed
256
257
        _templateSensors.clear();
        _templateProtoInputs.clear();
Alessio Netti's avatar
Alessio Netti committed
258
259
260
    }

    /**
261
    * @brief            Clear all instantiated operators and read the configuration again
Alessio Netti's avatar
Alessio Netti committed
262
    *
263
    *                   This will stop any operators that have been created, destroy them and finally create new ones
Alessio Netti's avatar
Alessio Netti committed
264
265
266
267
268
269
    *                   from a new configuration read pass.
    *
    * @return	        True if successful, false otherwise
    */
    bool reReadConfig() final {
        clearConfig();
Alessio Netti's avatar
Alessio Netti committed
270
271
272
273
274
275

        // Reading the configuration once again
        return readConfig(_cfgPath);
    }

    /**
276
    * @brief                   Return all instantiated operators
Alessio Netti's avatar
Alessio Netti committed
277
    *
278
    * @return	               Vector containing pointers to all operator interfaces of this plugin
Alessio Netti's avatar
Alessio Netti committed
279
    */
280
281
    std::vector<OperatorPtr>& getOperators() final {
        // Returned by reference: this is the interface-typed vector that storeOperator() fills
        return _operatorInterfaces;
    }

protected:

    /**
287
    * @brief           Reads any derived operator attributes
Alessio Netti's avatar
Alessio Netti committed
288
    *
289
    *                  Pure virtual interface method, responsible for reading plugin-specific operator attributes.
Alessio Netti's avatar
Alessio Netti committed
290
    *
291
    * @param op		   The operator for which derived attributes must be set
Alessio Netti's avatar
Alessio Netti committed
292
293
    * @param config	   A Boost property (sub-)tree containing the config attributes
    */
294
    // Hook called by readOperator() once the generic operator attributes and I/O blocks have been
    // parsed, and before units are generated or the template is stored
    virtual void operatorAttributes(Operator& op, CFG_VAL config) = 0;
Alessio Netti's avatar
Alessio Netti committed
295
296
297
298
299
300
301
302
303
304
305

    /**
    * @brief           Reads any derived sensor attributes
    *
    *                  Pure virtual interface method, responsible for reading plugin-specific sensor attributes.
    *
    * @param s		   The sensor for which derived attributes must be set
    * @param config	   A Boost property (sub-)tree containing the config attributes
    */
    // Hook called by readSensorBase() after the generic sensor attributes have been parsed
    virtual void sensorBase(SBase& s, CFG_VAL config) = 0;

306
307
308
309
310
311
    /**
    * @brief           Performs additional checks on instantiated units
    *
    *                  Pure virtual interface method, responsible for performing user-specified checks on units.
    *
    * @param u		   The unit that has been created
Alessio Netti's avatar
Alessio Netti committed
312
    * @return          True if the unit is valid, False otherwise
313
    */
Alessio Netti's avatar
Alessio Netti committed
314
    // Hook called by readUnits() on each generated unit; a false return makes readUnits() clear the
    // operator's units and report failure
    virtual bool unit(UnitTemplate<SBase>& u) = 0;
315

Alessio Netti's avatar
Alessio Netti committed
316
317
318
319
320
321
322
323
324
    /**
    * @brief           Reads additional global attributes on top of the default ones
    *
    *                  Virtual interface method, responsible for reading plugin-specific global attributes.
    *
    * @param config	   A Boost property (sub-)tree containing the global values
    */
    // Default implementation does nothing; plugins override it to read their own global attributes.
    // NOTE(review): presumably invoked from readGlobal(), which is defined outside this view - confirm.
    virtual void global(CFG_VAL config) {}

325
326
327
328
329
330
331
332
    /**
    * @brief            Print information about configurable configurator attributes.
    *
    *                   This method is virtual and can be overridden on a per-plugin basis.
    *
    * @param ll         Severity level to log with
    */
    virtual void printConfiguratorConfig(LOG_LEVEL ll) {
        // Default output for plugins that do not override this method; called from printConfig()
        LOG_VAR(ll) << "          No other plugin-specific general parameters defined";
    }
Alessio Netti's avatar
Alessio Netti committed
335
336

    /**
337
    * @brief                   Store an operator in the internal vectors
Alessio Netti's avatar
Alessio Netti committed
338
    *
339
    * @param op                Shared pointer to an OperatorInterface object
Alessio Netti's avatar
Alessio Netti committed
340
    */
341
342
343
    void storeOperator(O_Ptr op) {
        // Keep the operator under its concrete type for internal use ...
        _operators.push_back(op);
        // ... and in the vector that getOperators() hands out
        _operatorInterfaces.push_back(op);
    }

    /**
347
    * @brief                   Reads a single operator configuration block
Alessio Netti's avatar
Alessio Netti committed
348
349
    *
    *                          Non-virtual interface method for class-internal use only. This will configure an
350
351
    *                          Operator object, and instantiate all units associated to it. All derived attributes
    *                          and additional configuration must be performed in the operatorAttributes() virtual method.
Alessio Netti's avatar
Alessio Netti committed
352
    *
353
354
    * @param op	               The operator that must be configured
    * @param config	           A boost property (sub-)tree containing the operator values
Alessio Netti's avatar
Alessio Netti committed
355
356
    * @return	               True if successful, false otherwise
    */
357
    // Configures op from its config block in four steps:
    //   1. if a "default" entry names a known template operator, op is overwritten with a copy of it;
    //   2. the generic attributes and the input/output sensor blocks are parsed;
    //   3. operatorAttributes() reads the plugin-specific attributes;
    //   4. a regular operator gets its units generated via readUnits(), while a template operator
    //      is registered in _templateOperators together with its prototype inputs.
    // When op is a template, the ADDRESS of op is stored in _templateOperators and later deleted by
    // this class, so the caller must pass a heap-allocated object and give up ownership on success.
    // Numeric attributes are converted with stoull(); its exceptions are not caught here.
    bool readOperator(Operator& op, CFG_VAL config) {
        // Vectors containing "prototype" inputs and outputs to be modified with the actual compute units
        std::vector<shared_ptr<SBase>> protoInputs, protoOutputs, protoGlobalOutputs;
        inputMode_t inputMode = SELECTIVE;

        // Check for the existence of a template definition to initialize the operator
        boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
        if(def) {
            const std::string& templateName = def.get().data();
            LOG(debug) << "  Using \"" << templateName << "\" as default.";
            auto it = _templateOperators.find(templateName);
            if(it != _templateOperators.end()) {
                op = *(it->second);
                op.setName(config.data());
                op.setTemplate(false);
                // Operators instantiated from templates DO NOT share the same units and output sensors.
                // This would lead to too much naming ambiguity and is generally just not needed
                op.clearUnits();
                // The input sensors defined in the template are on the other hand preserved; this is meant as a
                // workaround to shorten certain configurations
                protoInputs = _templateProtoInputs[templateName];
            } else {
                LOG(warning) << "Template " << _operatorName << " \"" << templateName << "\" not found! Using standard values.";
            }
        }

        // Reading attributes associated to OperatorInterface
        BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
        {
            if (boost::iequals(val.first, "interval")) {
                op.setInterval(stoull(val.second.data()));
            } else if (boost::iequals(val.first, "queueSize")) {
                op.setQueueSize(stoull(val.second.data()));
            } else if (boost::iequals(val.first, "minValues")) {
                op.setMinValues(stoull(val.second.data()));
            } else if (boost::iequals(val.first, "mqttPart")) {
                op.setMqttPart(val.second.data());
            } else if (boost::iequals(val.first, "enforceTopics")) {
                op.setEnforceTopics(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "sync")) {
                op.setSync(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "disabled")) {
                op.setDisabled(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "delay")) {
                op.setDelayInterval(stoull(val.second.data()));
            } else if (boost::iequals(val.first, "duplicate")) {
                op.setDuplicate(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "relaxed")) {
                op.setRelaxed(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "unitCacheLimit")) {
                op.setUnitCacheLimit(stoull(val.second.data()));
            } else if (boost::iequals(val.first, "streaming")) {
                op.setStreaming(to_bool(val.second.data()));
            } else if (isInputBlock(val.first) || isOutputBlock(val.first) || isGlobalOutputBlock(val.first)) {
                // Instantiating all sensors contained within the "unitInput", "unitOutput" or "globalOutput" block
                const bool inInputBlock = isInputBlock(val.first);
                BOOST_FOREACH(boost::property_tree::iptree::value_type &valInner, val.second)
                {
                    if (boost::iequals(valInner.first, _baseName)) {
                        LOG(debug) << "    I/O " << _baseName << " " << valInner.second.data();
                        SBase sensor(valInner.second.data());
                        if (readSensorBase(sensor, valInner.second, false)) {
                            // The prototype is copied to the heap and filed under the kind of block it came from
                            shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
                            if(inInputBlock)
                                protoInputs.push_back(sensorPtr);
                            else if(isOutputBlock(val.first))
                                protoOutputs.push_back(sensorPtr);
                            else
                                protoGlobalOutputs.push_back(sensorPtr);
                        } else {
                            LOG(warning) << "I/O " << _baseName << " " << op.getName() << "::" << sensor.getName() << " could not be read! Omitting";
                        }
                    // An "all" or "all-recursive" statement in the input block causes all sensors related to the specific
                    // unit to be picked
                    } else if (inInputBlock && (boost::iequals(valInner.first, ALL_CLAUSE) || boost::iequals(valInner.first, ALL_REC_CLAUSE))) {
                        inputMode = boost::iequals(valInner.first, ALL_CLAUSE) ? ALL : ALL_RECURSIVE;
                    } else {
                        LOG(error) << "\"" << valInner.first << "\": unknown I/O construct!";
                        return false;
                    }
                }
            }
        }

        // Reading all derived attributes, if any
        operatorAttributes(op, config);

        // Instantiating units
        if(!op.getTemplate()) {
            op.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(op.getMqttPart()));
            return readUnits(op, protoInputs, protoOutputs, protoGlobalOutputs, inputMode);
        }

        // If the operator is a template, we add it to the related map; a name clash leaves the map untouched
        auto ret = _templateOperators.insert(std::pair<std::string, Operator*>(op.getName(), &op));
        if(!ret.second) {
            LOG(warning) << "Template " << _operatorName << " " << op.getName() << " already exists! Omitting...";
            return false;
        }
        // The template's inputs are kept so that operators using it as "default" can inherit them
        _templateProtoInputs.insert(std::pair<std::string, std::vector<shared_ptr<SBase>>>(op.getName(), protoInputs));

        return true;
    }

    /**
    * @brief                   Reads a single sensor configuration block
    *
    *                          Non-virtual interface method for class-internal use only. This will configure a
    *                          sensor object. All derived attributes and additional configuration must be performed
    *                          in the sensorBase() virtual method.
    *
    * @param sBase	           The sensor that must be configured
    * @param config	           A boost property (sub-)tree containing the sensor values
Micha Müller's avatar
Micha Müller committed
465
    * @param isTemplate        Do we read a template sensor?
Alessio Netti's avatar
Alessio Netti committed
466
467
    * @return	               True if successful, false otherwise
    */
Alessio Netti's avatar
Alessio Netti committed
468
    // Configures sBase from its config block. A non-template sensor may name a template sensor in a
    // "default" entry, in which case sBase is first overwritten with a copy of that template. The
    // generic sensor attributes are then parsed and sensorBase() reads the plugin-specific ones.
    // With isTemplate=true the ADDRESS of sBase is stored in _templateSensors and later deleted by
    // this class, so the caller must pass a heap-allocated object and give up ownership on success.
    // A metadata block that fails to parse is logged and skipped; it does not make the call fail.
    bool readSensorBase(SBase& sBase, CFG_VAL config, bool isTemplate=false) {
        sBase.setCacheInterval(_cacheInterval);
        if (!isTemplate) {
            // Copying parameters from the template (if defined)
            boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
            if(def) {
                const std::string& templateName = def.get().data();
                LOG(debug) << "  Using \"" << templateName << "\" as default.";
                auto it = _templateSensors.find(templateName);
                if(it != _templateSensors.end()) {
                    sBase = *(it->second);
                    sBase.setName(config.data());
                } else {
                    LOG(warning) << "Template " << _baseName << " \"" << templateName << "\" not found! Using standard values.";
                }
            }
        }

        // Reading other sensor parameters
        BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
            if (boost::iequals(val.first, "mqttsuffix")) {
                sBase.setMqtt(val.second.data());
            } else if (boost::iequals(val.first, "skipConstVal")) {
                sBase.setSkipConstVal(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "delta")) {
                sBase.setDelta(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "subSampling")) {
                sBase.setSubsampling(std::stoi(val.second.data()));
            } else if (boost::iequals(val.first, "publish")) {
                sBase.setPublish(to_bool(val.second.data()));
            } else if (boost::iequals(val.first, "metadata")) {
                // Start from the metadata already attached to the sensor (e.g. copied from a template), if any
                SensorMetadata sm;
                if(sBase.getMetadata())
                    sm = *sBase.getMetadata();
                try {
                    sm.parsePTREE(val.second);
                    sBase.setMetadata(sm);
                } catch(const std::exception& e) {
                    LOG(error) << "  Metadata parsing failed for sensor " << sBase.getName() << ": " << e.what();
                }
            }
        }

        // Reading all derived attributes, if any
        sensorBase(sBase, config);

        // A template sensor is registered under its name; a name clash leaves the map untouched
        if(isTemplate) {
            auto ret = _templateSensors.insert(std::pair<std::string, SBase*>(sBase.getName(), &sBase));
            if(!ret.second) {
                LOG(warning) << "Template " << _baseName << " " << sBase.getName() << " already exists! Omitting...";
                return false;
            }
        }

        return true;
    }
520
521
    
    /**
522
     * @brief               Instantiates all necessary units for a single operator
523
     * 
524
     *                      This method will create and assign all unit objects for a single operator, given a set
525
     *                      of prototype input sensors, prototype output sensors and an input mode. This method is
526
527
     *                      virtual such as to allow for flexibility in case specific operators require different
     *                      assignment policies (such as job operators).
528
     * 
529
530
531
532
533
534
     * @param op                  The operator whose units must be created
     * @param protoInputs         The vector of prototype input sensors
     * @param protoOutputs        The vector of prototype output sensors
     * @param protoGlobalOutputs  The vector of prototype global output sensors, if any
     * @param inputMode           Input mode to be used (selective, all or all_recursive)
     * @return                    True if successful, false otherwise
535
     */
536
537
    virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs,
            std::vector<shared_ptr<SBase>>& protoGlobalOutputs, inputMode_t inputMode) {
        // The unit generator returns a heap-allocated vector that this method has to release. Keeping it in a
        // unique_ptr frees it on every exit path, including exceptions raised while the units are processed.
        std::unique_ptr<std::vector<shared_ptr<UnitTemplate<SBase>>>> units;

        if(protoOutputs.empty())
            LOG(debug) << "    No output specified, generating sink unit.";

        // If we employ a hierarchical unit (which will be the root unit) we disable duplication
        if(!protoGlobalOutputs.empty())
            op.setDuplicate(false);

        try {
            units.reset(_unitGen.generateAutoUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs,
                    protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed()));
        }
        catch (const std::exception &e) {
            LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
            return false;
        }

        // Guard against a generator that reports failure through a null result instead of an exception
        if(!units) {
            LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: no unit set was returned";
            return false;
        }

        for (auto &u: *units) {
            if (op.getStreaming()) {
                // Streaming operators: fix up the output sensors, validate the unit and attach it to the operator
                if(!constructSensorTopics(*u, op)) {
                    op.clearUnits();
                    return false;
                }
                if (!unit(*u)) {
                    LOG(error) << "    Unit " << u->getName() << " did not pass the final check!";
                    op.clearUnits();
                    return false;
                } else {
                    LOG(debug) << "    Unit " << u->getName() << " generated.";
                    op.addUnit(u);
                }
            } else {
                // On-demand operators: validated units are only stored in the operator's unit cache
                if (unit(*u)) {
                    op.addToUnitCache(u);
                    LOG(debug) << "    Template unit for on-demand operation " + u->getName() + " generated.";
                } else {
                    LOG(error) << "    Template unit " << u->getName() << " did not pass the final check!";
                    op.clearUnits();
                    return false;
                }
            }
        }
        return true;
    }
Alessio Netti's avatar
Alessio Netti committed
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607

    /**
    * @brief                   Reads the global configuration block
    *
    *                          Non-virtual interface method for class-internal use only. This will read the "global"
    *                          configuration block in a file, overwriting any default settings on a per-plugin basis.
    *                          Any derived or additional attributes must be added through the global() virtual method.
    *
    * @param config	           A Boost property (sub-)tree containing the global block
    * @return	               True if successful, false otherwise
    */
    bool readGlobal(CFG_VAL config) {
        // A missing "global" block is not an error: the defaults simply stay in place
        boost::optional<boost::property_tree::iptree&> globalBlock = config.get_child_optional("global");
        if (!globalBlock)
            return true;

        for (boost::property_tree::iptree::value_type &entry : *globalBlock) {
            const std::string &key = entry.first;
            if (boost::iequals(key, "mqttprefix")) {
                _mqttPrefix = entry.second.data();
                LOG(debug) << "  Using own MQTT-Prefix " << _mqttPrefix;
            } else if (boost::iequals(key, "cacheInterval")) {
                _cacheInterval = stoul(entry.second.data());
                LOG(debug) << "  Using own caching interval " << _cacheInterval << " [s]";
                // The value is logged in seconds and then scaled by 1000 before being kept
                _cacheInterval *= 1000;
            }
        }

        // Let the plugin-specific hook process the very same block
        global(*globalBlock);
        return true;
    }

Alessio Netti's avatar
Alessio Netti committed
615
    /**
Alessio Netti's avatar
Alessio Netti committed
616
    * @brief                   Adjusts the topics and names of the sensors
Alessio Netti's avatar
Alessio Netti committed
617
    *
Alessio Netti's avatar
Alessio Netti committed
618
    *                          Names are set according to the corresponding topic.
Alessio Netti's avatar
Alessio Netti committed
619
620
    *
    * @return                  true if successful, false otherwise
Alessio Netti's avatar
Alessio Netti committed
621
    */
622
    bool constructSensorTopics(UnitTemplate<SBase>& u, Operator& op) {
        // Output sensors attached directly to the unit
        for(auto& outSensor : u.getOutputs())
            adjustSensor(outSensor, op, u);

        // Output sensors of each sub-unit; note that the top-level unit u, not the sub-unit, is passed along
        for(auto& child : u.getSubUnits()) {
            for(auto& outSensor : child->getOutputs())
                adjustSensor(outSensor, op, u);
        }

        return true;
    }
633
634
635
636
637
638
639
640
641

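    /**
    * @brief                   Adjusts a single sensor
    *
    *                          The sensor's name is set to its MQTT topic. If the sensor carries metadata that is
    *                          flagged as an operation, that metadata is replaced so that the sensor is published as
    *                          an operation of the unit's single input; an error is logged if the unit does not have
    *                          exactly one input. Otherwise the existing metadata is completed in place.
    *
    * @param s                 The sensor to be adjusted
    * @param op                The operator being configured; supplies the name used in error messages and the
    *                          interval used when the metadata has none
    * @param u                 The unit whose inputs are consulted when the sensor is an operation
    */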
    void adjustSensor(std::shared_ptr<SBase> s, Operator& op, UnitTemplate<SBase>& u) {
        s->setName(s->getMqtt());
        SensorMetadata* sm = s->getMetadata();
        if(sm) {
642
            if(sm->getIsOperation() && *sm->getIsOperation()) {
643
644
645
646
647
648
649
                s->clearMetadata();
                if(u.getInputs().size() != 1) {
                    LOG(error) << _operatorName << " " << op.getName() << ": Ambiguous operation field for sensor " << s->getName();
                    return;
                }
                // Replacing the metadata to publish the sensor as an operation of its corresponding input
                SensorMetadata smNew;
650
651
                smNew.setPublicName(u.getInputs()[0]->getMqtt());
                smNew.setPattern(u.getInputs()[0]->getMqtt());
652
653
654
                smNew.addOperation(s->getMqtt());
                s->setMetadata(smNew);
            } else {
655
656
657
658
659
                sm->setPublicName(s->getMqtt());
                sm->setPattern(s->getMqtt());
                sm->setIsVirtual(false);
                if (!sm->getInterval())
                    sm->setInterval((uint64_t)op.getInterval() * 1000000);
660
661
662
            }
        }
    }
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
    
    /**
    * @brief                   Returns true if the input string describes an input block
    * 
    * @param s                 The string to be checked 
    * @return                  True if s is an input block, false otherwise
    */
    bool isInputBlock(const std::string& s ) {
        // Case-insensitive match against the current keyword first, then the legacy one
        if (boost::iequals(s, INPUT_BLOCK))
            return true;
        return boost::iequals(s, INPUT_BLOCK_LEGACY);
    }

    /**
    * @brief                   Returns true if the input string describes an output block
    * 
    * @param s                 The string to be checked 
    * @return                  True if s is an output block, false otherwise
    */
    bool isOutputBlock(const std::string& s ) {
        // Case-insensitive match against the current keyword first, then the legacy one
        if (boost::iequals(s, OUTPUT_BLOCK))
            return true;
        return boost::iequals(s, OUTPUT_BLOCK_LEGACY);
    }

    /**
    * @brief                   Returns true if the input string describes a global output block
    * 
    * @param s                 The string to be checked 
    * @return                  True if s is a global output block, false otherwise
    */
    bool isGlobalOutputBlock(const std::string& s ) {
        // Case-insensitive match against the global output keyword
        const bool matchesKeyword = boost::iequals(s, GLOBAL_OUTPUT_BLOCK);
        return matchesKeyword;
    }
Alessio Netti's avatar
Alessio Netti committed
693

Alessio Netti's avatar
Alessio Netti committed
694
    // Instance of a QueryEngine object
695
    QueryEngine&     _queryEngine;
Alessio Netti's avatar
Alessio Netti committed
696
697
    // UnitGenerator object used to create units
    UnitGenerator<SBase>    _unitGen;
Alessio Netti's avatar
Alessio Netti committed
698

699
700
    // Keyword used to identify operator blocks in config files
    std::string		_operatorName;
Alessio Netti's avatar
Alessio Netti committed
701
702
703
704
705
706
707
708
709
    // Keyword used to identify sensors in config files
    std::string     _baseName;

    // Path of the configuration file that must be used
    std::string 	_cfgPath;
    // Default MQTT prefix to be used when creating output sensors
    std::string		_mqttPrefix;
    // Interval for the cache of each sensor; readGlobal() reads it in seconds and stores it multiplied by 1000
    unsigned int	_cacheInterval;
710
711
712
713
714
715
    // The vector of operators, in the form of pointers to OperatorInterface objects
    std::vector<OperatorPtr> 	_operatorInterfaces;
    // Like the above, but containing the operators in their actual types
    std::vector<O_Ptr>		_operators;
    // Map of the template operators that were defined in the config file - used for easy retrieval and instantiation
    std::map<std::string, Operator*> _templateOperators;
Alessio Netti's avatar
Alessio Netti committed
716
717
    // Map of the template sensors that were defined
    std::map<std::string, SBase*> _templateSensors;
718
    // Map of the protoinputs belonging to template operators
Alessio Netti's avatar
Alessio Netti committed
719
    std::map<std::string, std::vector<shared_ptr<SBase>>> _templateProtoInputs;
Alessio Netti's avatar
Alessio Netti committed
720
721
};

722
#endif //PROJECT_OPERATORCONFIGURATORTEMPLATE_H