AnalyticsManager.cpp 26 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//================================================================================
// Name        : AnalyticsManager.cpp
// Author      : Alessio Netti
// Copyright   : Leibniz Supercomputing Centre
// Description : Management class implementation for the data analytics framework.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
16
//
17
18
19
20
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
21
//
22
23
24
25
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
26
27

#include "AnalyticsManager.h"
Alessio Netti's avatar
Alessio Netti committed
28
#include "timestamp.h"
Alessio Netti's avatar
Alessio Netti committed
29
30
31
32
33

void AnalyticsManager::clear() {
    for(const auto& p : _plugins)
        p.destroy(p.configurator);
    _plugins.clear();
Alessio Netti's avatar
Alessio Netti committed
34
    _status = CLEAR;
Alessio Netti's avatar
Alessio Netti committed
35
36
}

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
bool AnalyticsManager::probe(const string& path, const string& globalFile) {
    std::string cfgPath = path;
    boost::property_tree::iptree cfg;

    if (cfgPath != "" && cfgPath[cfgPath.length() - 1] != '/')
        cfgPath.append("/");

    try {
        boost::property_tree::read_info(cfgPath + globalFile, cfg);
    } catch (boost::property_tree::info_parser_error &e) {
        return false;
    }

    if(cfg.find("analyzerPlugins") == cfg.not_found()) 
        return false;

    int pluginCtr = 0;
    BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
        if (boost::iequals(plugin.first, "analyzerPlugin"))
            pluginCtr++;
    }

    if( pluginCtr == 0) 
        return false;
    else
        return true;
}

Alessio Netti's avatar
Alessio Netti committed
65
66
67
68
69
bool AnalyticsManager::load(const string& path, const string& globalFile, const pluginSettings_t& pluginSettings) {
    //The load code is pretty much the same as in Configuration.cpp to load pusher plugins
    _configPath = path;
    _pluginSettings = pluginSettings;
    boost::property_tree::iptree cfg;
70
71
72

    if (_configPath != "" && _configPath[_configPath.length()-1] != '/')
        _configPath.append("/");
73
74
    else if(_configPath.empty())
        _configPath = "./";
75

Alessio Netti's avatar
Alessio Netti committed
76
    try {
77
        boost::property_tree::read_info(_configPath + globalFile, cfg);
Alessio Netti's avatar
Alessio Netti committed
78
    } catch (boost::property_tree::info_parser_error& e) {
Alessio Netti's avatar
Alessio Netti committed
79
        LOG(error) << "Error when reading analyzer plugins from " << globalFile << ": " << e.what();
Alessio Netti's avatar
Alessio Netti committed
80
81
82
        return false;
    }

Alessio Netti's avatar
Alessio Netti committed
83
84
    if(cfg.find("analyzerPlugins") == cfg.not_found()) {
        LOG(warning) << "No analyzerPlugins block found, skipping data analytics initialization!";
Alessio Netti's avatar
Alessio Netti committed
85
86
        _status = LOADED;
        return true;
Alessio Netti's avatar
Alessio Netti committed
87
88
    }

Alessio Netti's avatar
Alessio Netti committed
89
    //Reading plugins
Alessio Netti's avatar
Alessio Netti committed
90
91
92
    BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
        if (boost::iequals(plugin.first, "analyzerPlugin")) {
            if (!plugin.second.empty()) {
93
                string pluginConfig="", pluginPath="";
Alessio Netti's avatar
Alessio Netti committed
94
95
                BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
                    if (boost::iequals(val.first, "path")) {
96
                        pluginPath = val.second.data();
Alessio Netti's avatar
Alessio Netti committed
97
98
99
100
101
102
                    } else if (boost::iequals(val.first, "config")) {
                        pluginConfig = val.second.data();
                    } else {
                        LOG(warning) << "  Value \"" << val.first << "\" not recognized. Omitting";
                    }
                }
103
104
105
106
107
108
109
110
                if(!loadPlugin(plugin.second.data(), pluginPath, pluginConfig))
                    return false;
            }
        }
    }
    _status = LOADED;
    return true;
}
Alessio Netti's avatar
Alessio Netti committed
111

