OperatorTemplate.h 21.3 KB
Newer Older
1
//================================================================================
2
// Name        : OperatorTemplate.h
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Template implementing features needed by Operators.
7
8
9
10
11
12
13
14
15
16
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
29
#ifndef PROJECT_OPERATORTEMPLATE_H
#define PROJECT_OPERATORTEMPLATE_H
Alessio Netti's avatar
Alessio Netti committed
30

31
#include "OperatorInterface.h"
Alessio Netti's avatar
Alessio Netti committed
32
#include "UnitTemplate.h"
Alessio Netti's avatar
Alessio Netti committed
33
#include "UnitGenerator.h"
Alessio Netti's avatar
Alessio Netti committed
34
35
36
37
38
39
40
#include "timestamp.h"
#include "QueryEngine.h"

#include <vector>
#include <map>
#include <memory>

Alessio Netti's avatar
Alessio Netti committed
41
using namespace std;
Alessio Netti's avatar
Alessio Netti committed
42
43

/**
44
45
 * @brief Template that implements features needed by Operators and complying
 *        to OperatorInterface.
Alessio Netti's avatar
Alessio Netti committed
46
 *
47
48
49
50
 * @details The template accepts any class derived from SensorBase, allowing
 *          users to add their own attributes to input and output sensors. This
 *          template also employs UnitTemplates, which are instantiated
 *          according to the input sensor type.
Alessio Netti's avatar
Alessio Netti committed
51
 *
52
 * @ingroup operator
Alessio Netti's avatar
Alessio Netti committed
53
54
 */
template <typename S>
55
class OperatorTemplate : public OperatorInterface {
Alessio Netti's avatar
Alessio Netti committed
56
    // The template shall only be instantiated for classes which derive from SensorBase
Alessio Netti's avatar
Alessio Netti committed
57
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
Alessio Netti's avatar
Alessio Netti committed
58
59
60

protected:
    // For readability
Alessio Netti's avatar
Alessio Netti committed
61
62
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;
Alessio Netti's avatar
Alessio Netti committed
63
64

public:
65
    
Alessio Netti's avatar
Alessio Netti committed
66
67
68
    /**
    * @brief            Class constructor
    *
69
    * @param name       Name of the operator
Alessio Netti's avatar
Alessio Netti committed
70
    */
71
72
    OperatorTemplate(const string name) :
            OperatorInterface(name),
73
            _unitCache(nullptr),
Alessio Netti's avatar
Alessio Netti committed
74
            _insertionLUT(nullptr),
75
            _queryEngine(QueryEngine::getInstance()) {}
Alessio Netti's avatar
Alessio Netti committed
76
77
78
79
80
81
82
83
84

    /**
    * @brief            Copy constructor
    *
    *                   Achtung! The vectors of units will be copied in shallow fashion, and the underlying output
    *                   sensors across different copied units will point to the same location. This is intended,
    *                   as "duplicated" units share the same view of the data analytics module.
    *
    */
85
86
    OperatorTemplate(const OperatorTemplate& other) :
            OperatorInterface(other),
87
88
            _unitCache(nullptr),
            _insertionLUT(nullptr),
89
            _queryEngine(QueryEngine::getInstance()) {
Alessio Netti's avatar
Alessio Netti committed
90
91
92
93
94
95
96
97
98
99
100
101
102

        for(auto u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }
    }

    /**
    * @brief            Assignment operator
    *
    *                   Here, the same considerations done for the copy constructor apply.
    *
    */
103
104
    OperatorTemplate& operator=(const OperatorTemplate& other) {
        OperatorInterface::operator=(other);
Alessio Netti's avatar
Alessio Netti committed
105
106
107
108
109
110
111
112
113
114
115
116
117
        _units.clear();

        for(auto u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }

        return *this;
    }

