//================================================================================
// Name        : OperatorInterface.h
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Interface to data operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

/**
 * @defgroup operator Operator Plugins
 * @ingroup analytics
 *
 * @brief Operator for the analytics plugin.
 */

#ifndef PROJECT_OPERATORINTERFACE_H
#define PROJECT_OPERATORINTERFACE_H

#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/asio.hpp>

#include "logging.h"
#include "UnitInterface.h"

// NOTE(review): header-wide using-directive; kept because code including this
// header may rely on unqualified standard-library names.
using namespace std;

/**
 * @brief Response to a REST request, as a <response, data> pair.
 */
struct restResponse_t {
    // Response message of the request
    std::string response;
    // Data returned by the request, if any
    std::string data;
};

/**
56
 * @brief Interface to data operators
57
58
 *
 * @details This interface supplies methods to instantiate, start and retrieve
59
 *          data from operators, which are "modules" that* implement data
60
 *          analytics models and are loaded by DCDB data analytics plugins. For
61
 *          it to be used in the DCDB data analytics framework, the operator
62
 *          must comply to this interface.
Alessio Netti's avatar
Alessio Netti committed
63
 *
64
 *          An operator acts on "units", which are logical entities represented
65
66
 *          by certain inputs and outputs. An unit can be, for example, a node,
 *          a CPU or a rack in a HPC system.
Alessio Netti's avatar
Alessio Netti committed
67
 *
68
 * @ingroup operator
Alessio Netti's avatar
Alessio Netti committed
69
 */
70
class OperatorInterface {
Alessio Netti's avatar
Alessio Netti committed
71
72
73
74
75
public:

    /**
    * @brief            Class constructor
    *
76
    * @param name       Name of the operator
Alessio Netti's avatar
Alessio Netti committed
77
    */
78
    OperatorInterface(const string& name) :
Alessio Netti's avatar
Alessio Netti committed
79
80
            _name(name),
            _mqttPart(""),
81
            _isTemplate(false),
82
            _relaxed(false),
Alessio Netti's avatar
Alessio Netti committed
83
84
85
            _duplicate(false),
            _streaming(true),
            _sync(true),
86
            _dynamic(false),
Alessio Netti's avatar
Alessio Netti committed
87
            _disabled(false),
88
            _flatten(false),
Alessio Netti's avatar
Alessio Netti committed
89
90
91
92
93
            _unitID(-1),
            _keepRunning(0),
            _minValues(1),
            _interval(1000),
            _cacheInterval(900000),
94
            _unitCacheLimit(1000),
Alessio Netti's avatar
Alessio Netti committed
95
            _cacheSize(1),
96
            _delayInterval(10),
Alessio Netti's avatar
Alessio Netti committed
97
            _pendingTasks(0),
Alessio Netti's avatar
Alessio Netti committed
98
            _onDemandLock(false),
Alessio Netti's avatar
Alessio Netti committed
99
100
101
102
103
            _timer(nullptr) {}

    /**
    * @brief            Copy constructor
    */
104
    OperatorInterface(const OperatorInterface& other) :
Alessio Netti's avatar
Alessio Netti committed
105
106
            _name(other._name),
            _mqttPart(other._mqttPart),
107
            _isTemplate(other._isTemplate),
108
            _relaxed(other._relaxed),
Alessio Netti's avatar
Alessio Netti committed
109
110
111
            _duplicate(other._duplicate),
            _streaming(other._streaming),
            _sync(other._sync),
112
            _dynamic(other._dynamic),
Alessio Netti's avatar
Alessio Netti committed
113
            _disabled(other._disabled),
114
            _flatten(other._flatten),
Alessio Netti's avatar
Alessio Netti committed
115
116
117
118
119
            _unitID(other._unitID),
            _keepRunning(other._keepRunning),
            _minValues(other._minValues),
            _interval(other._interval),
            _cacheInterval(other._cacheInterval),
120
            _unitCacheLimit(other._unitCacheLimit),
Alessio Netti's avatar
Alessio Netti committed
121
            _cacheSize(other._cacheSize),
Alessio Netti's avatar
Alessio Netti committed
122
            _delayInterval(other._delayInterval),
Alessio Netti's avatar
Alessio Netti committed
123
124
125
            _pendingTasks(0),
            _onDemandLock(false),
            _timer(nullptr) {}
Alessio Netti's avatar
Alessio Netti committed
126
127
128
129