112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
bool AnalyticsManager::loadPlugin(const string& name, const string& pluginPath, const string& config) {
    LOG(info) << "Loading analyzer plugin \"" << name << "\"...";
    std::string pluginConfig; //path to config file for plugin
    std::string pluginLib = "libdcdbanalyzer_" + name;
#if __APPLE__
    pluginLib+= ".dylib";
#else
    pluginLib+= ".so";
#endif
    
    std::string iPath = pluginPath;
    // If path not specified we will look up in the default lib-directories (usr/lib and friends)
    if (iPath != "") {
        if (iPath[iPath.length()-1] != '/')
            iPath.append("/");
        pluginLib = iPath + pluginLib;
    }
    
    pluginConfig = config;
    // If config-path not specified we will look for pluginName.conf in the global conf directory
    if (pluginConfig == "")
        pluginConfig = _configPath + name + ".conf";
        
    // Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
    if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
        fclose(file);
        an_dl_t dynLib;
        dynLib.id = name;
        dynLib.DL = NULL;
        dynLib.configurator = NULL;

        // If plugin.conf exists, open libdcdbanalyzer_pluginName.so and read config
        LOG(info) << pluginConfig << " found";
        dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
        if(!dynLib.DL) {
            LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
            return false;
        }
        dlerror();

        // Set dynLib an_dl_t struct, load create and destroy symbols
        dynLib.create = (an_create_t*) dlsym(dynLib.DL, "create");
        const char* dlsym_error = dlerror();
        if (dlsym_error) {
            LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
            return false;
        }
Alessio Netti's avatar
Alessio Netti committed
159

160
161
162
163
164
165
        dynLib.destroy = (an_destroy_t*) dlsym(dynLib.DL, "destroy");
        dlsym_error = dlerror();
        if (dlsym_error) {
            LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
            return false;
        }
Alessio Netti's avatar
Alessio Netti committed
166

167
168
169
170
171
172
173
174
175
176
177
178
179
180
        dynLib.configurator = dynLib.create();
        dynLib.configurator->setGlobalSettings(_pluginSettings);
        // Read the analyzer plugin configuration
        if (!(dynLib.configurator->readConfig(pluginConfig))) {
            LOG(error) << "Plugin \"" << dynLib.id << "\" could not read configuration!";
            return false;
        }

        // Returning an empty vector may indicate problems with the config file
        if(dynLib.configurator->getAnalyzers().size() == 0) {
            LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
        } else if(!checkTopics(dynLib)) {
            LOG(error) << "Problematic MQTT topics or sensor names, please check your config files!";
            return false;
Alessio Netti's avatar
Alessio Netti committed
181
        }
182
183
184
185
186
187
        //save dl-struct
        _plugins.push_back(dynLib);
        LOG(info) << "Plugin \"" << dynLib.id << "\" loaded!";
    } else {
        LOG(info) << pluginConfig << " not found. Omitting";
        return false;
Alessio Netti's avatar
Alessio Netti committed
188
189
190
191
    }
    return true;
}

192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
void AnalyticsManager::unloadPlugin(const string& id) {
    for (auto it = _plugins.begin(); it != _plugins.end(); ++it) {
        if(it->id == id || id == "") {
            for (const auto& a : it->configurator->getAnalyzers())
                a->stop();

            removeTopics(*it);

            if (it->configurator)
                it->destroy(it->configurator);
            
            if (it->DL)
                dlclose(it->DL);

            if (id != "") {
                _plugins.erase(it);
                return;
            }
        }
    }

    if (id == "")
        _plugins.clear();
}

Alessio Netti's avatar
Alessio Netti committed
217
bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
Alessio Netti's avatar
Alessio Netti committed
218
219
220
221
    if(_status != LOADED) {
        LOG(error) << "Cannot init, AnalyticsManager is not loaded!";
        return false;
    }
Alessio Netti's avatar
Alessio Netti committed
222
    bool out=false;
Alessio Netti's avatar
Alessio Netti committed
223
224
    for (const auto &p : _plugins)
        //Actions always affect either one or all plugins, and always all analyzers within said plugin
225
        if(plugin=="" || plugin==p.id) {
Alessio Netti's avatar
Alessio Netti committed
226
            out = true;
227
            LOG(info) << "Init \"" << p.id << "\" data analytics plugin";
Alessio Netti's avatar
Alessio Netti committed
228
229
            for (const auto &a : p.configurator->getAnalyzers())
                a->init(io);
230
        }
Alessio Netti's avatar
Alessio Netti committed
231
    return out;
Alessio Netti's avatar
Alessio Netti committed
232
233
234
}

bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin) {
Alessio Netti's avatar
Alessio Netti committed
235
236
237
238
    if(_status != LOADED) {
        LOG(error) << "Cannot reload, AnalyticsManager is not loaded!";
        return false;
    }
Alessio Netti's avatar
Alessio Netti committed
239
    bool out=false;
Alessio Netti's avatar
Alessio Netti committed
240
241
    for (const auto &p : _plugins)
        if(plugin=="" || plugin==p.id) {
242
            LOG(info) << "Reload \"" << p.id << "\" data analytics plugin";
Alessio Netti's avatar
Alessio Netti committed
243
            out = true;
Alessio Netti's avatar
Alessio Netti committed
244
245
246
247
            //Removing obsolete MQTT topics
            removeTopics(p);
            //Reloading plugin
            if(!p.configurator->reReadConfig())
Alessio Netti's avatar
Alessio Netti committed
248
                return false;
Alessio Netti's avatar
Alessio Netti committed
249
250
251
252
253
254
255
256
            //Checking new MQTT topics
            if(!checkTopics(p)) {
                removeTopics(p);
                p.configurator->clearConfig();
                return false;
            } else
                for (const auto &a : p.configurator->getAnalyzers())
                    a->init(io);
Alessio Netti's avatar
Alessio Netti committed
257
        }
Alessio Netti's avatar
Alessio Netti committed
258
    return out;
Alessio Netti's avatar
Alessio Netti committed
259
260
}

Alessio Netti's avatar
Alessio Netti committed
261
262
void AnalyticsManager::removeTopics(an_dl_t p) {
    MQTTChecker& mqttCheck = MQTTChecker::getInstance();
Alessio Netti's avatar
Alessio Netti committed
263
    for(const auto& a : p.configurator->getAnalyzers()) {
Alessio Netti's avatar
Alessio Netti committed
264
        mqttCheck.removeGroup(a->getName());
265
        if (a->getStreaming()) {
Alessio Netti's avatar
Alessio Netti committed
266
            for (const auto &u : a->getUnits())
Alessio Netti's avatar
Alessio Netti committed
267
                for (const auto &o: u->getBaseOutputs()) {
Alessio Netti's avatar
Alessio Netti committed
268
                    mqttCheck.removeTopic(o->getMqtt());
Alessio Netti's avatar
Alessio Netti committed
269
270
                    mqttCheck.removeName(o->getName());
                }
271
272
            a->releaseUnits();
        }
Alessio Netti's avatar
Alessio Netti committed
273
    }
Alessio Netti's avatar
Alessio Netti committed
274
275
276
277
278
}

bool AnalyticsManager::checkTopics(an_dl_t p) {
    MQTTChecker& mqttCheck = MQTTChecker::getInstance();
    bool validTopics=true;
Alessio Netti's avatar
Alessio Netti committed
279
    for(const auto& a : p.configurator->getAnalyzers()) {
Alessio Netti's avatar
Alessio Netti committed
280
        if (!mqttCheck.checkGroup(a->getName()))
Alessio Netti's avatar
Alessio Netti committed
281
            validTopics = false;
282
        if (a->getStreaming()) {
Alessio Netti's avatar
Alessio Netti committed
283
284
            for (const auto &u : a->getUnits())
                for (const auto &o: u->getBaseOutputs())
Alessio Netti's avatar
Alessio Netti committed
285
                    if (!mqttCheck.checkTopic(o->getMqtt()) || !mqttCheck.checkName(o->getName()))
Alessio Netti's avatar
Alessio Netti committed
286
                        validTopics = false;
287
288
            a->releaseUnits();
        }
Alessio Netti's avatar
Alessio Netti committed
289
    }
Alessio Netti's avatar
Alessio Netti committed
290
291
292
    return validTopics;
}

Alessio Netti's avatar
Alessio Netti committed
293
bool AnalyticsManager::start(const string& plugin, const string& analyzer) {
Alessio Netti's avatar
Alessio Netti committed
294
295
296
297
    if(_status != LOADED) {
        LOG(error) << "Cannot start, AnalyticsManager is not loaded!";
        return false;
    }
Alessio Netti's avatar
Alessio Netti committed
298
    bool out=false;
Alessio Netti's avatar
Alessio Netti committed
299
    for (const auto &p : _plugins)
300
301
        if(plugin=="" || plugin==p.id) {
            LOG(info) << "Start \"" << p.id << "\" data analytics plugin";
Alessio Netti's avatar
Alessio Netti committed
302
            for (const auto &a : p.configurator->getAnalyzers())
Alessio Netti's avatar
Alessio Netti committed
303
304
305
306
307
                // Only streaming analyzers can be started
                if(a->getStreaming() && (analyzer=="" || analyzer==a->getName())) {
                    a->start();
                    out=true;
                }
308
        }
Alessio Netti's avatar
Alessio Netti committed
309
    return out;
Alessio Netti's avatar
Alessio Netti committed
310
311
}

