//
// Created by Netti, Alessio on 10.12.18.
//

#ifndef PROJECT_ANALYZERINTERFACE_H
#define PROJECT_ANALYZERINTERFACE_H

#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <boost/asio.hpp>

#include "logging.h"
#include "UnitInterface.h"

using namespace std;

// Struct defining a response to a REST request, as a <response, data> pair of strings
struct restResponse_t {
    // Response part of the <response, data> pair
    std::string response;
    // Data part of the <response, data> pair
    std::string data;
};

/**
 * Interface to data analyzers
 *
 * This interface supplies methods to instantiate, start and retrieve data from analyzers, which are "modules" that
 * implement data analytics models and are loaded by DCDB data analytics plugins. For it to be used in the DCDB
 * data analytics framework, the analyzer must comply to this interface.
 *
 * An analyzer acts on "units", which are logical entities represented by certain inputs and outputs. An unit can be,
 * for example, a node, a CPU or a rack in a HPC system.
 */
class AnalyzerInterface {
public:

    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
Alessio Netti's avatar
Alessio Netti committed
43
    AnalyzerInterface(const string& name) :
Alessio Netti's avatar
Alessio Netti committed
44
45
            _name(name),
            _mqttPart(""),
46
            _isTemplate(false),
Alessio Netti's avatar
Alessio Netti committed
47
48
49
50
51
52
53
54
55
            _duplicate(false),
            _streaming(true),
            _sync(true),
            _unitID(-1),
            _keepRunning(0),
            _minValues(1),
            _interval(1000),
            _cacheInterval(900000),
            _cacheSize(1),
Alessio Netti's avatar
Alessio Netti committed
56
            _delayInterval(0),
Alessio Netti's avatar
Alessio Netti committed
57
            _pendingTasks(0),
Alessio Netti's avatar
Alessio Netti committed
58
            _onDemandLock(false),
Alessio Netti's avatar
Alessio Netti committed
59
60
61
62
63
64
65
66
            _timer(nullptr) {}

    /**
    * @brief            Copy constructor
    */
    AnalyzerInterface(const AnalyzerInterface& other) :
            _name(other._name),
            _mqttPart(other._mqttPart),
67
            _isTemplate(other._isTemplate),
Alessio Netti's avatar
Alessio Netti committed
68
69
70
71
72
73
74
75
76
            _duplicate(other._duplicate),
            _streaming(other._streaming),
            _sync(other._sync),
            _unitID(other._unitID),
            _keepRunning(other._keepRunning),
            _minValues(other._minValues),
            _interval(other._interval),
            _cacheInterval(other._cacheInterval),
            _cacheSize(other._cacheSize),
Alessio Netti's avatar
Alessio Netti committed
77
            _delayInterval(other._delayInterval),
Alessio Netti's avatar
Alessio Netti committed
78
79
80
            _pendingTasks(0),
            _onDemandLock(false),
            _timer(nullptr) {}
Alessio Netti's avatar
Alessio Netti committed
81
82
83
84
85
86
87
88
89
90
91
92

    /**
    * @brief            Class destructor
    */
    virtual ~AnalyzerInterface() {}

    /**
    * @brief            Assignment operator
    */
    AnalyzerInterface& operator=(const AnalyzerInterface& other) {
        _name = other._name;
        _mqttPart = other._mqttPart;
93
        _isTemplate = other._isTemplate;
Alessio Netti's avatar
Alessio Netti committed
94
95
96
97
98
99
100
101
102
        _unitID = other._unitID;
        _duplicate = other._duplicate;
        _streaming = other._streaming;
        _sync = other._sync;
        _keepRunning = other._keepRunning;
        _minValues = other._minValues;
        _interval = other._interval;
        _cacheInterval = other._cacheInterval;
        _cacheSize = other._cacheSize;
Alessio Netti's avatar
Alessio Netti committed
103
        _delayInterval = other._delayInterval;
Alessio Netti's avatar
Alessio Netti committed
104
105
        _pendingTasks.store(0);
        _onDemandLock.store(false);
Alessio Netti's avatar
Alessio Netti committed
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
        _timer = nullptr;

        return *this;
    }

    /**
    * @brief              Waits for the analyzer to complete its tasks
    *
    *                     Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0).
    */
    void wait() {
        while(_pendingTasks) {
            sleep(1);
        }
    }

    /**
    * @brief              Initializes this analyzer
    *
    *                     This method initializes the timer used to schedule tasks. It can be overridden by derived
    *                     classes.
    *
    * @param io           Boost ASIO service to be used
    */
    virtual void init(boost::asio::io_service& io) {
        _cacheSize = _cacheInterval / _interval + 1;
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }

Alessio Netti's avatar
Alessio Netti committed
135
136
137
138
139
140
141
142
143
144
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This method must be implemented in derived classes. It will perform an action (if any)
    *                     on the analyzer according to the input action string. Any thrown
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
145
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
146
    */
Alessio Netti's avatar
Alessio Netti committed
147
    virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) = 0;
Alessio Netti's avatar
Alessio Netti committed
148

