OperatorInterface.h 14.9 KB
Newer Older
1
//================================================================================
2
// Name        : OperatorInterface.h
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Interface to data operators.
7
8
9
10
11
12
13
14
15
16
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
/**
29
 * @defgroup operator Operator Plugins
30
31
 * @ingroup analytics
 *
32
 * @brief Operator for the analytics plugin.
33
 */
34
35
#ifndef PROJECT_OPERATORINTERFACE_H
#define PROJECT_OPERATORINTERFACE_H
Alessio Netti's avatar
Alessio Netti committed
36
37
38

#include <atomic>
#include <memory>
39
#include <unordered_map>
Alessio Netti's avatar
Alessio Netti committed
40
#include <vector>
Alessio Netti's avatar
Alessio Netti committed
41
#include <map>
Alessio Netti's avatar
Alessio Netti committed
42
43
#include <boost/asio.hpp>

44
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
45
46
#include "UnitInterface.h"

Alessio Netti's avatar
Alessio Netti committed
47
48
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
49
50
// Struct defining a response to a REST request
typedef struct {
Alessio Netti's avatar
Alessio Netti committed
51
52
    string response;
    string data;
Alessio Netti's avatar
Alessio Netti committed
53
54
} restResponse_t;

Alessio Netti's avatar
Alessio Netti committed
55
/**
56
 * @brief Interface to data operators
57
58
 *
 * @details This interface supplies methods to instantiate, start and retrieve
59
 *          data from operators, which are "modules" that* implement data
60
 *          analytics models and are loaded by DCDB data analytics plugins. For
61
 *          it to be used in the DCDB data analytics framework, the operator
62
 *          must comply to this interface.
Alessio Netti's avatar
Alessio Netti committed
63
 *
64
 *          An operator acts on "units", which are logical entities represented
65
66
 *          by certain inputs and outputs. An unit can be, for example, a node,
 *          a CPU or a rack in a HPC system.
Alessio Netti's avatar
Alessio Netti committed
67
 *
68
 * @ingroup operator
Alessio Netti's avatar
Alessio Netti committed
69
 */
70
class OperatorInterface {
Alessio Netti's avatar
Alessio Netti committed
71
72
73
74
75
public:

    /**
    * @brief            Class constructor
    *
76
    * @param name       Name of the operator
Alessio Netti's avatar
Alessio Netti committed
77
    */
78
    OperatorInterface(const string& name) :
Alessio Netti's avatar
Alessio Netti committed
79
80
            _name(name),
            _mqttPart(""),
81
            _isTemplate(false),
82
            _relaxed(false),
83
            _enforceTopics(false),
Alessio Netti's avatar
Alessio Netti committed
84
85
86
            _duplicate(false),
            _streaming(true),
            _sync(true),
87
            _dynamic(false),
Alessio Netti's avatar
Alessio Netti committed
88
            _disabled(false),
Alessio Netti's avatar
Alessio Netti committed
89
90
91
92
            _unitID(-1),
            _keepRunning(0),
            _minValues(1),
            _interval(1000),
93
            _queueSize(1024),
Alessio Netti's avatar
Alessio Netti committed
94
            _cacheInterval(900000),
95
            _unitCacheLimit(1000),
Alessio Netti's avatar
Alessio Netti committed
96
            _cacheSize(1),
97
            _delayInterval(10),
Alessio Netti's avatar
Alessio Netti committed
98
            _pendingTasks(0),
Alessio Netti's avatar
Alessio Netti committed
99
            _onDemandLock(false),
Alessio Netti's avatar
Alessio Netti committed
100
101
102
103
104
            _timer(nullptr) {}

