24.09., 9:00 - 11:00: Due to updates GitLab will be unavailable for some minutes between 09:00 and 11:00.

OperatorInterface.h 14.6 KB
Newer Older
1
//================================================================================
2
// Name        : OperatorInterface.h
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Interface to data operators.
7 8 9 10 11 12 13 14 15 16
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18 19 20 21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23 24 25 26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
/**
29
 * @defgroup operator Operator Plugins
30 31
 * @ingroup analytics
 *
32
 * @brief Operator for the analytics plugin.
33
 */
34 35
#ifndef PROJECT_OPERATORINTERFACE_H
#define PROJECT_OPERATORINTERFACE_H
Alessio Netti's avatar
Alessio Netti committed
36 37 38

#include <atomic>
#include <memory>
39
#include <unordered_map>
Alessio Netti's avatar
Alessio Netti committed
40
#include <vector>
Alessio Netti's avatar
Alessio Netti committed
41
#include <map>
Alessio Netti's avatar
Alessio Netti committed
42 43
#include <boost/asio.hpp>

44
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
45 46
#include "UnitInterface.h"

Alessio Netti's avatar
Alessio Netti committed
47 48
using namespace std;

Alessio Netti's avatar
Alessio Netti committed
49 50
// Struct defining a response to a REST request
typedef struct {
Alessio Netti's avatar
Alessio Netti committed
51 52
    string response;
    string data;
Alessio Netti's avatar
Alessio Netti committed
53 54
} restResponse_t;

Alessio Netti's avatar
Alessio Netti committed
55
/**
56
 * @brief Interface to data operators
57 58
 *
 * @details This interface supplies methods to instantiate, start and retrieve
59
 *          data from operators, which are "modules" that* implement data
60
 *          analytics models and are loaded by DCDB data analytics plugins. For
61
 *          it to be used in the DCDB data analytics framework, the operator
62
 *          must comply to this interface.
Alessio Netti's avatar
Alessio Netti committed
63
 *
64
 *          An operator acts on "units", which are logical entities represented
65 66
 *          by certain inputs and outputs. An unit can be, for example, a node,
 *          a CPU or a rack in a HPC system.
Alessio Netti's avatar
Alessio Netti committed
67
 *
68
 * @ingroup operator
Alessio Netti's avatar
Alessio Netti committed
69
 */
70
class OperatorInterface {
Alessio Netti's avatar
Alessio Netti committed
71 72 73 74 75
public:

    /**
    * @brief            Class constructor
    *
76
    * @param name       Name of the operator
Alessio Netti's avatar
Alessio Netti committed
77
    */
78
    OperatorInterface(const string& name) :
Alessio Netti's avatar
Alessio Netti committed
79 80
            _name(name),
            _mqttPart(""),
81
            _isTemplate(false),
82
            _relaxed(false),
83
            _enforceTopics(false),
Alessio Netti's avatar
Alessio Netti committed
84 85 86
            _duplicate(false),
            _streaming(true),
            _sync(true),
87
            _dynamic(false),
Alessio Netti's avatar
Alessio Netti committed
88
            _disabled(false),
Alessio Netti's avatar
Alessio Netti committed
89 90 91 92 93
            _unitID(-1),
            _keepRunning(0),
            _minValues(1),
            _interval(1000),
            _cacheInterval(900000),
94
            _unitCacheLimit(1000),
Alessio Netti's avatar
Alessio Netti committed
95
            _cacheSize(1),
96
            _delayInterval(10),
Alessio Netti's avatar
Alessio Netti committed
97
            _pendingTasks(0),
Alessio Netti's avatar
Alessio Netti committed
98
            _onDemandLock(false),
Alessio Netti's avatar
Alessio Netti committed
99 100 101 102 103
            _timer(nullptr) {}

