AnalyzerTemplate.h 16.4 KB
Newer Older
Alessio Netti's avatar
Alessio Netti committed
1
2
3
4
5
6
7
8
9
//
// Created by Netti, Alessio on 10.12.18.
//

#ifndef PROJECT_ANALYZERTEMPLATE_H
#define PROJECT_ANALYZERTEMPLATE_H

#include "AnalyzerInterface.h"
#include "UnitTemplate.h"
Alessio Netti's avatar
Alessio Netti committed
10
#include "UnitGenerator.h"
Alessio Netti's avatar
Alessio Netti committed
11
12
13
14
15
16
17
#include "timestamp.h"
#include "QueryEngine.h"

#include <vector>
#include <map>
#include <memory>

Alessio Netti's avatar
Alessio Netti committed
18
using namespace std;
Alessio Netti's avatar
Alessio Netti committed
19
20
21
22
23
24
25
26
27
28
29

/**
 * Template that implements the features needed by Analyzers and complies with AnalyzerInterface.
 *
 * The template accepts any class derived from SensorBase, allowing users to add their own attributes to input and output
 * sensors. This template also employs UnitTemplates, which are instantiated according to the input sensor type.
 *
 */
template <typename S>
class AnalyzerTemplate : public AnalyzerInterface {
    // The template shall only be instantiated for classes which derive from SensorBase
Alessio Netti's avatar
Alessio Netti committed
30
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
Alessio Netti's avatar
Alessio Netti committed
31
32
33

protected:
    // For readability
Alessio Netti's avatar
Alessio Netti committed
34
35
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;
Alessio Netti's avatar
Alessio Netti committed
36
37
38

public:

Alessio Netti's avatar
Alessio Netti committed
39
40
    // Size threshold of the on-demand unit cache: addToOndemandCache() evicts an entry once this is reached
    static const unsigned cacheLimit = 100;

Alessio Netti's avatar
Alessio Netti committed
41
42
43
44
45
    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
Alessio Netti's avatar
Alessio Netti committed
46
    AnalyzerTemplate(const string name) :
47
            AnalyzerInterface(name),
Alessio Netti's avatar
Alessio Netti committed
48
49
            _ondemandCache(nullptr),
            _insertionLUT(nullptr),
50
            _queryEngine(QueryEngine::getInstance()) {}
Alessio Netti's avatar
Alessio Netti committed
51
52
53
54
55
56
57
58
59
60

    /**
    * @brief            Copy constructor
    *
    *                   Achtung! The vectors of units will be copied in shallow fashion, and the underlying output
    *                   sensors across different copied units will point to the same location. This is intended,
    *                   as "duplicated" units share the same view of the data analytics module.
    *
    */
    AnalyzerTemplate(const AnalyzerTemplate& other) :
61
62
            AnalyzerInterface(other),
            _queryEngine(QueryEngine::getInstance()) {
Alessio Netti's avatar
Alessio Netti committed
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

        for(auto u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }
    }

    /**
    * @brief            Assignment operator
    *
    *                   Units are copied in shallow fashion: after the assignment this analyzer holds shared
    *                   pointers to the very same unit objects as the source analyzer. Units previously held
    *                   by this analyzer are dropped from both internal vectors. The on-demand cache of this
    *                   analyzer is left untouched.
    *
    * @param other      Analyzer to copy from
    * @return           Reference to this analyzer
    */
    AnalyzerTemplate& operator=(const AnalyzerTemplate& other) {
        // On self-assignment, clearing _units below would discard the very data we are about to copy
        if(this == &other)
            return *this;

        AnalyzerInterface::operator=(other);
        _units.clear();
        // _baseUnits is filled in lockstep with _units below, so it must be reset as well; otherwise
        // the units held before the assignment would linger in the vector exposed by getUnits()
        _baseUnits.clear();

        for(const auto& u : other._units) {
            _units.push_back(u);
            _baseUnits.push_back(u);
        }

        return *this;
    }

