//================================================================================
// Name        : OperatorTemplate.h
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features needed by Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
29
#ifndef PROJECT_OPERATORTEMPLATE_H
#define PROJECT_OPERATORTEMPLATE_H
Alessio Netti's avatar
Alessio Netti committed
30

31
#include "OperatorInterface.h"
Alessio Netti's avatar
Alessio Netti committed
32
#include "UnitTemplate.h"
Alessio Netti's avatar
Alessio Netti committed
33
#include "UnitGenerator.h"
Alessio Netti's avatar
Alessio Netti committed
34
35
36
37
38
39
40
#include "timestamp.h"
#include "QueryEngine.h"

#include <vector>
#include <map>
#include <memory>

Alessio Netti's avatar
Alessio Netti committed
41
using namespace std;
Alessio Netti's avatar
Alessio Netti committed
42
43

/**
 * @brief Template that implements the features needed by Operators and that
 *        complies with OperatorInterface.
 *
 * @details The template accepts any class derived from SensorBase, allowing
 *          users to add their own attributes to input and output sensors. This
 *          template also employs UnitTemplates, which are instantiated
 *          according to the input sensor type.
 *
 * @ingroup operator
 */
template <typename S>
55
class OperatorTemplate : public OperatorInterface {
Alessio Netti's avatar
Alessio Netti committed
56
    // The template shall only be instantiated for classes which derive from SensorBase
Alessio Netti's avatar
Alessio Netti committed
57
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
Alessio Netti's avatar
Alessio Netti committed
58
59
60

protected:
    // For readability
Alessio Netti's avatar
Alessio Netti committed
61
62
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;
Alessio Netti's avatar
Alessio Netti committed
63
64

public:
65
    
Alessio Netti's avatar
Alessio Netti committed
66
67
68
    /**
    * @brief            Class constructor
    *
69
    * @param name       Name of the operator
Alessio Netti's avatar
Alessio Netti committed
70
    */
71
72
    OperatorTemplate(const string name) :
            OperatorInterface(name),
73
            _unitCache(nullptr),
Alessio Netti's avatar
Alessio Netti committed
74
            _insertionLUT(nullptr),
75
            _queryEngine(QueryEngine::getInstance()) {}
Alessio Netti's avatar
Alessio Netti committed
76
77
78
79
80
81
82
83
84

    /**
    * @brief            Copy constructor
    *
    *                   Achtung! The vectors of units will be copied in shallow fashion, and the underlying output
    *                   sensors across different copied units will point to the same location. This is intended,
    *                   as "duplicated" units share the same view of the data analytics module.
    *
    */
85
86
    OperatorTemplate(const OperatorTemplate& other) :
            OperatorInterface(other),
87
88
            _unitCache(nullptr),
            _insertionLUT(nullptr),
89
            _queryEngine(QueryEngine::getInstance()) {
Alessio Netti's avatar
Alessio Netti committed
90
91
92
93
94
95
96
97
98
99
100
101
102

        for(auto u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }
    }

    /**
    * @brief            Assignment operator
    *
    *                   Here, the same considerations done for the copy constructor apply.
    *
    */
103
104
    OperatorTemplate& operator=(const OperatorTemplate& other) {
        OperatorInterface::operator=(other);
Alessio Netti's avatar
Alessio Netti committed
105
106
107
108
109
110
111
112
113
114
115
116
117
        _units.clear();

        for(auto u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }

        return *this;
    }

