AnalyzerInterface.h 12.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//================================================================================
// Name        : AnalyzerInterface.h
// Author      : Alessio Netti
// Copyright   : Leibniz Supercomputing Centre
// Description : Interface to data analyzers.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
16
//
17
18
19
20
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
21
//
22
23
24
25
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
26
27
28
29
30
31

#ifndef PROJECT_ANALYZERINTERFACE_H
#define PROJECT_ANALYZERINTERFACE_H

#include <atomic>
#include <memory>
32
#include <unordered_map>
Alessio Netti's avatar
Alessio Netti committed
33
#include <vector>
Alessio Netti's avatar
Alessio Netti committed
34
#include <map>
Alessio Netti's avatar
Alessio Netti committed
35
36
#include <boost/asio.hpp>

37
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
38
39
#include "UnitInterface.h"

Alessio Netti's avatar
Alessio Netti committed
40
41
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
42
43
// Struct defining a response to a REST request
typedef struct {
Alessio Netti's avatar
Alessio Netti committed
44
45
    string response;
    string data;
Alessio Netti's avatar
Alessio Netti committed
46
47
} restResponse_t;

Alessio Netti's avatar
Alessio Netti committed
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * Interface to data analyzers
 *
 * This interface supplies methods to instantiate, start and retrieve data from analyzers, which are "modules" that
 * implement data analytics models and are loaded by DCDB data analytics plugins. For it to be used in the DCDB
 * data analytics framework, the analyzer must comply to this interface.
 *
 * An analyzer acts on "units", which are logical entities represented by certain inputs and outputs. An unit can be,
 * for example, a node, a CPU or a rack in a HPC system.
 */
class AnalyzerInterface {
public:

    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
Alessio Netti's avatar
Alessio Netti committed
66
    AnalyzerInterface(const string& name) :
Alessio Netti's avatar
Alessio Netti committed
67
68
            _name(name),
            _mqttPart(""),
69
            _isTemplate(false),
70
            _relaxed(false),
Alessio Netti's avatar
Alessio Netti committed
71
72
73
            _duplicate(false),
            _streaming(true),
            _sync(true),
74
            _dynamic(false),
Alessio Netti's avatar
Alessio Netti committed
75
76
77
78
79
            _unitID(-1),
            _keepRunning(0),
            _minValues(1),
            _interval(1000),
            _cacheInterval(900000),
80
            _unitCacheLimit(1000),
Alessio Netti's avatar
Alessio Netti committed
81
            _cacheSize(1),
Alessio Netti's avatar
Alessio Netti committed
82
            _delayInterval(0),
Alessio Netti's avatar
Alessio Netti committed
83
            _pendingTasks(0),
Alessio Netti's avatar
Alessio Netti committed
84
            _onDemandLock(false),
Alessio Netti's avatar
Alessio Netti committed
85
86
87
88
89
90
91
92
            _timer(nullptr) {}

    /**
    * @brief            Copy constructor
    */
    AnalyzerInterface(const AnalyzerInterface& other) :
            _name(other._name),
            _mqttPart(other._mqttPart),
93
            _isTemplate(other._isTemplate),
94
            _relaxed(other._relaxed),
Alessio Netti's avatar
Alessio Netti committed
95
96
97
            _duplicate(other._duplicate),
            _streaming(other._streaming),
            _sync(other._sync),
98
            _dynamic(other._dynamic),
Alessio Netti's avatar
Alessio Netti committed
99
100
101
102
103
            _unitID(other._unitID),
            _keepRunning(other._keepRunning),
            _minValues(other._minValues),
            _interval(other._interval),
            _cacheInterval(other._cacheInterval),
104
            _unitCacheLimit(other._unitCacheLimit),
Alessio Netti's avatar
Alessio Netti committed
105
            _cacheSize(other._cacheSize),
Alessio Netti's avatar
Alessio Netti committed
106
            _delayInterval(other._delayInterval),
Alessio Netti's avatar
Alessio Netti committed
107
108
109
            _pendingTasks(0),
            _onDemandLock(false),
            _timer(nullptr) {}
Alessio Netti's avatar
Alessio Netti committed
110
111
112
113
114
115
116
117
118
119
120
121

    /**
    * @brief            Class destructor
    */
    virtual ~AnalyzerInterface() {}