    /**
    * @brief            Class destructor
    */
130
    virtual ~OperatorInterface() {}
Alessio Netti's avatar
Alessio Netti committed
131
132
133
134

    /**
    * @brief            Assignment operator
    */
135
    OperatorInterface& operator=(const OperatorInterface& other) {
Alessio Netti's avatar
Alessio Netti committed
136
137
        _name = other._name;
        _mqttPart = other._mqttPart;
138
        _isTemplate = other._isTemplate;
139
        _relaxed = other._relaxed;
Alessio Netti's avatar
Alessio Netti committed
140
141
142
143
        _unitID = other._unitID;
        _duplicate = other._duplicate;
        _streaming = other._streaming;
        _sync = other._sync;
144
        _dynamic = other._dynamic;
Alessio Netti's avatar
Alessio Netti committed
145
        _disabled = other._disabled;
146
        _flatten = other._flatten;
Alessio Netti's avatar
Alessio Netti committed
147
148
149
150
        _keepRunning = other._keepRunning;
        _minValues = other._minValues;
        _interval = other._interval;
        _cacheInterval = other._cacheInterval;
151
        _unitCacheLimit = other._unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
152
        _cacheSize = other._cacheSize;
Alessio Netti's avatar
Alessio Netti committed
153
        _delayInterval = other._delayInterval;
Alessio Netti's avatar
Alessio Netti committed
154
155
        _pendingTasks.store(0);
        _onDemandLock.store(false);
Alessio Netti's avatar
Alessio Netti committed
156
157
158
159
        _timer = nullptr;

        return *this;
    }
160
    
Alessio Netti's avatar
Alessio Netti committed
161
    /**
162
    * @brief              Initializes this operator
Alessio Netti's avatar
Alessio Netti committed
163
164
165
166
167
168
169
170
171
172
173
    *
    *                     This method initializes the timer used to schedule tasks. It can be overridden by derived
    *                     classes.
    *
    * @param io           Boost ASIO service to be used
    */
    virtual void init(boost::asio::io_service& io) {
        _cacheSize = _cacheInterval / _interval + 1;
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }

Alessio Netti's avatar
Alessio Netti committed
174
175
176
177
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This method must be implemented in derived classes. It will perform an action (if any)
178
    *                     on the operator according to the input action string. Any thrown
Alessio Netti's avatar
Alessio Netti committed
179
180
181
182
183
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
184
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
185
    */
186
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
Alessio Netti's avatar
Alessio Netti committed
187

188
189
190
191
192
193
194
    /**
    * @brief              Waits for the operator to complete its tasks
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void wait() = 0;
    
Alessio Netti's avatar
Alessio Netti committed
195
    /**
196
    * @brief              Starts this operator
Alessio Netti's avatar
Alessio Netti committed
197
198
    *
    *                     This method must be implemented in derived classes. It will start the operation of the
199
    *                     operator.
Alessio Netti's avatar
Alessio Netti committed
200
201
202
203
    */
    virtual void start() = 0;

    /**
204
    * @brief              Stops this operator
Alessio Netti's avatar
Alessio Netti committed
205
206
    *
    *                     This method must be implemented in derived classes. It will stop the operation of the
207
    *                     operator.
Alessio Netti's avatar
Alessio Netti committed
208
209
210
211
    */
    virtual void stop() = 0;

