OperatorManager.h 20.1 KB
Newer Older
1
//================================================================================
2
// Name        : OperatorManager.h
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright   : Leibniz Supercomputing Centre
// Description : Management class for the data analytics framework.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
29
30
31
32
33
/**
 * @defgroup analytics Analytics Framework
 *
 * @brief Framework for data analytics at various points within DCDB.
 */

34
35
#ifndef PROJECT_OPERATORMANAGER_H
#define PROJECT_OPERATORMANAGER_H
Alessio Netti's avatar
Alessio Netti committed
36

37
#include <set>
Alessio Netti's avatar
Alessio Netti committed
38
#include <boost/foreach.hpp>
Alessio Netti's avatar
Alessio Netti committed
39
#include <boost/property_tree/ptree.hpp>
Alessio Netti's avatar
Alessio Netti committed
40
#include <boost/property_tree/info_parser.hpp>
Alessio Netti's avatar
Alessio Netti committed
41
#include <boost/property_tree/json_parser.hpp>
Alessio Netti's avatar
Alessio Netti committed
42
43
44
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <dlfcn.h>
Alessio Netti's avatar
Alessio Netti committed
45
#include <exception>
Alessio Netti's avatar
Alessio Netti committed
46
#include "logging.h"
47
#include "RESTHttpsServer.h"
Alessio Netti's avatar
Alessio Netti committed
48
#include "mqttchecker.h"
Alessio Netti's avatar
Alessio Netti committed
49
#include "includes/UnitInterface.h"
50
#include "includes/OperatorConfiguratorInterface.h"
Alessio Netti's avatar
Alessio Netti committed
51

Alessio Netti's avatar
Alessio Netti committed
52
53
using namespace std;

54
/**
55
 * Struct of values required to load operator dynamic libraries.
56
57
58
 *
 * @ingroup analytics
 */
Alessio Netti's avatar
Alessio Netti committed
59
60
61
typedef struct {
    std::string id;
    void*	DL;
62
63
64
65
    OperatorConfiguratorInterface* configurator;
    op_create_t*  create;
    op_destroy_t* destroy;
} op_dl_t;
Alessio Netti's avatar
Alessio Netti committed
66

67
// Container type holding one op_dl_t entry per plugin
using op_pluginVector_t = std::vector<op_dl_t>;
68

Alessio Netti's avatar
Alessio Netti committed
69
/**
70
71
72
73
 * @brief Management class for the entire data analytics framework
 *
 * @details Allows loading, configuring, starting and generally managing plugins
 *          developed with the data analytics framework.
Alessio Netti's avatar
Alessio Netti committed
74
 *
75
 * @ingroup analytics
Alessio Netti's avatar
Alessio Netti committed
76
 */
77
class OperatorManager {
Alessio Netti's avatar
Alessio Netti committed
78
79
80

public:

Alessio Netti's avatar
Alessio Netti committed
81
82
    // States of the manager. CLEAR is the state assigned on construction;
    // LOADED is the state that managerLoaded() checks for.
    enum managerState_t {
        CLEAR  = 1,
        LOADED = 2
    };

Alessio Netti's avatar
Alessio Netti committed
83
84
85
    /**
    * @brief            Class constructor
    */
86
    OperatorManager() { _status = CLEAR; }
Alessio Netti's avatar
Alessio Netti committed
87
88
89
90

    /**
    * @brief            Class destructor
    */
91
    ~OperatorManager() {
        // All teardown is delegated to clear()
        clear();
    }
Alessio Netti's avatar
Alessio Netti committed
92
93
94
95

    /**
    * @brief            Resets the state of the data analytics framework
    *
96
    *                   All currently running operators will be stopped, and the related objects destroyed.
Alessio Netti's avatar
Alessio Netti committed
97
98
99
    */
    void clear();

100
101
102
103
    /**
    * @brief                Probes a configuration file to determine if initialization is required
    *
    *                       This method will read through the specified configuration file, and search for an
104
    *                       operatorPlugin block, with its associated data analytics plugin. If the method returns
105
106
107
108
109
110
111
112
    *                       true, then one or more plugins were requested for initialization.
    *
    * @param path           Path to the global and plugin configuration files
    * @param globalFile     Name of the global file (usually global.conf or collectagent.conf)
    * @return               true if a configuration is necessary, false otherwise
    */
    bool probe(const string& path, const string& globalFile);

Alessio Netti's avatar
Alessio Netti committed
113
114
115
116
117
    /**
    * @brief                Loads plugins as specified in the input config file
    *
    *                       This method will load the dynamic libraries containing data analytics plugins. It will then
    *                       configure them, according to the respective configuration files, and store the generated
118
    *                       OperatorInterface objects, ready to be started.
Alessio Netti's avatar
Alessio Netti committed
119
120
121
122
123
124
125
126
127
128
129
    *
    * @param path           Path to the global and plugin configuration files
    * @param globalFile     Name of the global file (usually global.conf or collectagent.conf)
    * @param pluginSettings Structure containing global plugin settings
    * @return               true if successful, false otherwise
    */
    bool load(const string& path, const string& globalFile, const pluginSettings_t& pluginSettings);