    /**
    * @brief            Class destructor
    *
    *                   Drops the references to all units and frees the on-demand cache together with
    *                   its insertion-time table, if they were ever allocated.
    */
    virtual ~AnalyzerTemplate() {
        _units.clear();
        _baseUnits.clear();

        // Deleting a null pointer is a no-op, and destroying a map releases all of its entries,
        // hence neither a null check nor an explicit clear() is required here
        delete _ondemandCache;
        delete _insertionLUT;
    }

105
106
107
108
109
110
    /**
    * @brief            Logs the configuration of this analyzer
    *
    *                   General settings are printed first, followed by the configuration of each unit
    *                   (or by a "none" marker if the analyzer has no units).
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) override {
        // Maps a flag to the label shown in the log
        auto onOff = [](bool flag) { return flag ? "enabled" : "disabled"; };

        LOG_VAR(ll) << "            MQTT part:       " << (_mqttPart == "" ? std::string("DEFAULT") : _mqttPart);
        LOG_VAR(ll) << "            Sync readings:   " << onOff(_sync);
        LOG_VAR(ll) << "            Streaming mode:  " << onOff(_streaming);
        LOG_VAR(ll) << "            Duplicated mode: " << onOff(_duplicate);
        LOG_VAR(ll) << "            MinValues:       " << _minValues;
        LOG_VAR(ll) << "            Interval:        " << _interval;
        LOG_VAR(ll) << "            Start delay:     " << _delayInterval;

        if(_units.empty()) {
            LOG_VAR(ll) << "            Units:           none";
            return;
        }

        LOG_VAR(ll) << "            Units:";
        for(const auto& unit : _units) {
            unit->printConfig(ll, lg);
        }
    }

Alessio Netti's avatar
Alessio Netti committed
126
127
128
129
130
131
132
133
    /**
    * @brief              Adds an unit to this analyzer
    *
    *                     The unit is stored only if it actually is a UnitTemplate<S>; otherwise an error
    *                     is logged and the unit is discarded.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)  override {
        // The interface hands us a UnitInterface pointer: recover the concrete UnitTemplate<S> type
        const U_Ptr typedUnit = dynamic_pointer_cast< UnitTemplate<S> >(u);
        if (!typedUnit) {
            LOG(error) << "Analyzer " << _name << ": Type mismatch when storing output sensor! Sensor omitted";
            return;
        }

        // Both views of the unit list are always extended together
        _units.push_back(typedUnit);
        _baseUnits.push_back(u);
    }

    /**
    * @brief              Returns the units of this analyzer
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
    *                     derived type, are used internally.
    *
    * @return             The vector of UnitInterface objects of this analyzer
    */
Alessio Netti's avatar
Alessio Netti committed
150
    virtual vector<UnitPtr>& getUnits() override	{ return _baseUnits; }
Alessio Netti's avatar
Alessio Netti committed
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

    /**
    * @brief              Removes all units from this analyzer and resets its unit ID to -1
    */
    virtual void clearUnits() override {
        _units.clear();
        _baseUnits.clear();
        _unitID = -1;
    }


    /**
    * @brief              Initializes this analyzer
    *
    *                     This method performs additional initialization compared to AnalyzerInterface. Specifically,
    *                     all output sensors in units are initialized, and their caches instantiated.
    *
    * @param io           Boost ASIO service to be used
    */
166
    virtual void init(boost::asio::io_service& io) override {
Alessio Netti's avatar
Alessio Netti committed
167
168
169
        AnalyzerInterface::init(io);

        for(const auto u : _units) {
170
            for(const auto s : u->getOutputs())
Alessio Netti's avatar
Alessio Netti committed
171
172
173
174
175
176
177
                s->initSensor(_cacheSize);
        }
    }