    /**
212
    * @brief              Adds an unit to this operator
Alessio Netti's avatar
Alessio Netti committed
213
214
    *
    *                     This method must be implemented in derived classes. It must add the input UnitInterface to
215
    *                     the internal structure storing units in the operator. Said unit must then be used during
Alessio Netti's avatar
Alessio Netti committed
216
217
218
219
220
221
222
    *                     computation.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)             = 0;

    /**
223
    * @brief              Clears all the units contained in this operator
Alessio Netti's avatar
Alessio Netti committed
224
225
226
227
228
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void clearUnits()                   = 0;

Alessio Netti's avatar
Alessio Netti committed
229
230
231
232
    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
233
    *                     perform data analytics queries on the operator, which must have the _streaming attribute set
Alessio Netti's avatar
Alessio Netti committed
234
235
236
237
238
239
240
241
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Sensor tree node that defines the query
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;

242
    /**
243
    * @brief            Prints the current operator configuration
244
245
246
247
248
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) = 0;

Alessio Netti's avatar
Alessio Netti committed
249
    // Getter methods
250
251
252
253
254
255
256
257
258
259
    const string& 	    getName()	        const { return _name; }
    const string& 	    getMqttPart()	    const { return _mqttPart; }
    bool 	            getTemplate()	    const { return _isTemplate; }
    bool                getRelaxed()        const { return _relaxed; }
    bool				getSync()		    const { return _sync; }
    bool				getDuplicate()	    const { return _duplicate; }
    bool				getStreaming()	    const { return _streaming; }
    unsigned			getMinValues()	    const { return _minValues; }
    unsigned			getInterval()	    const { return _interval; }
    unsigned			getCacheSize()	    const { return _cacheSize; }
260
    unsigned            getUnitCacheLimit() const { return _unitCacheLimit; }
261
262
263
    unsigned            getDelayInterval()  const { return _delayInterval; }
    int                 getUnitID()         const { return _unitID; }
    bool                getDynamic()        const { return _dynamic; }
Alessio Netti's avatar
Alessio Netti committed
264
    bool                getDisabled()       const { return _disabled; }
265
    bool                getFlatten()        const { return _flatten; }
Alessio Netti's avatar
Alessio Netti committed
266
267

    // Setter methods
Alessio Netti's avatar
Alessio Netti committed
268
269
    void setName(const string& name)	            { _name = name; }
    void setMqttPart(const string& mqttPart)	    { _mqttPart = mqttPart; }
270
    void setTemplate(bool t)                        { _isTemplate = t; }
271
    void setRelaxed(bool r)                         { _relaxed = r; }
Alessio Netti's avatar
Alessio Netti committed
272
273
274
275
    void setSync(bool sync)							{ _sync = sync; }
    void setUnitID(int u)                           { _unitID = u; }
    void setStreaming(bool streaming)				{ _streaming = streaming; }
    void setDuplicate(bool duplicate)				{ _duplicate = duplicate; }
Alessio Netti's avatar
Alessio Netti committed
276
    void setDisabled(bool disabled)                 { _disabled = disabled; }
Alessio Netti's avatar
Alessio Netti committed
277
278
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
279
    void setUnitCacheLimit(unsigned uc)             { _unitCacheLimit = uc+1; }
Alessio Netti's avatar
Alessio Netti committed
280
    void setCacheInterval(unsigned cacheInterval)	{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
281
    void setDelayInterval(unsigned delayInterval)	{ _delayInterval = delayInterval; }
282
283
    virtual vector<UnitPtr>& getUnits()             = 0;
    virtual void    releaseUnits()                  = 0;
Alessio Netti's avatar
Alessio Netti committed
284
285
286

protected:

287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
    /**
     * @brief             Implement plugin specific actions to initialize an operator here.
     *
     * @details           If a derived class requires further custom
     *                    actions for initialization, this should be implemented here.
     */
    virtual void execOnInit() {}