    /**
    * @brief            Class destructor
    */
118
    virtual ~OperatorTemplate() {
Alessio Netti's avatar
Alessio Netti committed
119
120
        _units.clear();
        _baseUnits.clear();
Alessio Netti's avatar
Alessio Netti committed
121

122
123
124
        if(_unitCache) {
            _unitCache->clear();
            delete _unitCache;
Alessio Netti's avatar
Alessio Netti committed
125
126
127
128
129
        }
        if(_insertionLUT) {
            _insertionLUT->clear();
            delete _insertionLUT;
        }
Alessio Netti's avatar
Alessio Netti committed
130
131
    }

132
    /**
    * @brief            Prints the current operator configuration
    *
    *                   The MQTT prefix line is only emitted if a prefix is set. The configuration of each
    *                   unit is printed after the operator-level settings.
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) override {
        const char* const on  = "enabled";
        const char* const off = "disabled";

        if (_mqttPart != "") {
            LOG_VAR(ll) << "            MQTT prefix:     " << _mqttPart;
        }
        LOG_VAR(ll) << "            Sync readings:   " << (_sync ? on : off);
        LOG_VAR(ll) << "            Streaming mode:  " << (_streaming ? on : off);
        LOG_VAR(ll) << "            Duplicated mode: " << (_duplicate ? on : off);
        LOG_VAR(ll) << "            MinValues:       " << _minValues;
        LOG_VAR(ll) << "            Interval:        " << _interval;
        LOG_VAR(ll) << "            Unit Cache Size: " << _unitCacheLimit;
        LOG_VAR(ll) << "            Start delay:     " << _delayInterval;

        if (_units.empty()) {
            LOG_VAR(ll) << "            Units:           none";
        } else {
            LOG_VAR(ll) << "            Units:";
            for (const auto& unit : _units) {
                unit->printConfig(ll, lg);
            }
        }
    }

Alessio Netti's avatar
Alessio Netti committed
155
    /**
    * @brief              Adds an unit to this operator
    *
    *                     The unit is only stored if it actually is a UnitTemplate<S>; otherwise an error is
    *                     logged and the unit is discarded.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)  override {
        // The interface hands us a UnitInterface pointer: recover the concrete UnitTemplate<S> type
        const U_Ptr typedUnit = dynamic_pointer_cast< UnitTemplate<S> >(u);
        if (!typedUnit) {
            LOG(error) << "Operator " << _name << ": Type mismatch when storing output sensor! Sensor omitted";
            return;
        }

        // Keep the typed and the interface-typed views in sync
        _units.push_back(typedUnit);
        _baseUnits.push_back(u);
    }

    /**
172
    * @brief              Returns the units of this operator
Alessio Netti's avatar
Alessio Netti committed
173
174
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
175
    *                     derived type, are used internally. If the operator uses a lock to regulate unit access,
176
    *                     this will be acquired and must be released through the releaseUnits() method.
177
    *                     
178
    * @return             The vector of UnitInterface objects of this operator
Alessio Netti's avatar
Alessio Netti committed
179
    */
180
181
182
183
184
185
186
    virtual vector<UnitPtr>& getUnits() override	{ return _baseUnits; }
    
    /**
     * @brief             Releases the internal lock for unit access
     *
     *                    This method is meant to regulate concurrent access to units which are generated
     *                    dynamically. In this specific implementation it is intentionally a no-op, as units
     *                    in standard operators are static and never modified.
     */
    virtual void releaseUnits() override {
        // Nothing to release
    }
Alessio Netti's avatar
Alessio Netti committed
191
192

    /**
    * @brief              Clears all the units contained in this operator
    *
    *                     Both internal unit vectors are emptied and the unit ID is reset to -1.
    */
    virtual void clearUnits() override {
        _units.clear();
        _baseUnits.clear();
        _unitID = -1;
    }


    /**
199
    * @brief              Initializes this operator
Alessio Netti's avatar
Alessio Netti committed
200
    *
201
    *                     This method performs additional initialization compared to OperatorInterface. Specifically,
Alessio Netti's avatar
Alessio Netti committed
202
203
204
205
    *                     all output sensors in units are initialized, and their caches instantiated.
    *
    * @param io           Boost ASIO service to be used
    */
206
    virtual void init(boost::asio::io_service& io) final override {
207
        OperatorInterface::init(io);
Alessio Netti's avatar
Alessio Netti committed
208

209
210
        for(const auto u : _units)
            u->init(_cacheSize);
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234

        this->execOnInit();
    }

    /**
    * @brief              Waits for the operator to complete its tasks
    *
    *                     Polls _pendingTasks every 10 ms until it reaches zero, then calls execOnStop().
    *                     Polling is given up after a timeout of 30000 ms if the interval is below 10000,
    *                     and of three times the interval otherwise; in that case a warning is logged and
    *                     execOnStop() is not called.
    */
    virtual void wait() final override {
        const uint64_t pollMs = 10;
        const uint64_t timeoutMs = _interval<10000 ? 30000 : _interval*3;

        for (uint64_t polls = 0; pollMs * polls < timeoutMs; ++polls) {
            if (_pendingTasks) {
                // Handlers are still in flight: back off and check again
                std::this_thread::sleep_for(std::chrono::milliseconds(pollMs));
                continue;
            }

            this->execOnStop();
            LOG(info) << "Operator " << _name << " stopped.";
            return;
        }

        LOG(warning) << "Operator " << _name << " will not finish! Skipping it";
    }