    /**
    * @brief              Starts this analyzer
    */
178
    virtual void start() override {
Alessio Netti's avatar
Alessio Netti committed
179
180
181
        if(_keepRunning) {
            LOG(info) << "Analyzer " << _name << " already running.";
            return;
Alessio Netti's avatar
Alessio Netti committed
182
183
184
        } else if(!_streaming) {
            LOG(error) << "On-demand analyzer " << _name << " cannot be started.";
            return;
Alessio Netti's avatar
Alessio Netti committed
185
186
187
188
        }

        _keepRunning = 1;
        _pendingTasks++;
Alessio Netti's avatar
Alessio Netti committed
189
        _timer->async_wait(bind(&AnalyzerTemplate<S>::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
190
191
192
193
        if(_delayInterval == 0)
            LOG(info) << "Analyzer " << _name << " started.";
        else
            LOG(info) << "Analyzer " << _name << " will be started after a delay of " << _delayInterval << " seconds.";
Alessio Netti's avatar
Alessio Netti committed
194
195
196
197
198
    }

    /**
    * @brief              Stops this analyzer
    */
199
    virtual void stop() override {
Alessio Netti's avatar
Alessio Netti committed
200
201
202
        if(_keepRunning == 0) {
            LOG(info) << "Analyzer " << _name << " already stopped.";
            return;
Alessio Netti's avatar
Alessio Netti committed
203
204
205
        } else if(!_streaming) {
            LOG(error) << "On-demand analyzer " << _name << " cannot be stopped.";
            return;
Alessio Netti's avatar
Alessio Netti committed
206
207
208
209
210
211
212
        }

        _keepRunning = 0;
        wait();
        LOG(info) << "Analyzer " << _name << " stopped.";
    }

Alessio Netti's avatar
Alessio Netti committed
213
214
215
216
217
218
219
220
221
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This is a dummy implementation that can be overridden in user plugins. Any thrown
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
222
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
223
    */
224
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override {
Alessio Netti's avatar
Alessio Netti committed
225
226
227
228
229
230
231
232
233
234
235
236
237
238
        throw invalid_argument("Unknown plugin action " + action + " requested!");
    }

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
Alessio Netti's avatar
Alessio Netti committed
239
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
Alessio Netti's avatar
Alessio Netti committed
240
241
        map<string, reading_t> outMap;
        if( !_streaming ) {
242
243
            shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
            UnitGenerator<S> unitGen(navi);
Alessio Netti's avatar
Alessio Netti committed
244
            // We check whether the input node belongs to this analyzer's unit domain
Alessio Netti's avatar
Alessio Netti committed
245
246
            if(!_ondemandCache)
                throw std::runtime_error("Initialization error in analyzer " + _name + "!");
Alessio Netti's avatar
Alessio Netti committed
247
248
249
250

            // Getting exclusive access to the analyzer
            while( _onDemandLock.exchange(true) ) {}

Alessio Netti's avatar
Alessio Netti committed
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
            // If the ondemand template unit refers to root it means it has been completely resolved already,
            //  and therefore we can use such unit without doing any resolution
            try {
                U_Ptr tempUnit = nullptr;
                if (_ondemandCache->count(node)) {
                    LOG(debug) << "Analyzer " << _name << ": cache hit for unit " << node << ".";
                    tempUnit = _ondemandCache->at(node);
                } else {
                    if (!_ondemandCache->count(SensorNavigator::templateKey))
                        throw std::runtime_error("No template unit in analyzer " + _name + "!");
                    LOG(debug) << "Analyzer " << _name << ": cache miss for unit " << node << ".";
                    U_Ptr uTemplate = _ondemandCache->at(SensorNavigator::templateKey);
                    tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), "");
                    addToOndemandCache(tempUnit);
                }

                // Initializing sensors if necessary
                for (const auto s : tempUnit->getOutputs())
                    if (!s->isInit())
                        s->initSensor(_cacheSize);

                addUnit(tempUnit);
                int onDemandUnitID = _units.size() - 1;
                compute(onDemandUnitID);
Alessio Netti's avatar
Alessio Netti committed
275

Alessio Netti's avatar
Alessio Netti committed
276
277
278
279
                for (const auto &o : _units[onDemandUnitID]->getOutputs()) {
                    outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                    o->clearReadingQueue();
                }
Alessio Netti's avatar
Alessio Netti committed
280

Alessio Netti's avatar
Alessio Netti committed
281
282
283
284
285
286
                _units.erase(_units.begin() + onDemandUnitID);
                _baseUnits.erase(_baseUnits.begin() + onDemandUnitID);
            } catch(const exception& e) {
                _onDemandLock.store(false);
                throw;
            }
Alessio Netti's avatar
Alessio Netti committed
287
288
289
290

            _onDemandLock.store(false);
        } else if( _keepRunning ) {
            bool found = false;
291
292
            // We iterate over _baseUnits and not _units because it only contains the units this analyzer is working on
            for(const auto& u : _baseUnits)
Alessio Netti's avatar
Alessio Netti committed
293
294
                if(u->getName() == node) {
                    found = true;
295
                    for(const auto& o : u->getBaseOutputs())
Alessio Netti's avatar
Alessio Netti committed
296
297
298
299
300
301
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                }

            if(!found)
                throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
        } else
Alessio Netti's avatar
Alessio Netti committed
302
            throw std::runtime_error("Analyzer " + _name + ": not available for on-demand query!");
Alessio Netti's avatar
Alessio Netti committed
303
        return outMap;
Alessio Netti's avatar
Alessio Netti committed
304
305
    }

