analyticscontroller.h 7.91 KB
Newer Older
1
2
3
//================================================================================
// Name        : analyticscontroller.h
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Wrapper class for OperatorManager.
7
8
9
10
11
12
13
14
15
16
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30
31
32
33
34
35
36

#ifndef PROJECT_ANALYTICSCONTROLLER_H
#define PROJECT_ANALYTICSCONTROLLER_H

#include <list>
#include <vector>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
Alessio Netti's avatar
Alessio Netti committed
37
#include <boost/algorithm/string/trim.hpp>
38
#include "../analytics/OperatorManager.h"
39
40
41
42
43
44
45
#include "sensornavigator.h"
#include "sensorcache.h"
#include "configuration.h"
#include "logging.h"

using namespace std;

Alessio Netti's avatar
Alessio Netti committed
46
/**
47
* @brief Class implementing a wrapper around the OperatorManager
Alessio Netti's avatar
Alessio Netti committed
48
* 
49
* @details This class provides a wrapper around many features required for the
50
*          instantiation of the Operator Manager - namely the instantiation of
51
52
53
54
55
56
*          a SensorNavigator, the creation of a dedicated thread pool, and the
*          insertion of generated sensor values into the Cassandra datastore.
*          Most of these features were already available in DCDBPusher to handle
*          regular sensors, but they had to be ported over to the CollectAgent.
*
* @ingroup ca
Alessio Netti's avatar
Alessio Netti committed
57
*/
58
59
60
61
class AnalyticsController {

public:

Alessio Netti's avatar
Alessio Netti committed
62
63
64
65
66
67
    /**
    * @brief            Class constructor
    * 
    * @param dcdbCfg    SensorConfig object to be used to retrieve sensor meta-data from Cassandra
    * @param dcdbStore  SensorDataStore object to be used to insert sensor readings into Cassandra
    */
68
69
70
    AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) {
        _dcdbCfg = dcdbCfg;
        _dcdbStore = dcdbStore;
71
        _manager = make_shared<OperatorManager>();
72
73
74
75
76
77
78
79
        _navigator = nullptr;
        _sensorCache = nullptr;
        _keepRunning = false;
        _doHalt = false;
        _halted = true;
        _initialized = false;
    }

Alessio Netti's avatar
Alessio Netti committed
80
81
82
    /**
    * @brief            Class destructor
    */
83
84
    ~AnalyticsController() {}

Alessio Netti's avatar
Alessio Netti committed
85
86
87
88
89
    /**
    * @brief            Starts the internal thread of the controller
    *
    *                   Initialization must have been performed already at this point.
    */
90
    void start();
Alessio Netti's avatar
Alessio Netti committed
91
92
93
94
95
96

    /**
    * @brief            Stops the internal management thread
    *
    *                   This will also stop and join all threads in the BOOST ASIO pool.
    */
97
    void stop();
Alessio Netti's avatar
Alessio Netti committed
98
99
100
101
102
    
    /**
    * @brief            Initializes the data analytics infrastructure
    *
    *                   This method will build a Sensor Navigator by fecthing sensor names from the Cassandra datastore,
103
    *                   and then create an OperatorManager object, which will take care of instantiating and 
Alessio Netti's avatar
Alessio Netti committed
104
105
    *                   preparing plugins.
    *                   
Alessio Netti's avatar
Alessio Netti committed
106
    * @param settings   Settings class containing user-specified configuration parameters                 
Alessio Netti's avatar
Alessio Netti committed
107
108
109
    * @param configPath Path to the configuration files for the data analytics framework
    * @return           True if successful, false otherwise 
    */
Alessio Netti's avatar
Alessio Netti committed
110
    bool initialize(Configuration& settings, const string& configPath);
111

Alessio Netti's avatar
Alessio Netti committed
112
113
114
115
116
117
118
    /**
    * @brief            Sets the cache to be used for sensors
    *
    *                   This method allows to set the SensorCache object to be used to store sensor readings produced
    *                   by the data analytics framework.
    * @param cache      The SensorCache object to be used as cache
    */