    /**
238
    * @brief              Starts this operator
Alessio Netti's avatar
Alessio Netti committed
239
    */
240
    virtual void start() final override {
Alessio Netti's avatar
Alessio Netti committed
241
        if(_keepRunning) {
242
            LOG(info) << "Operator " << _name << " already running.";
Alessio Netti's avatar
Alessio Netti committed
243
            return;
Alessio Netti's avatar
Alessio Netti committed
244
        } else if(!_streaming) {
245
            LOG(error) << "On-demand operator " << _name << " cannot be started.";
Alessio Netti's avatar
Alessio Netti committed
246
            return;
Alessio Netti's avatar
Alessio Netti committed
247
248
        }

249
250
251
252
253
        if (!this->execOnStart()) {
            LOG(error) << "Operator " << _name << ": startup failed.";
            return;
        }

Alessio Netti's avatar
Alessio Netti committed
254
255
        _keepRunning = 1;
        _pendingTasks++;
256
        _timer->async_wait(bind(&OperatorTemplate<S>::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
257
        if(_delayInterval == 0)
258
            LOG(info) << "Operator " << _name << " started.";
Alessio Netti's avatar
Alessio Netti committed
259
        else
260
            LOG(info) << "Operator " << _name << " will be started after a delay of " << _delayInterval << " seconds.";
Alessio Netti's avatar
Alessio Netti committed
261
262
263
    }

    /**
264
    * @brief              Stops this operator
Alessio Netti's avatar
Alessio Netti committed
265
    */
266
267
268
    virtual void stop() final override {
        if(_keepRunning == 0 || !_streaming) {
            LOG(debug) << "Operator " << _name << " already stopped.";
Alessio Netti's avatar
Alessio Netti committed
269
            return;
Alessio Netti's avatar
Alessio Netti committed
270
271
272
        }

        _keepRunning = 0;
273
274
        //cancel any outstanding readAsync()
        _timer->cancel();
Alessio Netti's avatar
Alessio Netti committed
275
276
    }

Alessio Netti's avatar
Alessio Netti committed
277
278
279
280
281
282
283
284
285
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This is a dummy implementation that can be overridden in user plugins. Any thrown
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
286
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
287
    */
288
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override {
Alessio Netti's avatar
Alessio Netti committed
289
290
291
292
293
294
295
        throw invalid_argument("Unknown plugin action " + action + " requested!");
    }

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
296
    *                     perform data analytics queries on the operator, which must have the _streaming attribute set
Alessio Netti's avatar
Alessio Netti committed
297
298
299
300
301
302
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
Alessio Netti's avatar
Alessio Netti committed
303
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
Alessio Netti's avatar
Alessio Netti committed
304
305
        map<string, reading_t> outMap;
        if( !_streaming ) {
306
307
            shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
            UnitGenerator<S> unitGen(navi);
308
            // We check whether the input node belongs to this operator's unit domain
309
            if(!_unitCache)
310
                throw std::runtime_error("Initialization error in operator " + _name + "!");
Alessio Netti's avatar
Alessio Netti committed
311

312
            // Getting exclusive access to the operator
Alessio Netti's avatar
Alessio Netti committed
313
314
            while( _onDemandLock.exchange(true) ) {}

Alessio Netti's avatar
Alessio Netti committed
315
316
317
318
            // If the ondemand template unit refers to root it means it has been completely resolved already,
            //  and therefore we can use such unit without doing any resolution
            try {
                U_Ptr tempUnit = nullptr;
319
                if (_unitCache->count(node)) {
320
                    LOG(debug) << "Operator " << _name << ": cache hit for unit " << node << ".";
321
                    tempUnit = _unitCache->at(node);
Alessio Netti's avatar
Alessio Netti committed
322
                } else {
323
                    if (!_unitCache->count(SensorNavigator::templateKey))
324
325
                        throw std::runtime_error("No template unit in operator " + _name + "!");
                    LOG(debug) << "Operator " << _name << ": cache miss for unit " << node << ".";
326
                    U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
327
                    tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), _mqttPart, _relaxed);
328
                    addToUnitCache(tempUnit);
Alessio Netti's avatar
Alessio Netti committed
329
330
331
                }

                // Initializing sensors if necessary
332
                tempUnit->init(_cacheSize);
Alessio Netti's avatar
Alessio Netti committed
333

334
335
                compute(tempUnit);
                for (const auto &o : tempUnit->getOutputs()) {
Alessio Netti's avatar
Alessio Netti committed
336
337
338
                    outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                    o->clearReadingQueue();
                }
339
                
Alessio Netti's avatar
Alessio Netti committed
340
341
342
343
            } catch(const exception& e) {
                _onDemandLock.store(false);
                throw;
            }
Alessio Netti's avatar
Alessio Netti committed
344
345
346
347

            _onDemandLock.store(false);
        } else if( _keepRunning ) {
            bool found = false;
348
            // We iterate over _baseUnits and not _units because it only contains the units this operator is working on
349
            for(const auto& u : _baseUnits)
Alessio Netti's avatar
Alessio Netti committed
350
351
                if(u->getName() == node) {
                    found = true;
352
                    for(const auto& o : u->getBaseOutputs())
Alessio Netti's avatar
Alessio Netti committed
353
354
355
356
357
358
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                }

            if(!found)
                throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
        } else
359
            throw std::runtime_error("Operator " + _name + ": not available for on-demand query!");
Alessio Netti's avatar
Alessio Netti committed
360
        return outMap;
Alessio Netti's avatar
Alessio Netti committed
361
362
    }