    /**
    * @brief            Copy constructor
    */
105
    OperatorInterface(const OperatorInterface& other) :
Alessio Netti's avatar
Alessio Netti committed
106
107
            _name(other._name),
            _mqttPart(other._mqttPart),
108
            _isTemplate(other._isTemplate),
109
            _relaxed(other._relaxed),
110
            _enforceTopics(other._enforceTopics),
Alessio Netti's avatar
Alessio Netti committed
111
112
113
            _duplicate(other._duplicate),
            _streaming(other._streaming),
            _sync(other._sync),
114
            _dynamic(other._dynamic),
Alessio Netti's avatar
Alessio Netti committed
115
            _disabled(other._disabled),
Alessio Netti's avatar
Alessio Netti committed
116
117
118
119
            _unitID(other._unitID),
            _keepRunning(other._keepRunning),
            _minValues(other._minValues),
            _interval(other._interval),
120
            _queueSize(other._queueSize),
Alessio Netti's avatar
Alessio Netti committed
121
            _cacheInterval(other._cacheInterval),
122
            _unitCacheLimit(other._unitCacheLimit),
Alessio Netti's avatar
Alessio Netti committed
123
            _cacheSize(other._cacheSize),
Alessio Netti's avatar
Alessio Netti committed
124
            _delayInterval(other._delayInterval),
Alessio Netti's avatar
Alessio Netti committed
125
126
127
            _pendingTasks(0),
            _onDemandLock(false),
            _timer(nullptr) {}
Alessio Netti's avatar
Alessio Netti committed
128
129
130
131

    /**
    * @brief            Class destructor
    */
132
    virtual ~OperatorInterface() {}
Alessio Netti's avatar
Alessio Netti committed
133
134
135
136

    /**
    * @brief            Assignment operator
    */
137
    OperatorInterface& operator=(const OperatorInterface& other) {
Alessio Netti's avatar
Alessio Netti committed
138
139
        _name = other._name;
        _mqttPart = other._mqttPart;
140
        _isTemplate = other._isTemplate;
141
        _relaxed = other._relaxed;
142
        _enforceTopics = other._enforceTopics;
Alessio Netti's avatar
Alessio Netti committed
143
144
145
146
        _unitID = other._unitID;
        _duplicate = other._duplicate;
        _streaming = other._streaming;
        _sync = other._sync;
147
        _dynamic = other._dynamic;
Alessio Netti's avatar
Alessio Netti committed
148
        _disabled = other._disabled;
Alessio Netti's avatar
Alessio Netti committed
149
150
151
152
        _keepRunning = other._keepRunning;
        _minValues = other._minValues;
        _interval = other._interval;
        _cacheInterval = other._cacheInterval;
153
        _queueSize = other._queueSize;
154
        _unitCacheLimit = other._unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
155
        _cacheSize = other._cacheSize;
Alessio Netti's avatar
Alessio Netti committed
156
        _delayInterval = other._delayInterval;
Alessio Netti's avatar
Alessio Netti committed
157
158
        _pendingTasks.store(0);
        _onDemandLock.store(false);
Alessio Netti's avatar
Alessio Netti committed
159
160
161
162
        _timer = nullptr;

        return *this;
    }
163
    
Alessio Netti's avatar
Alessio Netti committed
164
    /**
165
    * @brief              Initializes this operator
Alessio Netti's avatar
Alessio Netti committed
166
167
168
169
170
171
172
173
174
175
176
    *
    *                     This method initializes the timer used to schedule tasks. It can be overridden by derived
    *                     classes.
    *
    * @param io           Boost ASIO service to be used
    */
    virtual void init(boost::asio::io_service& io) {
        _cacheSize = _cacheInterval / _interval + 1;
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }

Alessio Netti's avatar
Alessio Netti committed
177
178
179
180
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This method must be implemented in derived classes. It will perform an action (if any)
181
    *                     on the operator according to the input action string. Any thrown
Alessio Netti's avatar
Alessio Netti committed
182
183
184
185
186
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
187
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
188
    */
189
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
Alessio Netti's avatar
Alessio Netti committed
190

191
192
193
194
195
196
197
    /**
    * @brief              Waits for the operator to complete its tasks
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void wait() = 0;
    
Alessio Netti's avatar
Alessio Netti committed
198
    /**
199
    * @brief              Starts this operator
Alessio Netti's avatar
Alessio Netti committed
200
201
    *
    *                     This method must be implemented in derived classes. It will start the operation of the
202
    *                     operator.
Alessio Netti's avatar
Alessio Netti committed
203
204
205
206
    */
    virtual void start() = 0;

    /**
207
    * @brief              Stops this operator
Alessio Netti's avatar
Alessio Netti committed
208
209
    *
    *                     This method must be implemented in derived classes. It will stop the operation of the
210
    *                     operator.
Alessio Netti's avatar
Alessio Netti committed
211
212
213
214
    */
    virtual void stop() = 0;