    /**
     * @brief             Implement plugin-specific actions to start an operator here.
     *
     * @details           If a derived class (i.e. a plugin group) requires further custom
     *                    actions to start analytics, this should be implemented here.
     *
     * @return            True on success, false otherwise.
     */
    virtual bool execOnStart() { return true; }

    /**
     * @brief             Implement plugin specific actions to stop a group here.
     *
     * @details           If a derived class requires further custom actions 
     *                    to stop operation this should be implemented here.
     */
    virtual void execOnStop() {}

Alessio Netti's avatar
Alessio Netti committed
313
314
315
    /**
    * @brief              Performs a compute task
    *
316
    * @details            This method is tasked with scheduling the next compute task, and invoking the internal
317
318
    *                     compute() method, which encapsulates the real logic of the operator. The compute method
    *                     is automatically called over units as required by the Operator's configuration.
Alessio Netti's avatar
Alessio Netti committed
319
320
321
    *
    */
    virtual void computeAsync() = 0;
322
    
323
    // Name of this operator
Alessio Netti's avatar
Alessio Netti committed
324
    string _name;
325
    // MQTT part (see docs) of this operator
Alessio Netti's avatar
Alessio Netti committed
326
    string	_mqttPart;
Alessio Netti's avatar
Alessio Netti committed
327

328
    // To distinguish between templates and actual operators
329
    bool _isTemplate;
330
    // If the operator's units must be built in relaxed mode
331
    bool _relaxed;
332
    // If true, the operator is a duplicate of another
Alessio Netti's avatar
Alessio Netti committed
333
    bool _duplicate;
334
    // If true, the operator performs computation in streaming
Alessio Netti's avatar
Alessio Netti committed
335
336
337
    bool _streaming;
    // If true, the computation intervals are synchronized
    bool _sync;
338
    // Indicates whether the operator generates units dynamically at runtime, or only at initialization
339
    bool _dynamic;
Alessio Netti's avatar
Alessio Netti committed
340
341
    // If true, the operator is initialized but "disabled" and cannot be used
    bool _disabled;
342
    // Indicates whether the operator generates hierarchical units that must be flattened (their sub-units exposed)
343
    bool _flatten;
344
    // ID of the units this operator works on
Alessio Netti's avatar
Alessio Netti committed
345
    int _unitID;
346
    // Determines if the operator can keep running or must terminate
Alessio Netti's avatar
Alessio Netti committed
347
348
349
350
351
    int _keepRunning;
    // Minimum number of sensor values to be accumulated before output can be sent
    unsigned int _minValues;
    // Sampling period regulating compute batches
    unsigned int _interval;
352
    // Size of the cache in time for the output sensors in this operator
Alessio Netti's avatar
Alessio Netti committed
353
    unsigned int _cacheInterval;
354
355
    // Maximum number of units that can be contained in the unit cache
    unsigned int _unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
356
357
    // Real size of the cache, as determined from cacheInterval
    unsigned int _cacheSize;
Alessio Netti's avatar
Alessio Netti committed
358
359
    // Time in seconds to wait for before starting computation
    unsigned int _delayInterval;
Alessio Netti's avatar
Alessio Netti committed
360
    // Number of pending ASIO tasks
Alessio Netti's avatar
Alessio Netti committed
361
362
363
    atomic_uint _pendingTasks;
    // Lock used to serialize access to the ondemand functionality
    atomic_bool _onDemandLock;
Alessio Netti's avatar
Alessio Netti committed
364
    // Timer for scheduling tasks
Alessio Netti's avatar
Alessio Netti committed
365
    unique_ptr<boost::asio::deadline_timer> _timer;
Alessio Netti's avatar
Alessio Netti committed
366
367
368
369
370
371

    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

//for better readability
372
using OperatorPtr = shared_ptr<OperatorInterface>;
Alessio Netti's avatar
Alessio Netti committed
373

374
#endif //PROJECT_OPERATORINTERFACE_H