119
120
    void setCache(SensorCache* cache)            { _sensorCache = cache; }

Alessio Netti's avatar
Alessio Netti committed
121
122
123
124
125
    /**
    * @brief            Returns the status of the internal thread
    *
    * @return           True if the controller is currently stopped, false otherwise
    */
Alessio Netti's avatar
Alessio Netti committed
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
    bool isHalted() { return _halted; }

    /**
    * @brief            Triggers a temporary halt of the internal management thread
    * 
    * @param wait       If set to true, the method returns only when the thread has stopped
    */
    void halt(bool wait=false) { 
        _doHalt = true; 
        if(wait) while (!_halted) { sleep(1); }
    }

    /**
    * @brief            Resumes the internal management thread
    */
    void resume() { _doHalt = false; }
Alessio Netti's avatar
Alessio Netti committed
142
143

    /**
144
    * @brief            Returns the internal OperatorManager object
Alessio Netti's avatar
Alessio Netti committed
145
    *
146
    * @return           A shared pointer to the internal OperatorManager object
Alessio Netti's avatar
Alessio Netti committed
147
    */
148
    shared_ptr<OperatorManager> getManager()    { return _manager; }
Alessio Netti's avatar
Alessio Netti committed
149
150
151
152
153
154

    /**
    * @brief            Returns the internal SensorNavigator object
    *
    * @return           A shared pointer to the internal SensorNavigator object
    */
155
    shared_ptr<SensorNavigator>  getNavigator()  { return _navigator; }
Alessio Netti's avatar
Alessio Netti committed
156
157
158
159
160
161

    /**
    * @brief            Returns the SensorCache object used to store readings
    *
    * @return           A pointer to a SensorCache object
    */
162
    SensorCache*                 getCache()      { return _sensorCache; }
Alessio Netti's avatar
Alessio Netti committed
163
164
165
166
167
168
169
170
171

    /**
    * @brief            Returns an insert counter for data analytics readings
    * 
    *                   This counter keeps track of how many inserts were performed into the Cassandra datastore for
    *                   data analytics-related operations since the last call to this method.
    *
    * @return           An insert counter to the Cassandra datastore
    */
172
173
    uint64_t                     getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }

Alessio Netti's avatar
Alessio Netti committed
174
    /**
175
176
177
178
179
     * @brief   Return the io_service used by the analytics controller.
     *
     * @return  Reference to this object's boost::asio::io_service.
     */
    boost::asio::io_service&    getIoService() { return _io; }
Alessio Netti's avatar
Alessio Netti committed
180

181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
private:

    // Method implementing the main loop of the internal thread
    void run();
    // Performs auto-publish of data analytics sensors, if required
    bool publishSensors();

    // Flag to keep track of the thread's status
    bool _keepRunning;
    // Flag to trigger temporary stops to the data analytics controller
    bool _doHalt;
    bool _halted;
    bool _initialized;
    // Objects to connect to the Cassandra datastore
    DCDB::SensorConfig *_dcdbCfg;
    DCDB::SensorDataStore *_dcdbStore;
    // Global sensor cache object for the collectagent
    SensorCache *_sensorCache;
    // Sensor navigator
    shared_ptr<SensorNavigator> _navigator;
201
202
    // Internal data operator manager object
    shared_ptr<OperatorManager> _manager;
203
    // Misc configuration attributes
Alessio Netti's avatar
Alessio Netti committed
204
    Configuration _settings;
205
206
207
208
209
210
    string _configPath;
    // Readings counter
    uint64_t _readingCtr;

    // Main management thread for the analytics controller
    boost::thread _mainThread;
211
    // IO service for the operators
212
213
214
215
216
217
218
219
220
221
222
    boost::asio::io_service _io;
    // Underlying thread pool
    boost::thread_group _threads;
    // Dummy task to keep thread pool alive
    shared_ptr<boost::asio::io_service::work> _keepAliveWork;

    //Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};

#endif //PROJECT_ANALYTICSCONTROLLER_H