    /**
215
    * @brief              Adds an unit to this operator
Alessio Netti's avatar
Alessio Netti committed
216
217
    *
    *                     This method must be implemented in derived classes. It must add the input UnitInterface to
218
    *                     the internal structure storing units in the operator. Said unit must then be used during
Alessio Netti's avatar
Alessio Netti committed
219
220
221
222
223
224
225
    *                     computation.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)             = 0;

    /**
226
    * @brief              Clears all the units contained in this operator
Alessio Netti's avatar
Alessio Netti committed
227
228
229
230
231
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void clearUnits()                   = 0;

232
233
234
235
236
237
238
239
240
	/**
	 * @brief              Returns the number of messages this operator will
	 *                     generate per second
	 *
	 *                     This method must be implemented in derived classes.
	 */
	virtual float getMsgRate()                  = 0;

	/**
Alessio Netti's avatar
Alessio Netti committed
241
242
243
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
244
    *                     perform data analytics queries on the operator, which must have the _streaming attribute set
Alessio Netti's avatar
Alessio Netti committed
245
246
247
248
249
250
251
252
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Sensor tree node that defines the query
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;

253
    /**
254
    * @brief            Prints the current operator configuration
255
256
257
258
259
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) = 0;

Alessio Netti's avatar
Alessio Netti committed
260
    // Getter methods
261
262
263
264
    const string& 	    getName()	        const { return _name; }
    const string& 	    getMqttPart()	    const { return _mqttPart; }
    bool 	            getTemplate()	    const { return _isTemplate; }
    bool                getRelaxed()        const { return _relaxed; }
265
    bool                getEnforceTopics()  const { return _enforceTopics; }
266
267
268
269
270
    bool				getSync()		    const { return _sync; }
    bool				getDuplicate()	    const { return _duplicate; }
    bool				getStreaming()	    const { return _streaming; }
    unsigned			getMinValues()	    const { return _minValues; }
    unsigned			getInterval()	    const { return _interval; }
271
    unsigned			getQueueSize()	    const { return _queueSize; }
272
    unsigned			getCacheSize()	    const { return _cacheSize; }
273
    unsigned            getUnitCacheLimit() const { return _unitCacheLimit; }
274
275
276
    unsigned            getDelayInterval()  const { return _delayInterval; }
    int                 getUnitID()         const { return _unitID; }
    bool                getDynamic()        const { return _dynamic; }
Alessio Netti's avatar
Alessio Netti committed
277
    bool                getDisabled()       const { return _disabled; }
Alessio Netti's avatar
Alessio Netti committed
278
279

    // Setter methods
Alessio Netti's avatar
Alessio Netti committed
280
281
    void setName(const string& name)	            { _name = name; }
    void setMqttPart(const string& mqttPart)	    { _mqttPart = mqttPart; }
282
    void setTemplate(bool t)                        { _isTemplate = t; }
283
    void setRelaxed(bool r)                         { _relaxed = r; }
284
    void setEnforceTopics(bool e)                   { _enforceTopics = e; }
Alessio Netti's avatar
Alessio Netti committed
285
286
287
288
    void setSync(bool sync)							{ _sync = sync; }
    void setUnitID(int u)                           { _unitID = u; }
    void setStreaming(bool streaming)				{ _streaming = streaming; }
    void setDuplicate(bool duplicate)				{ _duplicate = duplicate; }
Alessio Netti's avatar
Alessio Netti committed
289
    void setDisabled(bool disabled)                 { _disabled = disabled; }
Alessio Netti's avatar
Alessio Netti committed
290
291
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
292
    void setQueueSize(unsigned queueSize)			{ _queueSize = queueSize; }
293
    void setUnitCacheLimit(unsigned uc)             { _unitCacheLimit = uc+1; }
Alessio Netti's avatar
Alessio Netti committed
294
    void setCacheInterval(unsigned cacheInterval)	{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
295
    void setDelayInterval(unsigned delayInterval)	{ _delayInterval = delayInterval; }
296
297
    virtual vector<UnitPtr>& getUnits()             = 0;
    virtual void    releaseUnits()                  = 0;
Alessio Netti's avatar
Alessio Netti committed
298
299
300

protected:

301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
    /**
     * @brief             Implement plugin specific actions to initialize an operator here.
     *
     * @details           If a derived class requires further custom
     *                    actions for initialization, this should be implemented here.
     */
    virtual void execOnInit() {}