Alessio Netti's avatar
Alessio Netti committed
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
    /**
    * @brief              Adds an unit to the internal cache of on-demand units
    *
    *                     The cache is used to speed up response times to queries of on-demand analyzers, and reduce
    *                     overall overhead. The cache has a limited size: once this size is reached, at every insertion
    *                     the oldest entry in the cache is removed. The template unit is never picked for removal.
    *                     If a unit with the same name is already cached, the cache is left unchanged.
    *
    * @param unit         Shared pointer to the Unit object to be added to the cache
    */
    void addToOndemandCache(U_Ptr unit) {
        // Both maps are allocated lazily, on the first insertion
        if(!_ondemandCache)
            _ondemandCache = new map<string, U_Ptr>();
        if(!_insertionLUT)
            _insertionLUT = new map<uint64_t, U_Ptr>();

        if(_ondemandCache->size() >= cacheLimit && !_insertionLUT->empty()) {
            // The LUT is ordered by insertion timestamp, so its first entry is the oldest cached unit.
            // The entry must be dropped from BOTH maps: if it stayed in the LUT, the same (already
            // evicted) unit would be selected at every later insertion and the cache would never shrink
            const auto oldest = _insertionLUT->begin();
            _ondemandCache->erase(oldest->second->getName());
            _insertionLUT->erase(oldest);
        }

        const bool inserted = _ondemandCache->insert(make_pair(unit->getName(), unit)).second;
        // The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
        // the insertion time LUT, so that it is never picked for deletion. Units rejected by the cache because
        // of a name clash are not tracked either, to keep the two maps consistent
        if(inserted && unit->getName() != SensorNavigator::templateKey)
            _insertionLUT->insert(make_pair(getTimestamp(), unit));
    }

332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
    /**
    * @brief              Clears the internal baseUnits vector and only preserves the currently active unit
    *
    *                     This method is used for duplicated analyzers that share the same unit vector, with different
    *                     unit IDs. In order to expose distinct units to the outside, this method allows to keep
    *                     in the internal baseUnits vector (which is exposed through getUnits) only the unit that is
    *                     assigned to this specific analyzer, among those of the duplicated group. Access to the
    *                     other units is preserved through the _units vector, that can be accessed from within this
    *                     analyzer object.
    *
    *                     If the unit ID does not identify one of the units in the _units vector, an error is
    *                     logged and nothing is changed.
    *
    */
    virtual void collapseUnits() {
        // The ID is used as an index below: it must be non-negative and within the bounds of _units
        // (this also covers the case of an empty vector)
        if(_unitID < 0 || static_cast<size_t>(_unitID) >= _units.size()) {
            LOG(error) << "Analyzer " << _name << ": Cannot collapse units!";
            return;
        }
        _baseUnits.clear();
        _baseUnits.push_back(_units[_unitID]);
    }