    /**
    * @brief            Copy constructor
    */
104
    OperatorInterface(const OperatorInterface& other) :
Alessio Netti's avatar
Alessio Netti committed
105 106
            _name(other._name),
            _mqttPart(other._mqttPart),
107
            _isTemplate(other._isTemplate),
108
            _relaxed(other._relaxed),
109
            _enforceTopics(other._enforceTopics),
Alessio Netti's avatar
Alessio Netti committed
110 111 112
            _duplicate(other._duplicate),
            _streaming(other._streaming),
            _sync(other._sync),
113
            _dynamic(other._dynamic),
Alessio Netti's avatar
Alessio Netti committed
114
            _disabled(other._disabled),
Alessio Netti's avatar
Alessio Netti committed
115 116 117 118 119
            _unitID(other._unitID),
            _keepRunning(other._keepRunning),
            _minValues(other._minValues),
            _interval(other._interval),
            _cacheInterval(other._cacheInterval),
120
            _unitCacheLimit(other._unitCacheLimit),
Alessio Netti's avatar
Alessio Netti committed
121
            _cacheSize(other._cacheSize),
Alessio Netti's avatar
Alessio Netti committed
122
            _delayInterval(other._delayInterval),
Alessio Netti's avatar
Alessio Netti committed
123 124 125
            _pendingTasks(0),
            _onDemandLock(false),
            _timer(nullptr) {}
Alessio Netti's avatar
Alessio Netti committed
126 127 128 129

    /**
    * @brief            Class destructor
    */
130
    virtual ~OperatorInterface() {}
Alessio Netti's avatar
Alessio Netti committed
131 132 133 134

    /**
    * @brief            Assignment operator
    */
135
    OperatorInterface& operator=(const OperatorInterface& other) {
Alessio Netti's avatar
Alessio Netti committed
136 137
        _name = other._name;
        _mqttPart = other._mqttPart;
138
        _isTemplate = other._isTemplate;
139
        _relaxed = other._relaxed;
140
        _enforceTopics = other._enforceTopics;
Alessio Netti's avatar
Alessio Netti committed
141 142 143 144
        _unitID = other._unitID;
        _duplicate = other._duplicate;
        _streaming = other._streaming;
        _sync = other._sync;
145
        _dynamic = other._dynamic;
Alessio Netti's avatar
Alessio Netti committed
146
        _disabled = other._disabled;
Alessio Netti's avatar
Alessio Netti committed
147 148 149 150
        _keepRunning = other._keepRunning;
        _minValues = other._minValues;
        _interval = other._interval;
        _cacheInterval = other._cacheInterval;
151
        _unitCacheLimit = other._unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
152
        _cacheSize = other._cacheSize;
Alessio Netti's avatar
Alessio Netti committed
153
        _delayInterval = other._delayInterval;
Alessio Netti's avatar
Alessio Netti committed
154 155
        _pendingTasks.store(0);
        _onDemandLock.store(false);
Alessio Netti's avatar
Alessio Netti committed
156 157 158 159
        _timer = nullptr;

        return *this;
    }
160
    
Alessio Netti's avatar
Alessio Netti committed
161
    /**
162
    * @brief              Initializes this operator
Alessio Netti's avatar
Alessio Netti committed
163 164 165 166 167 168 169 170 171 172 173
    *
    *                     This method initializes the timer used to schedule tasks. It can be overridden by derived
    *                     classes.
    *
    * @param io           Boost ASIO service to be used
    */
    virtual void init(boost::asio::io_service& io) {
        _cacheSize = _cacheInterval / _interval + 1;
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }

Alessio Netti's avatar
Alessio Netti committed
174 175 176 177
    /**
    * @brief              Perform a REST-triggered PUT action
    *
    *                     This method must be implemented in derived classes. It will perform an action (if any)
178
    *                     on the operator according to the input action string. Any thrown
Alessio Netti's avatar
Alessio Netti committed
179 180 181 182 183
    *                     exceptions will be reported in the response string.
    *
    * @param action       Name of the action to be performed
    * @param queries      Vector of queries (key-value pairs)
    *
Alessio Netti's avatar
Alessio Netti committed
184
    * @return             Response to the request as a <response, data> pair
Alessio Netti's avatar
Alessio Netti committed
185
    */
186
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
Alessio Netti's avatar
Alessio Netti committed
187

188 189 190 191 192 193 194
    /**
    * @brief              Waits for the operator to complete its tasks
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void wait() = 0;
    
Alessio Netti's avatar
Alessio Netti committed
195
    /**
196
    * @brief              Starts this operator
Alessio Netti's avatar
Alessio Netti committed
197 198
    *
    *                     This method must be implemented in derived classes. It will start the operation of the
199
    *                     operator.
Alessio Netti's avatar
Alessio Netti committed
200 201 202 203
    */
    virtual void start() = 0;