    /**
    * @brief            Class destructor
    */
118
    virtual ~OperatorTemplate() {
Alessio Netti's avatar
Alessio Netti committed
119
120
        _units.clear();
        _baseUnits.clear();
Alessio Netti's avatar
Alessio Netti committed
121

122
123
124
        if(_unitCache) {
            _unitCache->clear();
            delete _unitCache;
Alessio Netti's avatar
Alessio Netti committed
125
126
127
128
129
        }
        if(_insertionLUT) {
            _insertionLUT->clear();
            delete _insertionLUT;
        }
Alessio Netti's avatar
Alessio Netti committed
130
131
    }

132
    /**
133
    * @brief            Prints the current operator configuration
134
135
136
137
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) override {
Alessio Netti's avatar
Alessio Netti committed
138
        if(_mqttPart!="")
139
            LOG_VAR(ll) << "            MQTT prefix:     " << _mqttPart;
Alessio Netti's avatar
Alessio Netti committed
140
        LOG_VAR(ll) << "            Disabled:        " << (_disabled ? "true" : "false");
141
142
143
144
145
        LOG_VAR(ll) << "            Sync readings:   " << (_sync ? "enabled" : "disabled");
        LOG_VAR(ll) << "            Streaming mode:  " << (_streaming ? "enabled" : "disabled");
        LOG_VAR(ll) << "            Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
        LOG_VAR(ll) << "            MinValues:       " << _minValues;
        LOG_VAR(ll) << "            Interval:        " << _interval;
146
        LOG_VAR(ll) << "            Interval Delay:  " << _delayInterval;
147
        LOG_VAR(ll) << "            Unit Cache Size: " << _unitCacheLimit;
148
149
        if(!_units.empty()) {
            LOG_VAR(ll) << "            Units:";
150
151
152
153
154
            if(_unitID<0)
                for (auto u : _units)
                    u->printConfig(ll, lg);
            else
                _units[_unitID]->printConfig(ll, lg);
155
156
157
158
        } else
            LOG_VAR(ll) << "            Units:           none";
    }

Alessio Netti's avatar
Alessio Netti committed
159
    /**
160
    * @brief              Adds an unit to this operator
Alessio Netti's avatar
Alessio Netti committed
161
162
163
164
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)  override {
165
        // Since the OperatorInterface method accepts UnitInterface objects, we must cast the input argument
Alessio Netti's avatar
Alessio Netti committed
166
        // to its actual type, which is UnitTemplate<S>
Alessio Netti's avatar
Alessio Netti committed
167
        if (U_Ptr dUnit = dynamic_pointer_cast< UnitTemplate<S> >(u)) {
Alessio Netti's avatar
Alessio Netti committed
168
169
            _units.push_back(dUnit);
            _baseUnits.push_back(u);
170
171
172
173
174
175
176
            if(dUnit->isTopUnit())
                for(auto& subUnit : dUnit->getSubUnits()) {
                    if (U_Ptr dSubUnit = dynamic_pointer_cast< UnitTemplate<S> >(subUnit))
                        _baseUnits.push_back(dSubUnit);
                    else
                        LOG(error) << "Operator " << _name << ": Type mismatch when storing sub-unit! Will be omitted";
                }
Alessio Netti's avatar
Alessio Netti committed
177
178
        }
        else
179
            LOG(error) << "Operator " << _name << ": Type mismatch when storing unit! Will be omitted";
Alessio Netti's avatar
Alessio Netti committed
180
181
182
    }

    /**
183
    * @brief              Returns the units of this operator
Alessio Netti's avatar
Alessio Netti committed
184
185
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
186
    *                     derived type, are used internally. If the operator uses a lock to regulate unit access,
187
    *                     this will be acquired and must be released through the releaseUnits() method.
188
    *                     
189
    * @return             The vector of UnitInterface objects of this operator
Alessio Netti's avatar
Alessio Netti committed
190
    */
191
192
193
194
195
196
197
    virtual vector<UnitPtr>& getUnits() override	{ return _baseUnits; }
    
