//
// Created by Netti, Alessio on 10.12.18.
//

#ifndef PROJECT_ANALYZERTEMPLATE_H
#define PROJECT_ANALYZERTEMPLATE_H

#include "AnalyzerInterface.h"
#include "UnitTemplate.h"
Alessio Netti's avatar
Alessio Netti committed
10
#include "UnitGenerator.h"
Alessio Netti's avatar
Alessio Netti committed
11
12
13
14
15
16
17
#include "timestamp.h"
#include "QueryEngine.h"

#include <vector>
#include <map>
#include <memory>

Alessio Netti's avatar
Alessio Netti committed
18
using namespace std;
Alessio Netti's avatar
Alessio Netti committed
19
20
21
22
23
24
25
26
27
28
29

/**
 * Template that implements features needed by Analyzers and complying to AnalyzerInterface.
 *
 * The template accepts any class derived from SensorBase, allowing users to add their own attributes to input and output
 * sensors. This template also employs UnitTemplates, which are instantiated according to the input sensor type.
 *
 */
template <typename S>
class AnalyzerTemplate : public AnalyzerInterface {
    // The template shall only be instantiated for classes which derive from SensorBase
Alessio Netti's avatar
Alessio Netti committed
30
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
Alessio Netti's avatar
Alessio Netti committed
31
32
33

protected:
    // For readability
Alessio Netti's avatar
Alessio Netti committed
34
35
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;
Alessio Netti's avatar
Alessio Netti committed
36
37
38

public:

Alessio Netti's avatar
Alessio Netti committed
39
40
    static const unsigned cacheLimit = 100;

Alessio Netti's avatar
Alessio Netti committed
41
42
43
44
45
    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
Alessio Netti's avatar
Alessio Netti committed
46
    AnalyzerTemplate(const string name) :
47
            AnalyzerInterface(name),
Alessio Netti's avatar
Alessio Netti committed
48
49
            _ondemandCache(nullptr),
            _insertionLUT(nullptr),
50
            _queryEngine(QueryEngine::getInstance()) {}
Alessio Netti's avatar
Alessio Netti committed
51
52
53
54
55
56
57
58
59
60

    /**
    * @brief            Copy constructor
    *
    *                   Achtung! The vectors of units will be copied in shallow fashion, and the underlying output
    *                   sensors across different copied units will point to the same location. This is intended,
    *                   as "duplicated" units share the same view of the data analytics module.
    *
    */
    AnalyzerTemplate(const AnalyzerTemplate& other) :
61
62
            AnalyzerInterface(other),
            _queryEngine(QueryEngine::getInstance()) {
Alessio Netti's avatar
Alessio Netti committed
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

        for(auto u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }
    }

    /**
    * @brief            Assignment operator
    *
    *                   The same considerations made for the copy constructor apply: units are copied in
    *                   shallow fashion. Both the internal and the exposed unit vectors are replaced by the
    *                   units of the source object. The on-demand cache of this object is left untouched.
    *
    * @param other      Analyzer to copy from
    * @return           Reference to this analyzer
    */
    AnalyzerTemplate& operator=(const AnalyzerTemplate& other) {
        // Self-assignment would otherwise wipe the units before they could be copied back
        if(this == &other)
            return *this;

        AnalyzerInterface::operator=(other);
        // Both vectors must be emptied, or _baseUnits would grow at every assignment
        _units.clear();
        _baseUnits.clear();

        for(const auto& u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }

        return *this;
    }