    /**
204
    * @brief              Stops this operator
Alessio Netti's avatar
Alessio Netti committed
205 206
    *
    *                     This method must be implemented in derived classes. It will stop the operation of the
207
    *                     operator.
Alessio Netti's avatar
Alessio Netti committed
208 209 210 211
    */
    virtual void stop() = 0;

    /**
212
    * @brief              Adds an unit to this operator
Alessio Netti's avatar
Alessio Netti committed
213 214
    *
    *                     This method must be implemented in derived classes. It must add the input UnitInterface to
215
    *                     the internal structure storing units in the operator. Said unit must then be used during
Alessio Netti's avatar
Alessio Netti committed
216 217 218 219 220 221 222
    *                     computation.
    *
    * @param u            Shared pointer to a UnitInterface object
    */
    virtual void addUnit(UnitPtr u)             = 0;

    /**
223
    * @brief              Clears all the units contained in this operator
Alessio Netti's avatar
Alessio Netti committed
224 225 226 227 228
    *
    *                     This method must be implemented in derived classes.
    */
    virtual void clearUnits()                   = 0;

229 230 231 232 233 234 235 236 237
	/**
	 * @brief              Returns the number of messages this operator will
	 *                     generate per second
	 *
	 *                     This method must be implemented in derived classes.
	 */
	virtual float getMsgRate()                  = 0;

	/**
Alessio Netti's avatar
Alessio Netti committed
238 239 240
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
241
    *                     perform data analytics queries on the operator, which must have the _streaming attribute set
Alessio Netti's avatar
Alessio Netti committed
242 243 244 245 246 247 248 249
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Sensor tree node that defines the query
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;

250
    /**
251
    * @brief            Prints the current operator configuration
252 253 254 255 256
    *
    * @param ll         Logging level at which the configuration is printed
    */
    virtual void printConfig(LOG_LEVEL ll) = 0;

Alessio Netti's avatar
Alessio Netti committed
257
    // Getter methods
258 259 260 261
    const string& 	    getName()	        const { return _name; }
    const string& 	    getMqttPart()	    const { return _mqttPart; }
    bool 	            getTemplate()	    const { return _isTemplate; }
    bool                getRelaxed()        const { return _relaxed; }
262
    bool                getEnforceTopics()  const { return _enforceTopics; }
263 264 265 266 267 268
    bool				getSync()		    const { return _sync; }
    bool				getDuplicate()	    const { return _duplicate; }
    bool				getStreaming()	    const { return _streaming; }
    unsigned			getMinValues()	    const { return _minValues; }
    unsigned			getInterval()	    const { return _interval; }
    unsigned			getCacheSize()	    const { return _cacheSize; }
269
    unsigned            getUnitCacheLimit() const { return _unitCacheLimit; }
270 271 272
    unsigned            getDelayInterval()  const { return _delayInterval; }
    int                 getUnitID()         const { return _unitID; }
    bool                getDynamic()        const { return _dynamic; }
Alessio Netti's avatar
Alessio Netti committed
273
    bool                getDisabled()       const { return _disabled; }
Alessio Netti's avatar
Alessio Netti committed
274 275