Alessio Netti's avatar
Alessio Netti committed
363
    /**
364
    * @brief              Adds an unit to the internal cache of units
Alessio Netti's avatar
Alessio Netti committed
365
    *
366
    *                     The cache is used to speed up response times to queries of on-demand operators, and reduce
Alessio Netti's avatar
Alessio Netti committed
367
368
369
    *                     overall overhead. The cache has a limited size: once this size is reached, at every insertion
    *                     the oldest entry in the cache is removed.
    *
370
    * @param unit         Shared pointer to the Unit object to be added to the cache
Alessio Netti's avatar
Alessio Netti committed
371
    */
372
373
374
    void addToUnitCache(U_Ptr unit) {
        if(!_unitCache) {
            _unitCache = new map<string, U_Ptr>();
Alessio Netti's avatar
Alessio Netti committed
375
376
377
            _insertionLUT = new map<uint64_t, U_Ptr>();
        }

378
        if(_unitCache->size() >= _unitCacheLimit) {
Alessio Netti's avatar
Alessio Netti committed
379
            U_Ptr oldest = _insertionLUT->begin()->second;
380
            _unitCache->erase(oldest->getName());
Alessio Netti's avatar
Alessio Netti committed
381
        }
382
        _unitCache->insert(make_pair(unit->getName(), unit));
Alessio Netti's avatar
Alessio Netti committed
383
384
385
386
387
388
        // The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
        // the insertion time LUT, so that it is never picked for deletion
        if(unit->getName() != SensorNavigator::templateKey)
            _insertionLUT->insert(make_pair(getTimestamp(), unit));
    }

389
390
391
    /**
    * @brief              Clears the internal baseUnits vector and only preserves the currently active unit
    *
392
    *                     This method is used for duplicated operators that share the same unit vector, with different
393
394
    *                     unit IDs. In order to expose distinct units to the outside, this method allows to keep
    *                     in the internal baseUnits vector (which is exposed through getUnits) only the unit that is
395
    *                     assigned to this specific operator, among those of the duplicated group. Access to the
396
    *                     other units is preserved through the _units vector, that can be accessed from within this
397
    *                     operator object.
398
399
400
    *
    */
    virtual void collapseUnits() {
401
        if (_unitID < 0 || _units.empty()) {
402
            LOG(error) << "Operator " << _name << ": Cannot collapse units!";
403
404
405
406
407
408
            return;
        }
        _baseUnits.clear();
        _baseUnits.push_back(_units[_unitID]);
    }