    /**
    * @brief            Class destructor
    *
    *                   Releases the references to all units, then frees the on-demand cache and its
    *                   insertion lookup table if they were ever allocated.
    */
    virtual ~AnalyzerTemplate() {
        // Unit references held by the vectors are dropped before the cache is torn down
        _units.clear();
        _baseUnits.clear();

        // Deleting a null pointer is a no-op, and destroying a map releases all of its entries
        delete _ondemandCache;
        delete _insertionLUT;
    }
    /**
    * @brief            Logs the configuration of this analyzer and of its units
    *
    *                   The MQTT part is only printed when it is not empty. Units are listed through their
    *                   own printConfig method; if there are none, "none" is printed instead.
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) override {
        // Maps a flag to the label shown in the log
        auto stateOf = [](bool on) { return on ? "enabled" : "disabled"; };

        if(_mqttPart!="") {
            LOG_VAR(ll) << "            MQTT part:       " << _mqttPart;
        }
        LOG_VAR(ll) << "            Sync readings:   " << stateOf(_sync);
        LOG_VAR(ll) << "            Streaming mode:  " << stateOf(_streaming);
        LOG_VAR(ll) << "            Duplicated mode: " << stateOf(_duplicate);
        LOG_VAR(ll) << "            MinValues:       " << _minValues;
        LOG_VAR(ll) << "            Interval:        " << _interval;
        LOG_VAR(ll) << "            Start delay:     " << _delayInterval;

        if(_units.empty()) {
            LOG_VAR(ll) << "            Units:           none";
            return;
        }

        LOG_VAR(ll) << "            Units:";
        for(const auto& unit : _units) {
            unit->printConfig(ll, lg);
        }
    }
    /**
    * @brief              Adds a unit to this analyzer
    *
    *                     The unit is stored twice: once in its derived UnitTemplate<S> form for internal
    *                     use, and once in its UnitInterface form, which is the one exposed by getUnits().
    *                     Units that are not of type UnitTemplate<S> are rejected with an error message.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)  override {
        // The interface hands us a UnitInterface pointer; recover the concrete UnitTemplate<S> type
        const U_Ptr typedUnit = dynamic_pointer_cast< UnitTemplate<S> >(u);
        if (!typedUnit) {
            LOG(error) << "Analyzer " << _name << ": Type mismatch when storing output sensor! Sensor omitted";
            return;
        }

        _units.push_back(typedUnit);
        _baseUnits.push_back(u);
    }

    /**
    * @brief              Returns the units of this analyzer
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
    *                     derived type, are used internally.
    *
    * @return             The vector of UnitInterface objects of this analyzer
    */
Alessio Netti's avatar
Alessio Netti committed
151
    virtual vector<UnitPtr>& getUnits() override	{ return _baseUnits; }
Alessio Netti's avatar
Alessio Netti committed
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

    /**
    * @brief              Removes all units from this analyzer
    *
    *                     Both the internal and the exposed unit vectors are emptied, and the unit ID of
    *                     the analyzer is reset to -1.
    */
    virtual void clearUnits() override {
        _units.clear();
        _baseUnits.clear();
        _unitID = -1;
    }


    /**
    * @brief              Initializes this analyzer
    *
    *                     This method performs additional initialization compared to AnalyzerInterface. Specifically,
    *                     all output sensors in units are initialized, and their caches instantiated.
    *
    * @param io           Boost ASIO service to be used
    */
167
    virtual void init(boost::asio::io_service& io) override {
Alessio Netti's avatar
Alessio Netti committed
168
169
170
        AnalyzerInterface::init(io);

        for(const auto u : _units) {
171
            for(const auto s : u->getOutputs())
Alessio Netti's avatar
Alessio Netti committed
172
173
174
175
176
177
178
                s->initSensor(_cacheSize);
        }
    }