    /**
     * @brief             Releases the internal lock for unit access
     * 
     *                    This method is meant to regulate concurrent access to units which are generated dynamically.
     *                    In this specific implementation, the method does not perform anything as units in standard
198
     *                    operators are static and never modified.
199
200
201
     * 
     */
    virtual void releaseUnits() override {}
Alessio Netti's avatar
Alessio Netti committed
202
203

    /**
204
205
     * @brief              Clears all the units contained in this operator
     */
Alessio Netti's avatar
Alessio Netti committed
206
207
    virtual void clearUnits() override { _units.clear(); _baseUnits.clear(); _unitID = -1; }

208
209
210
211
212
213
214
	/**
	 * @brief              Returns the number of messages this operator will
	 *                     generate per second
	 *
	 * @return             Messages/s
	 */
	virtual float getMsgRate() override {
215
216
217
218
219
220
221
222
		float val = 0.0f;
        for (const auto &u : this->getUnits())
            for(const auto &s : u->getBaseOutputs()) {
                if (s->getSubsampling() > 0)
                    val += 1.0f / ((float) s->getSubsampling());
            }
        this->releaseUnits();
        return val * (1000.0f / (float)_interval) / (float)_minValues;
223
	}
Alessio Netti's avatar
Alessio Netti committed
224
225

    /**
226
    * @brief              Initializes this operator
Alessio Netti's avatar
Alessio Netti committed
227
    *
228
    *                     This method performs additional initialization compared to OperatorInterface. Specifically,
Alessio Netti's avatar
Alessio Netti committed
229
230
231
232
    *                     all output sensors in units are initialized, and their caches instantiated.
    *
    * @param io           Boost ASIO service to be used
    */
233
    virtual void init(boost::asio::io_service& io) final override {
234
        OperatorInterface::init(io);
Alessio Netti's avatar
Alessio Netti committed
235

236
        for(const auto u : _units)
237
            u->init(_interval);
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261

        this->execOnInit();
    }

    /**
    * @brief              Waits for the operator to complete its tasks
    *
    *                     Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0).
    */
    virtual void wait() final override {
        uint64_t sleepms=10, i=0;
        uint64_t timeout = _interval<10000 ? 30000 : _interval*3;

        while(sleepms*i++ < timeout) {
            if (_pendingTasks)
                std::this_thread::sleep_for(std::chrono::milliseconds(sleepms));
            else {
                this->execOnStop();
                LOG(info) << "Operator " << _name << " stopped.";
                return;
            }
        }

        LOG(warning) << "Operator " << _name << " will not finish! Skipping it";
Alessio Netti's avatar
Alessio Netti committed
262
263
264
    }

    /**
265
    * @brief              Starts this operator
Alessio Netti's avatar
Alessio Netti committed
266
    */
267
    virtual void start() final override {
Alessio Netti's avatar
Alessio Netti committed
268
        if(_keepRunning) {
269
            LOG(info) << "Operator " << _name << " already running.";
Alessio Netti's avatar
Alessio Netti committed
270
            return;
Alessio Netti's avatar
Alessio Netti committed
271
        } else if(!_streaming) {
272
            LOG(error) << "On-demand operator " << _name << " cannot be started.";
Alessio Netti's avatar
Alessio Netti committed
273
            return;
Alessio Netti's avatar
Alessio Netti committed
274
275
        } else if(_disabled)
            return;
Alessio Netti's avatar
Alessio Netti committed
276

277
278
279
280
281
        if (!this->execOnStart()) {
            LOG(error) << "Operator " << _name << ": startup failed.";
            return;
        }

Alessio Netti's avatar
Alessio Netti committed
282
283
        _keepRunning = 1;
        _pendingTasks++;
284
        _timer->async_wait(bind(&OperatorTemplate<S>::computeAsync, this));
285
        LOG(info) << "Operator " << _name << " started.";
Alessio Netti's avatar
Alessio Netti committed
286
287
288
    }