    /**
     * @brief             Implement plugin-specific actions to start an operator here.
     *
     * @details           If a derived class (i.e. a plugin group) requires further custom
     *                    actions to start analytics, this should be implemented here.
     *
     * @return            True on success, false otherwise.
     */
    virtual bool execOnStart() { return true; }

    /**
     * @brief             Implement plugin specific actions to stop a group here.
     *
     * @details           If a derived class requires further custom actions 
     *                    to stop operation this should be implemented here.
     */
    virtual void execOnStop() {}

Alessio Netti's avatar
Alessio Netti committed
327
328
329
    /**
    * @brief              Performs a compute task
    *
330
    * @details            This method is tasked with scheduling the next compute task, and invoking the internal
331
332
    *                     compute() method, which encapsulates the real logic of the operator. The compute method
    *                     is automatically called over units as required by the Operator's configuration.
Alessio Netti's avatar
Alessio Netti committed
333
334
335
    *
    */
    virtual void computeAsync() = 0;
336
    
337
    // Name of this operator
Alessio Netti's avatar
Alessio Netti committed
338
    string _name;
339
    // MQTT part (see docs) of this operator
Alessio Netti's avatar
Alessio Netti committed
340
    string	_mqttPart;
Alessio Netti's avatar
Alessio Netti committed
341

342
    // To distinguish between templates and actual operators
343
    bool _isTemplate;
344
    // If the operator's units must be built in relaxed mode
345
    bool _relaxed;
346
347
    // If true, when building the units of this operator all output sensors will have _mqttPart prepended to them
    bool _enforceTopics;
348
    // If true, the operator is a duplicate of another
Alessio Netti's avatar
Alessio Netti committed
349
    bool _duplicate;
350
    // If true, the operator performs computation in streaming
Alessio Netti's avatar
Alessio Netti committed
351
352
353
    bool _streaming;
    // If true, the computation intervals are synchronized
    bool _sync;
354
    // Indicates whether the operator generates units dynamically at runtime, or only at initialization
355
    bool _dynamic;
Alessio Netti's avatar
Alessio Netti committed
356
357
    // If true, the operator is initialized but "disabled" and cannot be used
    bool _disabled;
358
    // ID of the units this operator works on
Alessio Netti's avatar
Alessio Netti committed
359
    int _unitID;
360
    // Determines if the operator can keep running or must terminate
Alessio Netti's avatar
Alessio Netti committed
361
362
363
364
365
    int _keepRunning;
    // Minimum number of sensor values to be accumulated before output can be sent
    unsigned int _minValues;
    // Sampling period regulating compute batches
    unsigned int _interval;
366
367
	// readingQueue size
    unsigned int _queueSize;
368
    // Size of the cache in time for the output sensors in this operator
Alessio Netti's avatar
Alessio Netti committed
369
    unsigned int _cacheInterval;
370
371
    // Maximum number of units that can be contained in the unit cache
    unsigned int _unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
372
373
    // Real size of the cache, as determined from cacheInterval
    unsigned int _cacheSize;
Alessio Netti's avatar
Alessio Netti committed
374
375
    // Time in seconds to wait for before starting computation
    unsigned int _delayInterval;
Alessio Netti's avatar
Alessio Netti committed
376
    // Number of pending ASIO tasks
Alessio Netti's avatar
Alessio Netti committed
377
378
379
    atomic_uint _pendingTasks;
    // Lock used to serialize access to the ondemand functionality
    atomic_bool _onDemandLock;
Alessio Netti's avatar
Alessio Netti committed
380
    // Timer for scheduling tasks
Alessio Netti's avatar
Alessio Netti committed
381
    unique_ptr<boost::asio::deadline_timer> _timer;
Alessio Netti's avatar
Alessio Netti committed
382
383
384
385
386
387

    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

//for better readability
388
using OperatorPtr = shared_ptr<OperatorInterface>;
Alessio Netti's avatar
Alessio Netti committed
389

390
#endif //PROJECT_OPERATORINTERFACE_H