    /**
    * @brief              Starts this analyzer
    */
179
    virtual void start() override {
Alessio Netti's avatar
Alessio Netti committed
180
181
182
        if(_keepRunning) {
            LOG(info) << "Analyzer " << _name << " already running.";
            return;
Alessio Netti's avatar
Alessio Netti committed
183
184
185
        } else if(!_streaming) {
            LOG(error) << "On-demand analyzer " << _name << " cannot be started.";
            return;
Alessio Netti's avatar
Alessio Netti committed
186
187
188
189
        }

        _keepRunning = 1;
        _pendingTasks++;
Alessio Netti's avatar
Alessio Netti committed
190
        _timer->async_wait(bind(&AnalyzerTemplate<S>::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
191
192
193
194
        if(_delayInterval == 0)
            LOG(info) << "Analyzer " << _name << " started.";
        else
            LOG(info) << "Analyzer " << _name << " will be started after a delay of " << _delayInterval << " seconds.";
Alessio Netti's avatar
Alessio Netti committed
195
196
197
198
199
    }

    /**
    * @brief              Stops this analyzer
    */
200
    virtual void stop() override {
Alessio Netti's avatar
Alessio Netti committed
201
202
203
        if(_keepRunning == 0) {
            LOG(info) << "Analyzer " << _name << " already stopped.";
            return;
Alessio Netti's avatar
Alessio Netti committed
204
205
206
        } else if(!_streaming) {
            LOG(error) << "On-demand analyzer " << _name << " cannot be stopped.";
            return;
Alessio Netti's avatar
Alessio Netti committed
207
208
209
210
211
212
213
        }

        _keepRunning = 0;
        wait();
        LOG(info) << "Analyzer " << _name << " stopped.";
    }

Alessio Netti's avatar
Alessio Netti committed
214
215
216
217
218
219
220
221
222
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This is a dummy implementation that can be overridden in user plugins. Any thrown
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
223
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
224
    */
Alessio Netti's avatar
Alessio Netti committed
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) override {
        throw invalid_argument("Unknown plugin action " + action + " requested!");
    }

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
Alessio Netti's avatar
Alessio Netti committed
240
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
Alessio Netti's avatar
Alessio Netti committed
241
242
        map<string, reading_t> outMap;
        if( !_streaming ) {
243
244
            shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
            UnitGenerator<S> unitGen(navi);
Alessio Netti's avatar
Alessio Netti committed
245
            // We check whether the input node belongs to this analyzer's unit domain
Alessio Netti's avatar
Alessio Netti committed
246
247
            if(!_ondemandCache)
                throw std::runtime_error("Initialization error in analyzer " + _name + "!");
Alessio Netti's avatar
Alessio Netti committed
248
249
250
251

            // Getting exclusive access to the analyzer
            while( _onDemandLock.exchange(true) ) {}

Alessio Netti's avatar
Alessio Netti committed
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
            // If the ondemand template unit refers to root it means it has been completely resolved already,
            //  and therefore we can use such unit without doing any resolution
            try {
                U_Ptr tempUnit = nullptr;
                if (_ondemandCache->count(node)) {
                    LOG(debug) << "Analyzer " << _name << ": cache hit for unit " << node << ".";
                    tempUnit = _ondemandCache->at(node);
                } else {
                    if (!_ondemandCache->count(SensorNavigator::templateKey))
                        throw std::runtime_error("No template unit in analyzer " + _name + "!");
                    LOG(debug) << "Analyzer " << _name << ": cache miss for unit " << node << ".";
                    U_Ptr uTemplate = _ondemandCache->at(SensorNavigator::templateKey);
                    tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), "");
                    addToOndemandCache(tempUnit);
                }

                // Initializing sensors if necessary
                for (const auto s : tempUnit->getOutputs())
                    if (!s->isInit())
                        s->initSensor(_cacheSize);

                addUnit(tempUnit);
                int onDemandUnitID = _units.size() - 1;
                compute(onDemandUnitID);
Alessio Netti's avatar
Alessio Netti committed
276

Alessio Netti's avatar
Alessio Netti committed
277
278
279
280
                for (const auto &o : _units[onDemandUnitID]->getOutputs()) {
                    outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                    o->clearReadingQueue();
                }
Alessio Netti's avatar
Alessio Netti committed
281

Alessio Netti's avatar
Alessio Netti committed
282
283
284
285
286
287
                _units.erase(_units.begin() + onDemandUnitID);
                _baseUnits.erase(_baseUnits.begin() + onDemandUnitID);
            } catch(const exception& e) {
                _onDemandLock.store(false);
                throw;
            }
Alessio Netti's avatar
Alessio Netti committed
288
289
290
291

            _onDemandLock.store(false);
        } else if( _keepRunning ) {
            bool found = false;
292
293
            // We iterate over _baseUnits and not _units because it only contains the units this analyzer is working on
            for(const auto& u : _baseUnits)
Alessio Netti's avatar
Alessio Netti committed
294
295
                if(u->getName() == node) {
                    found = true;
296
                    for(const auto& o : u->getBaseOutputs())
Alessio Netti's avatar
Alessio Netti committed
297
298
299
300
301
302
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                }

            if(!found)
                throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
        } else
Alessio Netti's avatar
Alessio Netti committed
303
            throw std::runtime_error("Analyzer " + _name + ": not available for on-demand query!");
Alessio Netti's avatar
Alessio Netti committed
304
        return outMap;
Alessio Netti's avatar
Alessio Netti committed
305
306
    }