Alessio Netti's avatar
Alessio Netti committed
312
bool AnalyticsManager::stop(const string& plugin, const string& analyzer) {
Alessio Netti's avatar
Alessio Netti committed
313
314
315
316
    if(_status != LOADED) {
        LOG(error) << "Cannot stop, AnalyticsManager is not loaded!";
        return false;
    }
Alessio Netti's avatar
Alessio Netti committed
317
    bool out=false;
Alessio Netti's avatar
Alessio Netti committed
318
    for (const auto &p : _plugins)
319
320
        if(plugin=="" || plugin==p.id) {
            LOG(info) << "Stop \"" << p.id << "\" data analytics plugin";
Alessio Netti's avatar
Alessio Netti committed
321
            for (const auto &a : p.configurator->getAnalyzers())
Alessio Netti's avatar
Alessio Netti committed
322
323
324
325
326
                // Only streaming analyzers can be stopped
                if(a->getStreaming() && (analyzer=="" || analyzer==a->getName())) {
                    a->stop();
                    out=true;
                }
327
        }
Alessio Netti's avatar
Alessio Netti committed
328
    return out;
Alessio Netti's avatar
Alessio Netti committed
329
}
Micha Mueller's avatar
Micha Mueller committed
330

331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
/******************************************************************************/
/*      Rest API endpoint methods                                             */
/******************************************************************************/

#define stdBind(fun) std::bind(&AnalyticsManager::fun, \
          this, \
          std::placeholders::_1, \
          std::placeholders::_2)

void AnalyticsManager::addRestEndpoints(RESTHttpsServer* restServer) {
    restServer->addEndpoint("/analytics/help",      {http::verb::get, stdBind(GET_analytics_help)});
    restServer->addEndpoint("/analytics/plugins",   {http::verb::get, stdBind(GET_analytics_plugins)});
    restServer->addEndpoint("/analytics/sensors",   {http::verb::get, stdBind(GET_analytics_sensors)});
    restServer->addEndpoint("/analytics/units",     {http::verb::get, stdBind(GET_analytics_units)});
    restServer->addEndpoint("/analytics/analyzers", {http::verb::get, stdBind(GET_analytics_analyzers)});

    restServer->addEndpoint("/analytics/start",    {http::verb::put, stdBind(PUT_analytics_start)});
    restServer->addEndpoint("/analytics/stop",     {http::verb::put, stdBind(PUT_analytics_stop)});
    restServer->addEndpoint("/analytics/compute",  {http::verb::put, stdBind(PUT_analytics_compute)});
    restServer->addEndpoint("/analytics/analyzer", {http::verb::put, stdBind(PUT_analytics_analyzer)});
}

void AnalyticsManager::GET_analytics_help(endpointArgs){
    if (!managerLoaded(res)) {
        return;
    }
357
    res.body() = restCheatSheet;
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
    res.result(http::status::ok);
}

void AnalyticsManager::GET_analytics_plugins(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }
    std::ostringstream data;
    if (getQuery("json", queries) == "true") {
        boost::property_tree::ptree root, plugins;
        for(const auto& p : _plugins) {
            plugins.put(p.id, "");
        }
        root.add_child("plugins", plugins);
        boost::property_tree::write_json(data, root, true);
    } else {
        for(const auto& p : _plugins) {
            data << p.id << "\n";
        }
    }
    res.body() = data.str();
    res.result(http::status::ok);
}