Alessio Netti's avatar
Alessio Netti committed
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
    /**
    * @brief              Starts this analyzer
    *
    *                     This method must be implemented in derived classes. It will start the operation of the
    *                     analyzer.
    */
    virtual void start() = 0;

    /**
    * @brief              Stops this analyzer
    *
    *                     This method must be implemented in derived classes. It will stop the operation of the
    *                     analyzer.
    */
    virtual void stop() = 0;

    /**
    * @brief              Adds an unit to this analyzer
    *
    *                     This method must be implemented in derived classes. It must add the input UnitInterface to
    *                     the internal structure storing units in the analyzer. Said unit must then be used during
    *                     computation.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)             = 0;

    /**
    * @brief              Clears all the units contained in this analyzer
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void clearUnits()                   = 0;

Alessio Netti's avatar
Alessio Netti committed
183
184
185
186
187
188
189
190
191
192
193
194
195
    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Sensor tree node that defines the query
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;

196
197
198
199
200
201
202
    /**
    * @brief            Prints the current analyzer configuration
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) = 0;

Alessio Netti's avatar
Alessio Netti committed
203
    // Getter methods
Alessio Netti's avatar
Alessio Netti committed
204
205
    const string& 	    getName()	    const	{ return _name; }
    const string& 	    getMqttPart()	const	{ return _mqttPart; }
206
    bool 	            getTemplate()	const	{ return _isTemplate; }
Alessio Netti's avatar
Alessio Netti committed
207
208
209
210
211
212
    bool				getSync()		const	{ return _sync; }
    bool				getDuplicate()	const	{ return _duplicate; }
    bool				getStreaming()	const	{ return _streaming; }
    unsigned			getMinValues()	const	{ return _minValues; }
    unsigned			getInterval()	const	{ return _interval; }
    unsigned			getCacheSize()	const	{ return _cacheSize; }
Alessio Netti's avatar
Alessio Netti committed
213
    unsigned            getDelayInterval() const { return _delayInterval; }
Alessio Netti's avatar
Alessio Netti committed
214
215
216
    int                 getUnitID()     const   { return _unitID; }

    // Setter methods
Alessio Netti's avatar
Alessio Netti committed
217
218
    void setName(const string& name)	            { _name = name; }
    void setMqttPart(const string& mqttPart)	    { _mqttPart = mqttPart; }
219
    void setTemplate(bool t)                        { _isTemplate = t; }
Alessio Netti's avatar
Alessio Netti committed
220
221
222
223
224
225
226
    void setSync(bool sync)							{ _sync = sync; }
    void setUnitID(int u)                           { _unitID = u; }
    void setStreaming(bool streaming)				{ _streaming = streaming; }
    void setDuplicate(bool duplicate)				{ _duplicate = duplicate; }
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
    void setCacheInterval(unsigned cacheInterval)	{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
227
    void setDelayInterval(unsigned delayInterval)	{ _delayInterval = delayInterval; }
Alessio Netti's avatar
Alessio Netti committed
228
    virtual vector<UnitPtr>& getUnits()             = 0;
Alessio Netti's avatar
Alessio Netti committed
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253

protected:

    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *
    */
    virtual void computeAsync() = 0;

    /**
    * @brief              Data analytics computation logic
    *
    *                     This method contains the actual logic used by the analyzed, and is automatically called by
    *                     the computeAsync method.
    *
    * @param unitID       index of the unit over which computation must be performed
    */
    virtual void compute(int unitID) = 0;


    // Name of this analyzer
Alessio Netti's avatar
Alessio Netti committed
254
    string _name;
Alessio Netti's avatar
Alessio Netti committed
255
    // MQTT part (see docs) of this analyzer
Alessio Netti's avatar
Alessio Netti committed
256
    string	_mqttPart;
Alessio Netti's avatar
Alessio Netti committed
257

258
259
    // To distinguish between templates and actual analyzers
    bool _isTemplate;
Alessio Netti's avatar
Alessio Netti committed
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
    // If true, the analyzer is a duplicate of another
    bool _duplicate;
    // If true, the analyzer performs computation in streaming
    bool _streaming;
    // If true, the computation intervals are synchronized
    bool _sync;
    // ID of the units this analyzer works on
    int _unitID;
    // Determines if the analyzer can keep running or must terminate
    int _keepRunning;
    // Minimum number of sensor values to be accumulated before output can be sent
    unsigned int _minValues;
    // Sampling period regulating compute batches
    unsigned int _interval;
    // Size of the cache in time for the output sensors in this analyzer
    unsigned int _cacheInterval;
    // Real size of the cache, as determined from cacheInterval
    unsigned int _cacheSize;
Alessio Netti's avatar
Alessio Netti committed
278
279
    // Time in seconds to wait for before starting computation
    unsigned int _delayInterval;
Alessio Netti's avatar
Alessio Netti committed
280
    // Number of pending ASIO tasks
Alessio Netti's avatar
Alessio Netti committed
281
282
283
    atomic_uint _pendingTasks;
    // Lock used to serialize access to the ondemand functionality
    atomic_bool _onDemandLock;
Alessio Netti's avatar
Alessio Netti committed
284
    // Timer for scheduling tasks
Alessio Netti's avatar
Alessio Netti committed
285
    unique_ptr<boost::asio::deadline_timer> _timer;
Alessio Netti's avatar
Alessio Netti committed
286
287
288
289
290
291

    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

//for better readability
Alessio Netti's avatar
Alessio Netti committed
292
using AnalyzerPtr = shared_ptr<AnalyzerInterface>;
Alessio Netti's avatar
Alessio Netti committed
293
294

#endif //PROJECT_ANALYZERINTERFACE_H