    /**
    * @brief            Assignment operator
    */
    AnalyzerInterface& operator=(const AnalyzerInterface& other) {
        _name = other._name;
        _mqttPart = other._mqttPart;
122
        _isTemplate = other._isTemplate;
123
        _relaxed = other._relaxed;
Alessio Netti's avatar
Alessio Netti committed
124
125
126
127
        _unitID = other._unitID;
        _duplicate = other._duplicate;
        _streaming = other._streaming;
        _sync = other._sync;
128
        _dynamic = other._dynamic;
Alessio Netti's avatar
Alessio Netti committed
129
130
131
132
        _keepRunning = other._keepRunning;
        _minValues = other._minValues;
        _interval = other._interval;
        _cacheInterval = other._cacheInterval;
133
        _unitCacheLimit = other._unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
134
        _cacheSize = other._cacheSize;
Alessio Netti's avatar
Alessio Netti committed
135
        _delayInterval = other._delayInterval;
Alessio Netti's avatar
Alessio Netti committed
136
137
        _pendingTasks.store(0);
        _onDemandLock.store(false);
Alessio Netti's avatar
Alessio Netti committed
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
        _timer = nullptr;

        return *this;
    }

    /**
    * @brief              Waits for the analyzer to complete its tasks
    *
    *                     Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0).
    */
    void wait() {
        while(_pendingTasks) {
            sleep(1);
        }
    }

    /**
    * @brief              Initializes this analyzer
    *
    *                     This method initializes the timer used to schedule tasks. It can be overridden by derived
    *                     classes.
    *
    * @param io           Boost ASIO service to be used
    */
    virtual void init(boost::asio::io_service& io) {
        _cacheSize = _cacheInterval / _interval + 1;
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }

Alessio Netti's avatar
Alessio Netti committed
167
168
169
170
171
172
173
174
175
176
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This method must be implemented in derived classes. It will perform an action (if any)
    *                     on the analyzer according to the input action string. Any thrown
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
177
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
178
    */
179
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
Alessio Netti's avatar
Alessio Netti committed
180

Alessio Netti's avatar
Alessio Netti committed
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
    /**
    * @brief              Starts this analyzer
    *
    *                     This method must be implemented in derived classes. It will start the operation of the
    *                     analyzer.
    */
    virtual void start() = 0;

    /**
    * @brief              Stops this analyzer
    *
    *                     This method must be implemented in derived classes. It will stop the operation of the
    *                     analyzer.
    */
    virtual void stop() = 0;