Alessio Netti's avatar
Alessio Netti committed
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
protected:

    /**
    * @brief              Returns the timestamp associated with the next compute task
    *
    *                     If the sync option is enabled, the current time is truncated to milliseconds and the
    *                     returned timestamp is the next multiple of the interval that lies strictly after it,
    *                     so that computations are synchronized with the other sensor readings. Otherwise the
    *                     interval is simply added to the current timestamp. An interval of zero cannot be
    *                     synchronized and is handled like the non-sync case.
    *
    * @return             Timestamp of the next compute task
    */
    uint64_t nextReadingTime() {
        const uint64_t now = getTimestamp();
        const uint64_t interval64 = static_cast<uint64_t>(_interval);

        // A zero interval would lead to a modulo by zero in the synchronized computation
        if (_sync && interval64 > 0) {
            const uint64_t now_ms = now / 1000 / 1000;
            // Always within [1, interval64]: when we sit exactly on a multiple of the interval we wait
            // for one full interval, hence no special case is needed for a zero wait
            const uint64_t waitToStart = interval64 - (now_ms % interval64);
            return (now_ms + waitToStart) * 1000 * 1000;
        }
        return now + MS_TO_NS(_interval);
    }

    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *
    */
386
    virtual void computeAsync() override {
Alessio Netti's avatar
Alessio Netti committed
387
388
389
390
391
392
393
        // Sleeping until we are allowed to start
        if(_delayInterval > 0) {
            sleep(_delayInterval);
            _delayInterval = 0;
            LOG(info) << "Analyzer " + _name + ": starting computation after delayed start!";
        }

Alessio Netti's avatar
Alessio Netti committed
394
395
396
397
        try {
            if (_duplicate && _unitID >= 0)
                compute(_unitID);
            else
Alessio Netti's avatar
Alessio Netti committed
398
399
                for (unsigned i = 0; i < _units.size(); i++)
                    compute((int)i);
Alessio Netti's avatar
Alessio Netti committed
400
401
402
        } catch(const exception& e) {
            LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!";
        }
Alessio Netti's avatar
Alessio Netti committed
403
404
405
406

        if (_timer && _keepRunning) {
            _timer->expires_at(timestamp2ptime(nextReadingTime()));
            _pendingTasks++;
Alessio Netti's avatar
Alessio Netti committed
407
            _timer->async_wait(bind(&AnalyzerTemplate::computeAsync, this));
Alessio Netti's avatar
Alessio Netti committed
408
409
410
411
        }
        _pendingTasks--;
    }

Alessio Netti's avatar
Alessio Netti committed
412
413
414
415
    // Cache for frequently used units in ondemand mode
    map<string, U_Ptr>* _ondemandCache;
    // Helper map to keep track of the cache insertion times
    map<uint64_t, U_Ptr>* _insertionLUT;
Alessio Netti's avatar
Alessio Netti committed
416
    // Vector of pointers to the internal units
Alessio Netti's avatar
Alessio Netti committed
417
    vector<U_Ptr>   _units;
Alessio Netti's avatar
Alessio Netti committed
418
419
    // Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
    // unless we use raw arrays
Alessio Netti's avatar
Alessio Netti committed
420
    vector<UnitPtr> _baseUnits;
Alessio Netti's avatar
Alessio Netti committed
421
    // Instance of a QueryEngine object to get sensor data
Alessio Netti's avatar
Alessio Netti committed
422
    QueryEngine&    _queryEngine;
Alessio Netti's avatar
Alessio Netti committed
423
424
425
};

#endif //PROJECT_ANALYZERTEMPLATE_H