    /**
289
    * @brief              Stops this operator
Alessio Netti's avatar
Alessio Netti committed
290
    */
291
292
293
    virtual void stop() final override {
        if(_keepRunning == 0 || !_streaming) {
            LOG(debug) << "Operator " << _name << " already stopped.";
Alessio Netti's avatar
Alessio Netti committed
294
            return;
Alessio Netti's avatar
Alessio Netti committed
295
296
297
        }

        _keepRunning = 0;
298
299
        //cancel any outstanding readAsync()
        _timer->cancel();
Alessio Netti's avatar
Alessio Netti committed
300
301
    }

Alessio Netti's avatar
Alessio Netti committed
302
303
304
305
306
307
308
309
310
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This is a dummy implementation that can be overridden in user plugins. Any thrown
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
311
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
312
    */
313
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override {
Alessio Netti's avatar
Alessio Netti committed
314
315
316
317
318
319
320
        throw invalid_argument("Unknown plugin action " + action + " requested!");
    }

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
321
    *                     perform data analytics queries on the operator, which must have the _streaming attribute set
Alessio Netti's avatar
Alessio Netti committed
322
323
324
325
326
327
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
Alessio Netti's avatar
Alessio Netti committed
328
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
Alessio Netti's avatar
Alessio Netti committed
329
        map<string, reading_t> outMap;
Alessio Netti's avatar
Alessio Netti committed
330
        if( !_streaming && !_disabled ) {
331
332
            shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
            UnitGenerator<S> unitGen(navi);
333
            // We check whether the input node belongs to this operator's unit domain
334
            if(!_unitCache)
335
                throw std::runtime_error("Initialization error in operator " + _name + "!");
Alessio Netti's avatar
Alessio Netti committed
336

337
            // Getting exclusive access to the operator
Alessio Netti's avatar
Alessio Netti committed
338
339
            while( _onDemandLock.exchange(true) ) {}

Alessio Netti's avatar
Alessio Netti committed
340
341
342
343
            // If the ondemand template unit refers to root it means it has been completely resolved already,
            //  and therefore we can use such unit without doing any resolution
            try {
                U_Ptr tempUnit = nullptr;
344
                if (_unitCache->count(node)) {
345
                    LOG(debug) << "Operator " << _name << ": cache hit for unit " << node << ".";
346
                    tempUnit = _unitCache->at(node);
Alessio Netti's avatar
Alessio Netti committed
347
                } else {
348
                    if (!_unitCache->count(SensorNavigator::templateKey))
349
350
                        throw std::runtime_error("No template unit in operator " + _name + "!");
                    LOG(debug) << "Operator " << _name << ": cache miss for unit " << node << ".";
351
                    U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
352
                    tempUnit = unitGen.generateFromTemplate(uTemplate, node, list<string>(), _mqttPart, _enforceTopics, _relaxed);
353
                    addToUnitCache(tempUnit);
Alessio Netti's avatar
Alessio Netti committed
354
355
                }
                // Initializing sensors if necessary
356
                tempUnit->init(_interval);
357
                compute(tempUnit);
358
                retrieveAndFlush(outMap, tempUnit);
Alessio Netti's avatar
Alessio Netti committed
359
360
361
362
            } catch(const exception& e) {
                _onDemandLock.store(false);
                throw;
            }
Alessio Netti's avatar
Alessio Netti committed
363
            _onDemandLock.store(false);
Alessio Netti's avatar
Alessio Netti committed
364
        } else if( _keepRunning && !_disabled ) {
Alessio Netti's avatar
Alessio Netti committed
365
            bool found = false;
366
367
368
369
370
371
372
373
374
375
            if(!_duplicate) {
                for(const auto &u : _units)
                    if(u->getName() == node) {
                        found = true;
                        retrieveAndFlush(outMap, u, false);
                    }
            } else if(_unitID>=0 && node==_units[_unitID]->getName()) {
                found = true;
                retrieveAndFlush(outMap, _units[_unitID], false);
            }
Alessio Netti's avatar
Alessio Netti committed
376
377
378
379

            if(!found)
                throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
        } else
380
            throw std::runtime_error("Operator " + _name + ": not available for on-demand query!");
Alessio Netti's avatar
Alessio Netti committed
381
        return outMap;
Alessio Netti's avatar
Alessio Netti committed
382
383
    }