    /**
    * @brief            Initialize one or more stored plugins
    *
130
    *                   This method must be called after "load", and before "start". It will prepare operators for
Alessio Netti's avatar
Alessio Netti committed
131
132
133
134
135
136
137
138
139
140
141
    *                   operation, and initialize the related sensors and caches.
    *
    * @param io         Boost ASIO service to be used
    * @param plugin     Name of the plugin on which the action must be performed. If none, all plugins will be affected
    * @return           true if successful, false otherwise
    */
    bool init(boost::asio::io_service& io, const string& plugin="");

    /**
    * @brief            Reload one or more plugins
    *
142
143
    *                   This method will cause all running operators of a plugin to be stopped and destroyed. The
    *                   configuration file is then read once again, and new operators are created and initialized.
Alessio Netti's avatar
Alessio Netti committed
144
145
146
147
148
149
150
151
152
153
    *
    * @param io         Boost ASIO service to be used
    * @param plugin     Name of the plugin on which the action must be performed. If none, all plugins will be affected
    * @return           true if successful, false otherwise
    */
    bool reload(boost::asio::io_service& io, const string& plugin="");

    /**
    * @brief            Start one or more stored plugins
    *
154
    *                   This method must be called after "init". It will start up all operators stored in a plugin,
Alessio Netti's avatar
Alessio Netti committed
155
156
157
    *                   which will then perform computation autonomously according to their sampling rates.
    *
    * @param plugin     Name of the plugin on which the action must be performed. If none, all plugins will be affected
158
    * @param operatorN  Name of the operator in the specified plugin affected by the action
Alessio Netti's avatar
Alessio Netti committed
159
160
    * @return           true if successful, false otherwise
    */
161
    bool start(const string& plugin="", const string& operatorN="");
Alessio Netti's avatar
Alessio Netti committed
162
163
164
165

    /**
    * @brief            Stops one or more stored plugins
    *
166
167
    *                   This method must be called after "start". It will stop all operators of a plugin that are
    *                   currently running, and halt their computation. The operators can be re-started again later
Alessio Netti's avatar
Alessio Netti committed
168
169
170
    *                   with the "start" method.
    *
    * @param plugin     Name of the plugin on which the action must be performed. If none, all plugins will be affected
171
    * @param operatorN  Name of the operator in the specified plugin affected by the action
Alessio Netti's avatar
Alessio Netti committed
172
173
    * @return           true if successful, false otherwise
    */
174
    bool stop(const string& plugin="", const string& operatorN="");
Alessio Netti's avatar
Alessio Netti committed
175

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    /**
     * @brief             Loads a plugin dynamically
     * 
     * @param name        Name of the plugin
     * @param pluginPath  Path to the plugin library
     * @param config      Path to the configuration file
     * @return            True if successful, false otherwise
     */
    bool loadPlugin(const string& name, const string& pluginPath, const string& config);

    /**
     * @brief             Unloads a currently loaded plugin
     * 
     * @param id          Name of the plugin to be unloaded
     */
    void unloadPlugin(const string& id);

Alessio Netti's avatar
Alessio Netti committed
193
194
195
    /**
    * @brief            Get the vector of currently loaded plugins
    *
196
    *                   The vector can then be used to access single operators within plugins, the related units, and
Alessio Netti's avatar
Alessio Netti committed
197
198
    *                   finally all output sensors.
    *
199
    * @return           Vector of plugins represented as op_dl_t structures
Alessio Netti's avatar
Alessio Netti committed
200
    */
201
    std::vector<op_dl_t>& getPlugins() { return _plugins; }
202

203
    /**
204
     * @brief   Get the current status of the OperatorManager.
205
     *
206
     * @return  Status of the OperatorManager.
207
208
209
     */
    // Read-only accessor: const-qualified so that it can also be called on a
    // const OperatorManager. Returns a copy of the current state.
    managerState_t getStatus() const { return _status; }

210
211
212
213
214
215
216
217
218
    /**
     * @brief   Adds analytics RestAPI endpoints to a RestAPI server.
     *
     * @param restServer    The RestAPI server which should offer analytics
     *                      endpoints.
     */
    void addRestEndpoints(RESTHttpsServer* restServer);