void AnalyticsManager::GET_analytics_sensors(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);
    const std::string analyzer = getQuery("analyzer", queries);

    if (!hasPlugin(plugin, res)) {
        return;
    }

    bool found = false;
    std::ostringstream data;

    for (const auto& p : _plugins) {
        if (p.id == plugin) {
            if (getQuery("json", queries) == "true") {
                boost::property_tree::ptree root, sensors;

                // In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
                for (const auto& a : p.configurator->getAnalyzers()) {
                    if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
                        found = true;
                        boost::property_tree::ptree group;
                        for (const auto& u : a->getUnits()) {
                            for (const auto& s : u->getBaseOutputs()) {
                                // Explicitly adding nodes to the ptree as to prevent BOOST from performing
                                // parsing on the node names
411
                                group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getMqtt())));
412
413
                            }
                        }
414
                        a->releaseUnits();
415
416
417
418
419
420
421
422
423
424
425
                        sensors.add_child(a->getName(), group);
                    }
                }
                root.add_child(p.id, sensors);
                boost::property_tree::write_json(data, root, true);
            } else {
                for (const auto& a : p.configurator->getAnalyzers()) {
                    if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
                        found = true;
                        for (const auto& u : a->getUnits()) {
                            for (const auto& s : u->getBaseOutputs()) {
426
                                data << a->getName() << "::" << s->getMqtt() << "\n";
427
428
                            }
                        }
429
                        a->releaseUnits();
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
                    }
                }
            }
            res.body() = data.str();
            res.result(http::status::ok);
            break;
        }
    }
    if (!found) {
        res.body() = "Plugin or analyzer not found!\n";
        res.result(http::status::not_found);
    }
}

void AnalyticsManager::GET_analytics_units(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);
    const std::string analyzer = getQuery("analyzer", queries);

    if (!hasPlugin(plugin, res)) {
        return;
    }

    bool found = false;
    std::ostringstream data;

    for (const auto& p : _plugins) {
        if (p.id == plugin) {
            if (getQuery("json", queries) == "true") {
                boost::property_tree::ptree root, units;

                // In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
                for (const auto& a : p.configurator->getAnalyzers())
                    if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
                        found = true;
                        boost::property_tree::ptree group;
                        for (const auto& u : a->getUnits()) {
470
                            group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(u->getName())));
471
                        }
472
                        a->releaseUnits();
473
474
475
476
477
478
479
480
481
                        units.add_child(a->getName(), group);
                    }
                root.add_child(p.id, units);
                boost::property_tree::write_json(data, root, true);
            } else {
                for (const auto& a : p.configurator->getAnalyzers()) {
                    if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
                        found = true;
                        for (const auto& u : a->getUnits()) {
482
                            data << a->getName() << "::" << u->getName() << "\n";
483
                        }
484
                        a->releaseUnits();
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
                    }
                }
            }
            res.body() = data.str();
            res.result(http::status::ok);
            break;
        }
    }
    if (!found) {
        res.body() = "Plugin or analyzer not found!\n";
        res.result(http::status::not_found);
    }
}

void AnalyticsManager::GET_analytics_analyzers(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);

    if (!hasPlugin(plugin, res)) {
        return;
    }

    std::ostringstream data;

    for (const auto& p : _plugins) {
        if (p.id == plugin) {
            if (getQuery("json", queries) == "true") {
                boost::property_tree::ptree root, analyzers;

                // For each analyzer, we output its type as well
                for (const auto& a : p.configurator->getAnalyzers()) {
                    analyzers.push_back(boost::property_tree::ptree::value_type(a->getName(), boost::property_tree::ptree(a->getStreaming() ? "streaming" : "on-demand")));
                }
                root.add_child(p.id, analyzers);
                boost::property_tree::write_json(data, root, true);
            } else {
                for (const auto& a : p.configurator->getAnalyzers()) {
                    data << a->getName() << " " << (a->getStreaming() ? "streaming\n" : "on-demand\n");
                }
            }
            res.body() = data.str();
            res.result(http::status::ok);
            return;
        }
    }
}


void AnalyticsManager::PUT_analytics_start(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);
    const std::string analyzer = getQuery("analyzer", queries);

    if (start(plugin, analyzer)) {
        res.body() = "Plugin " + plugin + " " + analyzer + ": Sensors started!\n";
        res.result(http::status::ok);
    } else {
        res.body() = "Plugin or analyzer not found!\n";
        res.result(http::status::not_found);
    }
}

void AnalyticsManager::PUT_analytics_stop(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);
    const std::string analyzer = getQuery("analyzer", queries);

    if (stop(plugin, analyzer)) {
        res.body() = "Plugin " + plugin + " " + analyzer + ": Sensors stopped!\n";
        res.result(http::status::ok);
    } else {
        res.body() = "Plugin or analyzer not found!\n";
        res.result(http::status::not_found);
    }
}

void AnalyticsManager::PUT_analytics_compute(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);
    const std::string analyzer = getQuery("analyzer", queries);
577
    std::string unit = getQuery("unit", queries);