    // Setter methods
Alessio Netti's avatar
Alessio Netti committed
276 277
    void setName(const string& name)	            { _name = name; }
    void setMqttPart(const string& mqttPart)	    { _mqttPart = mqttPart; }
278
    void setTemplate(bool t)                        { _isTemplate = t; }
279
    void setRelaxed(bool r)                         { _relaxed = r; }
280
    void setEnforceTopics(bool e)                   { _enforceTopics = e; }
Alessio Netti's avatar
Alessio Netti committed
281 282 283 284
    void setSync(bool sync)							{ _sync = sync; }
    void setUnitID(int u)                           { _unitID = u; }
    void setStreaming(bool streaming)				{ _streaming = streaming; }
    void setDuplicate(bool duplicate)				{ _duplicate = duplicate; }
Alessio Netti's avatar
Alessio Netti committed
285
    void setDisabled(bool disabled)                 { _disabled = disabled; }
Alessio Netti's avatar
Alessio Netti committed
286 287
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
288
    void setUnitCacheLimit(unsigned uc)             { _unitCacheLimit = uc+1; }
Alessio Netti's avatar
Alessio Netti committed
289
    void setCacheInterval(unsigned cacheInterval)	{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
290
    void setDelayInterval(unsigned delayInterval)	{ _delayInterval = delayInterval; }
291 292
    virtual vector<UnitPtr>& getUnits()             = 0;
    virtual void    releaseUnits()                  = 0;
Alessio Netti's avatar
Alessio Netti committed
293 294 295

protected:

296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
    /**
     * @brief             Implement plugin specific actions to initialize an operator here.
     *
     * @details           If a derived class requires further custom
     *                    actions for initialization, this should be implemented here.
     */
    virtual void execOnInit() {}

    /**
     * @brief             Implement plugin-specific actions to start an operator here.
     *
     * @details           If a derived class (i.e. a plugin group) requires further custom
     *                    actions to start analytics, this should be implemented here.
     *
     * @return            True on success, false otherwise.
     */
    virtual bool execOnStart() { return true; }

    /**
     * @brief             Implement plugin specific actions to stop a group here.
     *
     * @details           If a derived class requires further custom actions 
     *                    to stop operation this should be implemented here.
     */
    virtual void execOnStop() {}

Alessio Netti's avatar
Alessio Netti committed
322 323 324
    /**
    * @brief              Performs a compute task
    *
325
    * @details            This method is tasked with scheduling the next compute task, and invoking the internal
326 327
    *                     compute() method, which encapsulates the real logic of the operator. The compute method
    *                     is automatically called over units as required by the Operator's configuration.
Alessio Netti's avatar
Alessio Netti committed
328 329 330
    *
    */
    virtual void computeAsync() = 0;
331
    
332
    // Name of this operator
Alessio Netti's avatar
Alessio Netti committed
333
    string _name;
334
    // MQTT part (see docs) of this operator
Alessio Netti's avatar
Alessio Netti committed
335
    string	_mqttPart;
Alessio Netti's avatar
Alessio Netti committed
336

337
    // To distinguish between templates and actual operators
338
    bool _isTemplate;
339
    // If the operator's units must be built in relaxed mode
340
    bool _relaxed;
341 342
    // If true, when building the units of this operator all output sensors will have _mqttPart prepended to them
    bool _enforceTopics;
343
    // If true, the operator is a duplicate of another
Alessio Netti's avatar
Alessio Netti committed
344
    bool _duplicate;
345
    // If true, the operator performs computation in streaming
Alessio Netti's avatar
Alessio Netti committed
346 347 348
    bool _streaming;
    // If true, the computation intervals are synchronized
    bool _sync;
349
    // Indicates whether the operator generates units dynamically at runtime, or only at initialization
350
    bool _dynamic;
Alessio Netti's avatar
Alessio Netti committed
351 352
    // If true, the operator is initialized but "disabled" and cannot be used
    bool _disabled;
353
    // ID of the units this operator works on
Alessio Netti's avatar
Alessio Netti committed
354
    int _unitID;
355
    // Determines if the operator can keep running or must terminate
Alessio Netti's avatar
Alessio Netti committed
356 357 358 359 360
    int _keepRunning;
    // Minimum number of sensor values to be accumulated before output can be sent
    unsigned int _minValues;
    // Sampling period regulating compute batches
    unsigned int _interval;
361
    // Size of the cache in time for the output sensors in this operator
Alessio Netti's avatar
Alessio Netti committed
362
    unsigned int _cacheInterval;
363 364
    // Maximum number of units that can be contained in the unit cache
    unsigned int _unitCacheLimit;
Alessio Netti's avatar
Alessio Netti committed
365 366
    // Real size of the cache, as determined from cacheInterval
    unsigned int _cacheSize;
Alessio Netti's avatar
Alessio Netti committed
367 368
    // Time in seconds to wait for before starting computation
    unsigned int _delayInterval;
Alessio Netti's avatar
Alessio Netti committed
369
    // Number of pending ASIO tasks
Alessio Netti's avatar
Alessio Netti committed
370 371 372
    atomic_uint _pendingTasks;
    // Lock used to serialize access to the ondemand functionality
    atomic_bool _onDemandLock;
Alessio Netti's avatar
Alessio Netti committed
373
    // Timer for scheduling tasks
Alessio Netti's avatar
Alessio Netti committed
374
    unique_ptr<boost::asio::deadline_timer> _timer;
Alessio Netti's avatar
Alessio Netti committed
375 376 377 378 379 380

    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

//for better readability
381
using OperatorPtr = shared_ptr<OperatorInterface>;
Alessio Netti's avatar
Alessio Netti committed
382

383
#endif //PROJECT_OPERATORINTERFACE_H