Alessio Netti's avatar
Alessio Netti committed
384
    /**
385
    * @brief              Adds an unit to the internal cache of units
Alessio Netti's avatar
Alessio Netti committed
386
    *
387
    *                     The cache is used to speed up response times to queries of on-demand operators, and reduce
Alessio Netti's avatar
Alessio Netti committed
388
389
390
    *                     overall overhead. The cache has a limited size: once this size is reached, at every insertion
    *                     the oldest entry in the cache is removed.
    *
391
    * @param unit         Shared pointer to the Unit object to be added to the cache
Alessio Netti's avatar
Alessio Netti committed
392
    */
393
394
395
    void addToUnitCache(U_Ptr unit) {
        if(!_unitCache) {
            _unitCache = new map<string, U_Ptr>();
Alessio Netti's avatar
Alessio Netti committed
396
397
398
            _insertionLUT = new map<uint64_t, U_Ptr>();
        }

399
        if(_unitCache->size() >= _unitCacheLimit) {
400
401
402
            auto oldest = _insertionLUT->begin();
            _unitCache->erase(oldest->second->getName());
            _insertionLUT->erase(oldest->first);
Alessio Netti's avatar
Alessio Netti committed
403
        }
404
        _unitCache->insert(make_pair(unit->getName(), unit));
Alessio Netti's avatar
Alessio Netti committed
405
406
407
408
409
410
        // The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
        // the insertion time LUT, so that it is never picked for deletion
        if(unit->getName() != SensorNavigator::templateKey)
            _insertionLUT->insert(make_pair(getTimestamp(), unit));
    }

411
412
413
    /**
    * @brief              Clears the internal baseUnits vector and only preserves the currently active unit
    *
414
    *                     This method is used for duplicated operators that share the same unit vector, with different
415
416
    *                     unit IDs. In order to expose distinct units to the outside, this method allows to keep
    *                     in the internal baseUnits vector (which is exposed through getUnits) only the unit that is
417
    *                     assigned to this specific operator, among those of the duplicated group. Access to the
418
    *                     other units is preserved through the _units vector, that can be accessed from within this
419
    *                     operator object.
420
421
422
    *
    */
    virtual void collapseUnits() {
423
        if (_unitID < 0 || _units.empty()) {
424
            LOG(error) << "Operator " << _name << ": Cannot collapse units!";
425
426
427
428
            return;
        }
        _baseUnits.clear();
        _baseUnits.push_back(_units[_unitID]);
429
430
431
432
        // If the unit is hierarchical, we add its subunits as well
        if(_units[_unitID]->isTopUnit())
            for(auto& subUnit : _units[_unitID]->getSubUnits())
                _baseUnits.push_back(subUnit);
433
434
    }

Alessio Netti's avatar
Alessio Netti committed
435
protected:
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
    
    /**
    * @brief              Retrieves output values of a unit and puts them in a map
    *  
    * @param outMap       string to reading_t map in which outputs must be stored
    * @param unit         Unit (flat or hierarchical) to be scanned
    * @param flushQueues  If true, the queues of outbound sensor values will be flushed as well 
    */
    void retrieveAndFlush(map<string, reading_t>& outMap, U_Ptr unit, bool flushQueues=true) {
        // Retrieving top-level outputs
        for (const auto &o : unit->getOutputs()) {
            outMap.insert(make_pair(o->getName(), o->getLatestValue()));
            if(flushQueues)
                o->clearReadingQueue();
        }
        // Retrieving sub-unit outputs (if any)
        for (const auto &subUnit : unit->getSubUnits()) {
            for (const auto &o : subUnit->getOutputs()) {
                outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                if(flushQueues)
                    o->clearReadingQueue();
            }
        }
    }