    // String used as a response for the REST GET /help command.
    // The text is user-facing: it lists the endpoints below "/analytics"
    // together with their required and optional ([ ]) queries.
    const string restCheatSheet = "DCDB Analytics RESTful API cheatsheet:\n"
                                    "(All commands must be prepended by \"/analytics\" !)\n"
                                    " -GET: /plugins?[json]   D List of currently loaded plugins.\n"
                                    "       /sensors?plugin;[operator];[json]\n"
                                    "                         D List of currently running sensors which belong to\n"
                                    "                           the specified plugin (and operator).\n"
                                    "       /operators?plugin;[json]\n"
                                    "                         D List of running operators in the specified data\n"
                                    "                           analytics plugin.\n"
                                    "       /units?plugin;[operator];[json]\n"
                                    "                         D List of units to which sensors are associated in\n"
                                    "                           the specified data analytics plugin (and operator).\n"
                                    " -PUT: /start?[plugin];[operator]\n"
                                    "                           Start all or only a specific analytics plugin or\n"
                                    "                           start only a specific operator within a plugin.\n"
                                    "       /stop?[plugin];[operator]\n"
                                    "                           Stop all or only a specific analytics plugin or\n"
                                    "                           stop only a specific operator within a plugin.\n"
                                    "       /reload?[plugin]    Reload all or only a specific analytics plugin.\n"
                                    "       /load?plugin;[path];[config]\n"
                                    "                           Load a new plugin. Optionally specify path to the\n"
                                    "                           shared library and/or the config file for the \n"
                                    "                           plugin.\n"
                                    "       /unload?plugin      Unload a plugin.\n"
                                    "       /compute?plugin;operator;[unit];[json]\n"
                                    "                           Query the specified operator for a unit. Default\n"
                                    "                           unit is the root.\n"
                                    "       /operator?plugin;action;[operator]\n"
                                    "                           Do a custom operator action for all or only a\n"
                                    "                           selected operator within a plugin (refer to plugin\n"
                                    "                           documentation).\n"
                                    "\n"
                                    "D = Discovery method\n"
                                    "All resources have to be prepended by host:port.\n"
                                    "A query can be appended as ?query=[value] at the end. Multiple queries\n"
                                    "need to be separated by semicolons(';'). \"query=value\" syntax was shortened\n"
                                    "to \"query\" for readability. Optional queries are marked with [ ]\n";

Alessio Netti's avatar
Alessio Netti committed
257
258
protected:

Alessio Netti's avatar
Alessio Netti committed
259
    // Utility method to drop all topics associated to a certain plugin
260
    void removeTopics(op_dl_t p);
Alessio Netti's avatar
Alessio Netti committed
261
    // Utility method to check all MQTT topics within a certain plugin
262
    bool checkTopics(op_dl_t p);
Alessio Netti's avatar
Alessio Netti committed
263

264
265
    // Vector of plugins represented as op_dl_t structures
    std::vector<op_dl_t> _plugins;
Alessio Netti's avatar
Alessio Netti committed
266
267
268
269
    // Path used to load config files
    string _configPath;
    // Structure containing global plugin settings
    pluginSettings_t _pluginSettings;
Alessio Netti's avatar
Alessio Netti committed
270
271
    // Keeps track of the manager's state
    managerState_t _status;
Alessio Netti's avatar
Alessio Netti committed
272
273
274
275

    //Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;

276
277
278
private:
    // all stuff related to REST API

279
    // Utility method to check the status of the operator manager. If not
280
281
282
283
284
    // loaded: prepares the response accordingly so no further actions are
    // required.
    // Return true if loaded, false otherwise.
    inline bool managerLoaded(http::response<http::string_body>& res) {
        if (_status != LOADED) {
285
            const std::string err = "OperatorManager is not loaded!\n";
286
287
288
289
290
291
292
293
294
295
296
297
298
            RESTAPILOG(error) << err;
            res.body() = err;
            res.result(http::status::internal_server_error);
            return false;
        }
        return true;
    }

    // methods for REST API endpoints
    /**
     * GET "/analytics/help"
     *
     * @brief Return a cheatsheet of available REST API endpoints specific for
299
     *        the operator manager.
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
     * Required |  -      |        -             |      -
     * Optional |  -      |        -             |      -
     */
    void GET_analytics_help(endpointArgs);

    /**
     * GET "/analytics/plugins"
     *
     * @brief (Discovery) List all currently loaded data analytic plugins.
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
     * Required |  -      |        -             |      -
     * Optional | json    | true                 | format response as json
     */
    void GET_analytics_plugins(endpointArgs);