Alessio Netti's avatar
Alessio Netti committed
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
    /**
    * @brief              Adds a unit to the internal cache of on-demand units
    *
    *                     The cache is used to speed up response times to queries of on-demand analyzers, and
    *                     reduce overall overhead. The cache has a limited size (cacheLimit): once this size is
    *                     reached, the oldest evictable entry is removed before a new one is inserted. The
    *                     template unit is never evicted. A unit whose name is already cached is ignored.
    *
    * @param unit         Shared pointer to the Unit object to be added to the cache
    */
    void addToOndemandCache(U_Ptr unit) {
        if(!_ondemandCache)
            _ondemandCache = new map<string, U_Ptr>();
        if(!_insertionLUT)
            _insertionLUT = new map<uint64_t, U_Ptr>();

        const string& unitName = unit->getName();
        // An existing entry is never replaced, so there is nothing to insert and no reason to evict
        if(_ondemandCache->count(unitName))
            return;

        // The evicted unit is removed from BOTH maps; otherwise the same stale LUT entry would be picked
        // again at every insertion and the cache would grow without bound
        while(_ondemandCache->size() >= cacheLimit && !_insertionLUT->empty()) {
            auto oldest = _insertionLUT->begin();
            _ondemandCache->erase(oldest->second->getName());
            _insertionLUT->erase(oldest);
        }
        _ondemandCache->insert(make_pair(unitName, unit));

        // The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
        // the insertion time LUT, so that it is never picked for deletion
        if(unitName != SensorNavigator::templateKey) {
            // Timestamps are the LUT keys: on a collision move to the next free key, so that the unit
            // stays evictable instead of being silently dropped from the LUT
            uint64_t insertionTime = getTimestamp();
            while(!_insertionLUT->insert(make_pair(insertionTime, unit)).second)
                ++insertionTime;
        }
    }
    /**
    * @brief              Clears the internal baseUnits vector and only preserves the currently active unit
    *
    *                     This method is used for duplicated analyzers that share the same unit vector, with
    *                     different unit IDs. In order to expose distinct units to the outside, only the unit
    *                     assigned to this specific analyzer is kept in the baseUnits vector, which is the one
    *                     exposed through getUnits. Access to the other units is preserved through the _units
    *                     vector, which can be accessed from within this analyzer object.
    *
    *                     If the unit ID is negative or does not refer to an existing unit, an error is
    *                     logged and nothing is changed.
    */
    virtual void collapseUnits() {
        // The upper bound check also covers the case of an empty unit vector
        if(_unitID < 0 || static_cast<size_t>(_unitID) >= _units.size()) {
            LOG(error) << "Analyzer " << _name << ": Cannot collapse units!";
            return;
        }
        _baseUnits.clear();
        _baseUnits.push_back(_units[_unitID]);
    }
protected:

    /**
    * @brief              Returns the timestamp associated with the next compute task
    *
    *                     If the sync option is enabled, the timestamp is rounded up to the next multiple of
    *                     the interval (in milliseconds), so that it is synchronized with the other sensor
    *                     readings. If the current time already is such a multiple, the result lies one full
    *                     interval ahead. Otherwise, or if the interval is zero, the result is the current
    *                     time plus the interval.
    *
    * @return             Timestamp of the next compute task
    */
    uint64_t nextReadingTime() {
        const uint64_t now = getTimestamp();
        const uint64_t interval64 = static_cast<uint64_t>(_interval);

        // A zero interval cannot be aligned to (modulo by zero), so it takes the unsynchronized path
        if (_sync && interval64 > 0) {
            const uint64_t now_ms = now / 1000 / 1000;
            // Distance to the next multiple of the interval; always within [1, interval64]
            const uint64_t waitToStart = interval64 - (now_ms % interval64);
            return (now_ms + waitToStart) * 1000 * 1000;
        }
        return now + MS_TO_NS(_interval);
    }

    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *
    */
387
    virtual void computeAsync() override {
Alessio Netti's avatar
Alessio Netti committed
388
389
390
391
392
393
394
        // Sleeping until we are allowed to start
        if(_delayInterval > 0) {
            sleep(_delayInterval);
            _delayInterval = 0;
            LOG(info) << "Analyzer " + _name + ": starting computation after delayed start!";
        }

Alessio Netti's avatar
Alessio Netti committed
395
396
397
398
399
400
401
402
403
        try {
            if (_duplicate && _unitID >= 0)
                compute(_unitID);
            else
                for (int i = 0; i < _units.size(); i++)
                    compute(i);
        } catch(const exception& e) {
            LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!";
        }
Alessio Netti's avatar
Alessio Netti committed
404
405
406
407

        if (_timer && _keepRunning) {
            _timer->expires_at(timestamp2ptime(nextReadingTime()));
            _pendingTasks++;
Alessio Netti's avatar
Alessio Netti committed
408
            _timer->async_wait(bind(&AnalyzerTemplate::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
409
410
411
412
        }
        _pendingTasks--;
    }

Alessio Netti's avatar
Alessio Netti committed
413
414
415
416
    // Cache for frequently used units in ondemand mode
    map<string, U_Ptr>* _ondemandCache;
    // Helper map to keep track of the cache insertion times
    map<uint64_t, U_Ptr>* _insertionLUT;
Alessio Netti's avatar
Alessio Netti committed
417
    // Vector of pointers to the internal units
Alessio Netti's avatar
Alessio Netti committed
418
    vector<U_Ptr>   _units;
Alessio Netti's avatar
Alessio Netti committed
419
420
    // Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
    // unless we use raw arrays
Alessio Netti's avatar
Alessio Netti committed
421
    vector<UnitPtr> _baseUnits;
Alessio Netti's avatar
Alessio Netti committed
422
    // Instance of a QueryEngine object to get sensor data
Alessio Netti's avatar
Alessio Netti committed
423
    QueryEngine&    _queryEngine;
Alessio Netti's avatar
Alessio Netti committed
424
425
426
};

#endif //PROJECT_ANALYZERTEMPLATE_H