Alessio Netti's avatar
Alessio Netti committed
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476

    /**
    * @brief              Returns the timestamp associated with the next compute task
    *
    *                     If the sync option is enabled, the timestamp will be adjusted to be synchronized with the
    *                     other sensor readings.
    *
    * @return             Timestamp of the next compute task
    */
    uint64_t nextReadingTime() {
        uint64_t now = getTimestamp();
        uint64_t next;
        if (_sync) {
            uint64_t interval64 = static_cast<uint64_t>(_interval);
            uint64_t now_ms = now / 1000 / 1000;
            uint64_t waitToStart = interval64 - (now_ms%interval64); //synchronize all measurements with other sensors
            if(!waitToStart ){ // less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
Alessio Netti's avatar
Alessio Netti committed
477
                return (now_ms + interval64 + 10)*1000*1000;
Alessio Netti's avatar
Alessio Netti committed
478
            }
479
            return (now_ms + waitToStart + _delayInterval)*1000*1000;
Alessio Netti's avatar
Alessio Netti committed
480
481
482
483
484
485
486
487
488
        } else {
            return now + MS_TO_NS(_interval);
        }
    }

    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
489
490
    *                     compute() method, which encapsulates the real logic of the operator. The compute method
    *                     is automatically called over units as required by the Operator's configuration.
Alessio Netti's avatar
Alessio Netti committed
491
492
    *
    */
493
    virtual void computeAsync() override {
494
495
        if (_duplicate && _unitID >= 0) {
            try {
496
                compute(_units[_unitID]);
497
498
499
500
501
502
            } catch(const exception& e) {
                LOG(error) << e.what();
            }
        } else {
            for (unsigned i = 0; i < _units.size(); i++) {
                try {
503
                    compute(_units[i]);
504
505
506
507
508
                } catch (const exception &e) {
                    LOG(error) << e.what();
                    continue;
                }
            }
Alessio Netti's avatar
Alessio Netti committed
509
        }
Alessio Netti's avatar
Alessio Netti committed
510

Alessio Netti's avatar
Alessio Netti committed
511
        if (_timer && _keepRunning && !_disabled) {
Alessio Netti's avatar
Alessio Netti committed
512
513
            _timer->expires_at(timestamp2ptime(nextReadingTime()));
            _pendingTasks++;
514
            _timer->async_wait(bind(&OperatorTemplate::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
515
516
517
518
        }
        _pendingTasks--;
    }

519
520
521
522
523
524
525
526
527
528
    /**
    * @brief              Data analytics computation logic
    *
    *                     This method contains the actual logic used by the analyzed, and is automatically called by
    *                     the computeAsync method.
    *
    * @param unit         Shared pointer to unit to be processed
    */
    virtual void compute(U_Ptr unit) = 0;

529
530
    // Cache for frequently used units in ondemand and job modes
    map<string, U_Ptr>* _unitCache;
Alessio Netti's avatar
Alessio Netti committed
531
532
    // Helper map to keep track of the cache insertion times
    map<uint64_t, U_Ptr>* _insertionLUT;
Alessio Netti's avatar
Alessio Netti committed
533
    // Vector of pointers to the internal units
Alessio Netti's avatar
Alessio Netti committed
534
    vector<U_Ptr>   _units;
Alessio Netti's avatar
Alessio Netti committed
535
536
    // Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
    // unless we use raw arrays
Alessio Netti's avatar
Alessio Netti committed
537
    vector<UnitPtr> _baseUnits;
Alessio Netti's avatar
Alessio Netti committed
538
    // Instance of a QueryEngine object to get sensor data
Alessio Netti's avatar
Alessio Netti committed
539
    QueryEngine&    _queryEngine;
Alessio Netti's avatar
Alessio Netti committed
540
541
};

542
#endif //PROJECT_OPERATORTEMPLATE_H