    /**
     * GET "/analytics/sensors"
     *
     * @brief (Discovery) List all sensors of a plugin.
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
327
     * Required | plugin  | all operator plugin  | specify the plugin
328
     *          |         | names                |
329
330
     * Optional | operator| all operators of a   | restrict sensors list to an
     *          |         | plugin               | operator
331
332
333
334
335
336
337
338
339
340
341
     *          | json    | true                 | format response as json
     */
    void GET_analytics_sensors(endpointArgs);

    /**
     * GET "/analytics/units"
     *
     * @brief (Discovery) List all units of a plugin.
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
342
     * Required | plugin  | all operator plugin  | specify the plugin
343
     *          |         | names                |
344
345
     * Optional | operator| all operators of a   | restrict unit list to an
     *          |         | plugin               | operator
346
347
348
349
350
     *          | json    | true                 | format response as json
     */
    void GET_analytics_units(endpointArgs);

    /**
351
     * GET "/analytics/operators"
352
     *
353
     * @brief (Discovery) List all active operators of a plugin.
354
355
356
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
357
     * Required | plugin  | all operator plugin  | specify the plugin
358
359
360
     *          |         | names                |
     * Optional | json    | true                 | format response as json
     */
361
    void GET_analytics_operators(endpointArgs);
362
363
364
365
366

    /**
     * PUT "/analytics/start"
     *
     * @brief Start all or only a specific plugin. Or only start a specific
367
     *        streaming operator within a specific plugin.
368
369
370
371
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
     * Required |  -      |        -             |      -
372
     * Optional | plugin  | all operator plugin  | only start the specified
373
     *          |         | names                | plugin
374
375
     *          | operator| all operators of a   | only start the specified
     *          |         | plugin               | operator. Requires a plugin
376
     *          |         |                      | to be specified. Limited to
377
     *          |         |                      | streaming operators.
378
379
380
381
382
383
384
     */
    void PUT_analytics_start(endpointArgs);

    /**
     * PUT "/analytics/stop"
     *
     * @brief Stop all or only a specific plugin. Or only stop a specific
385
     *        streaming operator within a plugin.
386
387
388
389
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
     * Required |  -      |        -             |      -
390
     * Optional | plugin  | all operator plugin  | only stop the specified
391
     *          |         | names                | plugin
392
393
     *          | operator| all operators of a   | only stop the specified
     *          |         | plugin               | operator. Requires a plugin
394
     *          |         |                      | to be specified. Limited to
395
     *          |         |                      | streaming operators.
396
397
398
399
400
     */
    void PUT_analytics_stop(endpointArgs);

    /**
     * This endpoint must either be overwritten (by adding a custom
401
     * "analytics/reload" endpoint) or must not be used. A reload requires
402
403
404
405
406
407
408
409
410
411
412
     * an external io_service object and can therefore not be conducted by the
     * OperatorManager itself.
     *
     * PUT "/analytics/reload"
     *
     * @brief Reload configuration and initialization of all or only a specific
     *        analytics plugin.
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
     * Required |  -      |        -             |      -
413
     * Optional | plugin  | all operator plugin  | reload only the specified
414
415
416
417
418
419
420
     *          |         | names                | plugin
     */
    void PUT_analytics_reload(endpointArgs);

    /**
     * PUT "/analytics/compute"
     *
421
422
     * @brief Query the given operator for a certain input unit. Intended for
     *        "on-demand" operators, but works with "streaming" operators as
423
424
425
426
     *        well.
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
427
     * Required | plugin  | all operator plugin  | select plugin
428
     *          |         | names                |
429
     *          | operator| all operators of a   | select operator
430
431
432
433
434
435
436
     *          |         | plugin               |
     * Optional | unit    | all units of a plugin| select target unit
     *          | json    | true                 | format response as json
     */
    void PUT_analytics_compute(endpointArgs);

    /**
437
     * PUT "/analytics/operator"
438
     *
439
     * @brief Perform a custom REST PUT action defined at operator level.
440
441
442
     *
     * Queries  | key     | possible values      | explanation
     * -------------------------------------------------------------------------
443
     * Required | plugin  | all operator plugin  | select plugin
444
     *          |         | names                |
445
     *          | action  | see operator         | select custom action
446
447
     *          |         | documentation        |
     *          | custom action may require more queries!
448
     * Optional | operator| all operators of a   | select operator
449
450
451
     *          |         | plugin               |
     *          | custom action may allow for more queries!
     */
452
    void PUT_analytics_operator(endpointArgs);
453

Alessio Netti's avatar
Alessio Netti committed
454
455
456
};

#endif //PROJECT_OPERATORMANAGER_H