analyticscontroller.h 6.79 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//
// Created by Netti, Alessio on 11.04.19.
//

#ifndef PROJECT_ANALYTICSCONTROLLER_H
#define PROJECT_ANALYTICSCONTROLLER_H

#include <list>
#include <vector>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include "../analytics/AnalyticsManager.h"
#include "sensornavigator.h"
#include "sensorcache.h"
#include "configuration.h"
#include "logging.h"

using namespace std;

Alessio Netti's avatar
Alessio Netti committed
22
23
24
25
26
27
28
29
/**
* Class implementing a wrapper around the AnalyticsManager
* 
* This class provides a wrapper around many features required for the instantiation of the Analytics Manager - namely
* the instantiation of a SensorNavigator, the creation of a dedicated thread pool, and the insertion of generated
* sensor values into the Cassandra datastore. Most of these features were already available in DCDBPusher to handle
* regular sensors, but they had to be ported over to the CollectAgent.
*/
30
31
32
33
class AnalyticsController {

public:

Alessio Netti's avatar
Alessio Netti committed
34
35
36
37
38
39
    /**
    * @brief            Class constructor
    * 
    * @param dcdbCfg    SensorConfig object to be used to retrieve sensor meta-data from Cassandra
    * @param dcdbStore  SensorDataStore object to be used to insert sensor readings into Cassandra
    */
40
41
42
43
44
45
46
47
48
49
50
51
    AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) {
        _dcdbCfg = dcdbCfg;
        _dcdbStore = dcdbStore;
        _manager = make_shared<AnalyticsManager>();
        _navigator = nullptr;
        _sensorCache = nullptr;
        _keepRunning = false;
        _doHalt = false;
        _halted = true;
        _initialized = false;
    }

Alessio Netti's avatar
Alessio Netti committed
52
53
54
    /**
    * @brief            Class destructor
    */
55
56
    ~AnalyticsController() {}

Alessio Netti's avatar
Alessio Netti committed
57
58
59
60
61
    /**
    * @brief            Starts the internal thread of the controller
    *
    *                   Initialization must have been performed already at this point.
    */
62
    void start();
Alessio Netti's avatar
Alessio Netti committed
63
64
65
66
67
68

    /**
    * @brief            Stops the internal management thread
    *
    *                   This will also stop and join all threads in the BOOST ASIO pool.
    */
69
    void stop();
Alessio Netti's avatar
Alessio Netti committed
70
71
72
73
74
75
76
77
    
    /**
    * @brief            Initializes the data analytics infrastructure
    *
    *                   This method will build a Sensor Navigator by fecthing sensor names from the Cassandra datastore,
    *                   and then create an AnalyticsManager object, which will take care of instantiating and 
    *                   preparing plugins.
    *                   
Alessio Netti's avatar
Alessio Netti committed
78
    * @param settings   Settings class containing user-specified configuration parameters                 
Alessio Netti's avatar
Alessio Netti committed
79
80
81
    * @param configPath Path to the configuration files for the data analytics framework
    * @return           True if successful, false otherwise 
    */
Alessio Netti's avatar
Alessio Netti committed
82
    bool initialize(Configuration& settings, const string& configPath);
83

Alessio Netti's avatar
Alessio Netti committed
84
85
86
87
88
89
90
    /**
    * @brief            Sets the cache to be used for sensors
    *
    *                   This method allows to set the SensorCache object to be used to store sensor readings produced
    *                   by the data analytics framework.
    * @param cache      The SensorCache object to be used as cache
    */
91
92
    void setCache(SensorCache* cache)            { _sensorCache = cache; }

Alessio Netti's avatar
Alessio Netti committed
93
94
95
96
97
    /**
    * @brief            Returns the status of the internal thread
    *
    * @return           True if the controller is currently stopped, false otherwise
    */
Alessio Netti's avatar
Alessio Netti committed
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
    bool isHalted() { return _halted; }

    /**
    * @brief            Triggers a temporary halt of the internal management thread
    * 
    * @param wait       If set to true, the method returns only when the thread has stopped
    */
    void halt(bool wait=false) { 
        _doHalt = true; 
        if(wait) while (!_halted) { sleep(1); }
    }

    /**
    * @brief            Resumes the internal management thread
    */
    void resume() { _doHalt = false; }
Alessio Netti's avatar
Alessio Netti committed
114
115
116
117
118
119

    /**
    * @brief            Returns the internal AnalyticsManager object
    *
    * @return           A shared pointer to the internal AnalyticsManager object
    */
120
    shared_ptr<AnalyticsManager> getManager()    { return _manager; }
Alessio Netti's avatar
Alessio Netti committed
121
122
123
124
125
126

    /**
    * @brief            Returns the internal SensorNavigator object
    *
    * @return           A shared pointer to the internal SensorNavigator object
    */
127
    shared_ptr<SensorNavigator>  getNavigator()  { return _navigator; }
Alessio Netti's avatar
Alessio Netti committed
128
129
130
131
132
133

    /**
    * @brief            Returns the SensorCache object used to store readings
    *
    * @return           A pointer to a SensorCache object
    */
134
    SensorCache*                 getCache()      { return _sensorCache; }
Alessio Netti's avatar
Alessio Netti committed
135
136
137
138
139
140
141
142
143

    /**
    * @brief            Returns an insert counter for data analytics readings
    * 
    *                   This counter keeps track of how many inserts were performed into the Cassandra datastore for
    *                   data analytics-related operations since the last call to this method.
    *
    * @return           An insert counter to the Cassandra datastore
    */
144
145
    uint64_t                     getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }

Alessio Netti's avatar
Alessio Netti committed
146
147
148
149
150
151
152
153
154
155
156
157
    /**
    * @brief            Supply a REST command to the manager
    *
    *                   This method simply forwards the request to the internal AnalyticsManager.
    *
    * @param pathStrs   resource path to be accessed
    * @param queries    vector of queries
    * @param method     Either GET or PUT
    * @return           Response as a <data, response> pair
    */
    restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method);

158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
private:

    // Method implementing the main loop of the internal thread
    void run();
    // Performs auto-publish of data analytics sensors, if required
    bool publishSensors();

    // Flag to keep track of the thread's status
    bool _keepRunning;
    // Flag to trigger temporary stops to the data analytics controller
    bool _doHalt;
    bool _halted;
    bool _initialized;
    // Objects to connect to the Cassandra datastore
    DCDB::SensorConfig *_dcdbCfg;
    DCDB::SensorDataStore *_dcdbStore;
    // Global sensor cache object for the collectagent
    SensorCache *_sensorCache;
    // Sensor navigator
    shared_ptr<SensorNavigator> _navigator;
    // Internal data analytics manager object
    shared_ptr<AnalyticsManager> _manager;
    // Misc configuration attributes
Alessio Netti's avatar
Alessio Netti committed
181
    Configuration _settings;
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
    string _configPath;
    // Readings counter
    uint64_t _readingCtr;

    // Main management thread for the analytics controller
    boost::thread _mainThread;
    // IO service for the analyzers
    boost::asio::io_service _io;
    // Underlying thread pool
    boost::thread_group _threads;
    // Dummy task to keep thread pool alive
    shared_ptr<boost::asio::io_service::work> _keepAliveWork;

    //Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

#endif //PROJECT_ANALYTICSCONTROLLER_H