    /**
    * @brief              Adds an unit to this analyzer
    *
    *                     This method must be implemented in derived classes. It must add the input UnitInterface to
    *                     the internal structure storing units in the analyzer. Said unit must then be used during
    *                     computation.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)             = 0;

    /**
    * @brief              Clears all the units contained in this analyzer
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void clearUnits()                   = 0;

Alessio Netti's avatar
Alessio Netti committed
215
216
217
218
219
220
221
222
223
224
225
226
227
    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Sensor tree node that defines the query
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;

228
229
230
231
232
233
234
    /**
    * @brief            Prints the current analyzer configuration
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) = 0;

Alessio Netti's avatar
Alessio Netti committed
235
    // Getter methods
Alessio Netti's avatar
Alessio Netti committed
236
237
    const string& 	    getName()	    const	{ return _name; }
    const string& 	    getMqttPart()	const	{ return _mqttPart; }
238
    bool 	            getTemplate()	const	{ return _isTemplate; }
239
    bool                getRelaxed()    const   { return _relaxed; }
Alessio Netti's avatar
Alessio Netti committed
240
241
242
243
244
245
    bool				getSync()		const	{ return _sync; }
    bool				getDuplicate()	const	{ return _duplicate; }
    bool				getStreaming()	const	{ return _streaming; }
    unsigned			getMinValues()	const	{ return _minValues; }
    unsigned			getInterval()	const	{ return _interval; }
    unsigned			getCacheSize()	const	{ return _cacheSize; }
246
    unsigned            getUnitCacheLimit() const { return _unitCacheLimit; }
Alessio Netti's avatar
Alessio Netti committed
247
    unsigned            getDelayInterval() const { return _delayInterval; }
Alessio Netti's avatar
Alessio Netti committed
248
    int                 getUnitID()     const   { return _unitID; }
249
    bool                getDynamic()            { return _dynamic; }
Alessio Netti's avatar
Alessio Netti committed
250
251

    // Setter methods
Alessio Netti's avatar
Alessio Netti committed
252
253
    void setName(const string& name)	            { _name = name; }
    void setMqttPart(const string& mqttPart)	    { _mqttPart = mqttPart; }
254
    void setTemplate(bool t)                        { _isTemplate = t; }
255
    void setRelaxed(bool r)                         { _relaxed = r; }
Alessio Netti's avatar
Alessio Netti committed
256
257
258
259
260
261
    void setSync(bool sync)							{ _sync = sync; }
    void setUnitID(int u)                           { _unitID = u; }
    void setStreaming(bool streaming)				{ _streaming = streaming; }
    void setDuplicate(bool duplicate)				{ _duplicate = duplicate; }
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
262
    void setUnitCacheLimit(unsigned uc)             { _unitCacheLimit = uc+1; }
Alessio Netti's avatar
Alessio Netti committed
263
    void setCacheInterval(unsigned cacheInterval)	{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
264
    void setDelayInterval(unsigned delayInterval)	{ _delayInterval = delayInterval; }
265
266
    virtual vector<UnitPtr>& getUnits()             = 0;
    virtual void    releaseUnits()                  = 0;
Alessio Netti's avatar
Alessio Netti committed
267
268
269
270
271
272
273
274
275
276
277
278

protected:

    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *
    */
    virtual void computeAsync() = 0;
279
    
Alessio Netti's avatar
Alessio Netti committed
280
    // Name of this analyzer
Alessio Netti's avatar
Alessio Netti committed
281
    string _name;
Alessio Netti's avatar
Alessio Netti committed
282
    // MQTT part (see docs) of this analyzer
Alessio Netti's avatar
Alessio Netti committed
283
    string	_mqttPart;
Alessio Netti's avatar
Alessio Netti committed
284

285
286
    // To distinguish between templates and actual analyzers
    bool _isTemplate;
287
288
    // If the analyzer's units must be built in relaxed mode
    bool _relaxed;
Alessio Netti's avatar
Alessio Netti committed
289
290
291
292
293
294
    // If true, the analyzer is a duplicate of another
    bool _duplicate;
    // If true, the analyzer performs computation in streaming
    bool _streaming;
    // If true, the computation intervals are synchronized
    bool _sync;
295
296
    // Indicates whether the analyzer generates units dynamically at runtime, or only at initialization
    bool _dynamic;
Alessio Netti's avatar
Alessio Netti committed
297
298
299
300
301
302
303
304
305
306
    // ID of the units this analyzer works on
    int _unitID;
    // Determines if the analyzer can keep running or must terminate
    int _keepRunning;
    // Minimum number of sensor values to be accumulated before output can be sent
    unsigned int _minValues;
    // Sampling period regulating compute batches
    unsigned int _interval;
    // Size of the cache in time for the output sensors in this analyzer
    unsigned int _cacheInterval;
307
308
    // Maximum number of units that can be contained in the unit cache
    unsigned int _unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
309
310
    // Real size of the cache, as determined from cacheInterval
    unsigned int _cacheSize;
Alessio Netti's avatar
Alessio Netti committed
311
312
    // Time in seconds to wait for before starting computation
    unsigned int _delayInterval;
Alessio Netti's avatar
Alessio Netti committed
313
    // Number of pending ASIO tasks
Alessio Netti's avatar
Alessio Netti committed
314
315
316
    atomic_uint _pendingTasks;
    // Lock used to serialize access to the ondemand functionality
    atomic_bool _onDemandLock;
Alessio Netti's avatar
Alessio Netti committed
317
    // Timer for scheduling tasks
Alessio Netti's avatar
Alessio Netti committed
318
    unique_ptr<boost::asio::deadline_timer> _timer;
Alessio Netti's avatar
Alessio Netti committed
319
320
321
322
323
324

    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

//for better readability
Alessio Netti's avatar
Alessio Netti committed
325
using AnalyzerPtr = shared_ptr<AnalyzerInterface>;
Alessio Netti's avatar
Alessio Netti committed
326
327

#endif //PROJECT_ANALYZERINTERFACE_H