Alessio Netti's avatar
Alessio Netti committed
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
protected:

    /**
    * @brief              Returns the timestamp associated with the next compute task
    *
    *                     If the sync option is enabled, the result is the next multiple of the interval (in
    *                     milliseconds) strictly after the current time, plus a fixed offset of 10 ms, so that
    *                     compute tasks are aligned with the other sensor readings. Otherwise, or if the
    *                     interval is zero (for which no alignment can be computed), the result is simply the
    *                     current time plus the interval.
    *
    * @return             Timestamp of the next compute task, in nanoseconds
    */
    uint64_t nextReadingTime() {
        const uint64_t now = getTimestamp();
        const uint64_t interval64 = static_cast<uint64_t>(_interval);

        // A zero interval would be a modulo by zero in the alignment below
        if (!_sync || interval64 == 0)
            return now + MS_TO_NS(_interval);

        const uint64_t now_ms = now / 1000 / 1000;
        // Always within [1, interval64]: exactly on a boundary we wait for one entire interval
        const uint64_t waitToStart = interval64 - (now_ms % interval64);
        return (now_ms + waitToStart + 10)*1000*1000;
    }

    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
439
440
    *                     compute() method, which encapsulates the real logic of the operator. The compute method
    *                     is automatically called over units as required by the Operator's configuration.
Alessio Netti's avatar
Alessio Netti committed
441
442
    *
    */
443
    virtual void computeAsync() override {
Alessio Netti's avatar
Alessio Netti committed
444
445
446
447
        // Sleeping until we are allowed to start
        if(_delayInterval > 0) {
            sleep(_delayInterval);
            _delayInterval = 0;
448
            LOG(info) << "Operator " + _name + ": starting computation after delayed start!";
Alessio Netti's avatar
Alessio Netti committed
449
450
        }

Alessio Netti's avatar
Alessio Netti committed
451
452
        try {
            if (_duplicate && _unitID >= 0)
453
                compute(_units[_unitID]);
Alessio Netti's avatar
Alessio Netti committed
454
            else
Alessio Netti's avatar
Alessio Netti committed
455
                for (unsigned i = 0; i < _units.size(); i++)
456
                    compute(_units[i]);
Alessio Netti's avatar
Alessio Netti committed
457
        } catch(const exception& e) {
Alessio Netti's avatar
Alessio Netti committed
458
            LOG(error) << e.what();
Alessio Netti's avatar
Alessio Netti committed
459
        }
Alessio Netti's avatar
Alessio Netti committed
460
461
462
463

        if (_timer && _keepRunning) {
            _timer->expires_at(timestamp2ptime(nextReadingTime()));
            _pendingTasks++;
464
            _timer->async_wait(bind(&OperatorTemplate::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
465
466
467
468
        }
        _pendingTasks--;
    }

469
470
471
472
473
474
475
476
477
478
    /**
    * @brief              Data analytics computation logic
    *
    *                     This pure virtual method contains the actual logic of the operator and must be
    *                     implemented by derived classes. Within this template it is invoked by computeAsync()
    *                     for the units of the operator, and by computeOnDemand() for on-demand queries.
    *
    * @param unit         Shared pointer to the unit to be processed
    */
    virtual void compute(U_Ptr unit) = 0;

479
480
    // Cache for frequently used units in ondemand and job modes
    map<string, U_Ptr>* _unitCache;
Alessio Netti's avatar
Alessio Netti committed
481
482
    // Helper map to keep track of the cache insertion times
    map<uint64_t, U_Ptr>* _insertionLUT;
Alessio Netti's avatar
Alessio Netti committed
483
    // Vector of pointers to the internal units
Alessio Netti's avatar
Alessio Netti committed
484
    vector<U_Ptr>   _units;
Alessio Netti's avatar
Alessio Netti committed
485
486
    // Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
    // unless we use raw arrays
Alessio Netti's avatar
Alessio Netti committed
487
    vector<UnitPtr> _baseUnits;
Alessio Netti's avatar
Alessio Netti committed
488
    // Instance of a QueryEngine object to get sensor data
Alessio Netti's avatar
Alessio Netti committed
489
    QueryEngine&    _queryEngine;
Alessio Netti's avatar
Alessio Netti committed
490
491
};

492
#endif //PROJECT_OPERATORTEMPLATE_H