578
579

    if (plugin == "" || analyzer == "") {
580
        res.body() = "Request malformed: plugin or analyzer query missing\n";
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
        res.result(http::status::bad_request);
        return;
    }

    if (unit == "") {
        unit = SensorNavigator::rootKey;
    }

    res.body() = "Plugin or analyzer not found!\n";
    res.result(http::status::not_found);

    std::ostringstream data;

    bool unitFound=false;
    for (const auto &p : _plugins) {
        if (p.id == plugin) {
            for (const auto &a : p.configurator->getAnalyzers()) {
                if( a->getName() == analyzer ) {
                    std::map<std::string, reading_t> outMap;
                    try {
                        outMap = a->computeOnDemand(unit);
                        unitFound = true;
                    } catch(const domain_error& e) {
                        // In the particular case where an analyzer is duplicated, it could be that the right
                        // unit is found only after a few tries. Therefore, we handle the domain_error
                        // exception raised in AnalyzerTemplate, and allow the search to continue
                        if(a->getStreaming() && a->getDuplicate()) {
                            continue;
                        } else {
Alessio Netti's avatar
Alessio Netti committed
610
                            res.body() = string(e.what()) + string("\n");
611
612
613
                            res.result(http::status::not_found);
                            return;
                        }
Alessio Netti's avatar
Alessio Netti committed
614
                    } catch(const exception& e) {
Alessio Netti's avatar
Alessio Netti committed
615
                        res.body() = string(e.what()) + string("\n");
Alessio Netti's avatar
Alessio Netti committed
616
617
                        res.result(http::status::internal_server_error);
                        return;
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
                    }
                    if (getQuery("json", queries) == "true") {
                        boost::property_tree::ptree root, outputs;
                        // Iterating through the outputs of the on-demand computation and adding them to a JSON
                        for (const auto& kv : outMap) {
                            boost::property_tree::ptree sensor;
                            sensor.push_back(boost::property_tree::ptree::value_type("timestamp", boost::property_tree::ptree(to_string(kv.second.timestamp))));
                            sensor.push_back(boost::property_tree::ptree::value_type("value", boost::property_tree::ptree(to_string(kv.second.value))));
                            outputs.push_back(boost::property_tree::ptree::value_type(kv.first, sensor));
                        }
                        root.add_child(a->getName(), outputs);
                        boost::property_tree::write_json(data, root, true);
                    } else {
                        for (const auto& kv : outMap) {
                            data << kv.first << " ts: " << kv.second.timestamp << " v: " << kv.second.value << "\n";
                        }
                    }
                    res.body() = data.str();
                    res.result(http::status::ok);
                    return;
                }
            }
        }
    }
    // This if branch is accessed only if the target analyzer is streaming and duplicated
    if(!unitFound) {
644
        res.body() = "Node " + unit + " does not belong to the domain of " + analyzer + "\n!";
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
        res.result(http::status::not_found);
    }

}

void AnalyticsManager::PUT_analytics_analyzer(endpointArgs) {
    if (!managerLoaded(res)) {
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);
    const std::string analyzer = getQuery("analyzer", queries);
    const std::string action   = getQuery("action", queries);

    if (plugin == "" || action == "") {
660
        res.body() = "Request malformed: plugin or action query missing.\n";
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
        res.result(http::status::bad_request);
        return;
    }

    res.body() = "Plugin or analyzer not found!\n";
    res.result(http::status::not_found);

    // Managing custom REST PUT actions defined at the analyzer level
    for (const auto &p : _plugins) {
        if (p.id == plugin) {
            for (const auto &a : p.configurator->getAnalyzers()) {
                if (analyzer == "" || analyzer == a->getName()) {
                    // Any thrown exception is catched outside in the HTTPserver
                    try {
                        restResponse_t reply = a->REST(action, queries);
                        res.body() = reply.data;
                        res.body() += reply.response;
                        res.result(http::status::ok);
                    } catch(const std::invalid_argument &e) {
Alessio Netti's avatar
Alessio Netti committed
680
                        res.body() = string(e.what()) + string("\n");
681
682
                        res.result(http::status::bad_request);
                    } catch(const std::domain_error &e) {
Alessio Netti's avatar
Alessio Netti committed
683
                        res.body() = string(e.what()) + string("\n");
684
685
                        res.result(http::status::not_found);
                    } catch(const std::exception &e) {
Alessio Netti's avatar
Alessio Netti committed
686
                        res.body() = string(e.what()) + string("\n");
687
688
689
690
691
692
693
                        res.result(http::status::internal_server_error);
                    }
                }
            }
        }
    }
}