//================================================================================
// Name        : QueryEngine.h
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Class that grants query access to local and remote sensors.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#ifndef PROJECT_QUERYENGINE_H
#define PROJECT_QUERYENGINE_H

#include "sensornavigator.h"
#include "sensorbase.h"
#include "metadatastore.h"

#include <atomic>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

using namespace std;

/**
 * @brief Plain data record describing a single job, as returned by job queries
 *
 *        Instances are filled in by the job retrieval callback and handed back
 *        to the caller through the buffer of QueryEngine::queryJob.
 */
struct qeJobData {
    // Identifier of the job
    std::string jobId;
    // Identifier of the user owning the job
    std::string userId;
    // Start time of the job; zero-initialized so that a default-constructed
    // record never exposes an indeterminate value
    uint64_t    startTime = 0;
    // End time of the job; zero-initialized for the same reason as startTime
    uint64_t    endTime = 0;
    // Names of the nodes associated with the job
    std::list<std::string> nodes;
};

//Typedef for the callback used to retrieve sensors
47
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
Alessio Netti's avatar
Alessio Netti committed
48
//Typedef for the job retrieval callback
49
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
50
51
//Typedef for the metadata retrieval callback
typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&);
Alessio Netti's avatar
Alessio Netti committed
52
53

/**
54
 * @brief Class that grants query access to local and remote sensors
Alessio Netti's avatar
Alessio Netti committed
55
 *
56
57
58
59
60
61
 * @details This class provides an abstraction layer to where the data analytics
 *          framework is executed: access interface to sensor data is the same
 *          on dcdbpusher and collectagent.
 *          This class is implemented according to the Singleton design pattern.
 *
 * @ingroup analytics
Alessio Netti's avatar
Alessio Netti committed
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
 */
class QueryEngine {

public:

    /**
    * @brief            Returns an instance to a QueryEngine object
    *
    *                   The QueryEngine class is implemented as a singleton: therefore, all entities calling the
    *                   getInstance method will share the same instance of the QueryEngine class.
    *
    * @returns          Reference to a QueryEngine object
    */
    static QueryEngine& getInstance() {
        static QueryEngine q;
        return q;
    }

    /**
    * @brief            Set SensorNavigator object
    *
    *                   This method sets the internal SensorNavigator object that can be used by other entities to
    *                   navigate the current sensor structure.
    *
    * @param navi       Pointer to a SensorNavigator object
    */
88
    void  setNavigator(shared_ptr<SensorNavigator> navi)         { _navigator = navi; }
89
90
91
92
93
94
95
96
97
98
99
    
    /**
     * @brief           Set internal map of sensors
     * 
     *                  In certain query callback implementations, a sensor map structure is used to make access to
     *                  sensor data more efficient. In this case, this method is to be used to set and expose such
     *                  structure correctly.
     * 
     * @param sMap      Pointer to a sensor map structure
     */
    void  setSensorMap(shared_ptr<map<string, SBasePtr>> sMap)    { _sensorMap = sMap; }
100
101
102
103
104
105
106
107
108
109

    /**
    * @brief            Set the current sensor hierarchy
    *
    *                   This method sets the internal string used to build the sensor tree in the Sensor Navigator.
    *                   This is used for convenience, so that access to the global settings is not necessary.
    *
    * @param hierarchy  String containing a sensor hierarchy
    */
    void  setSensorHierarchy(const string& hierarchy)            { _sensorHierarchy = hierarchy; }
Alessio Netti's avatar
Alessio Netti committed
110
111
112
113
114
115
116
117
118
119
    
    /**
     * @brief           Set the current sensor filter
     * 
     *                  This method sets the internal filter string used to discard sensors when building a 
     *                  SensorNavigator object.
     * 
     * @param filter    String containing the new filter 
     */
    void  setFilter(const string& filter)                        { _filter = filter; }
Alessio Netti's avatar
Alessio Netti committed
120

Alessio Netti's avatar
Alessio Netti committed
121
122
123
124
125
126
127
128
129
130
    /**
     * @brief           Set the current job filter
     * 
     *                  This method sets the internal filter string used by job operators to identify
     *                  the set of jobs for which they are responsible, based on their nodelist.
     * 
     * @param filter    String containing the new job filter 
     */
    void  setJobFilter(const string& jfilter)                        { _jobFilter = jfilter; }

Alessio Netti's avatar
Alessio Netti committed
131
132
133
134
135
136
137
138
139
140
    /**
    * @brief            Sets the internal callback to retrieve sensor data
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
    *                   data and thus implement an abstraction layer. The callback must store all values for the input
    *                   sensor name, in the given time range, into the "buffer" vector. If such vector has not been
    *                   supplied (NULL), the callback must allocate and return a new one.
    *
    * @param cb         Pointer to a function of type QueryEngineCallback
    */
141
    void  setQueryCallback(QueryEngineCallback cb)                      { _callback = cb; }
Alessio Netti's avatar
Alessio Netti committed
142

Alessio Netti's avatar
Alessio Netti committed
143
144
145
146
147
148
149
150
151
152
153
    /**
    * @brief            Sets the internal callback to retrieve job data
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve job
    *                   data and thus implement an abstraction layer. Behavior of the callback must be identical to
    *                   that specified in setQueryCallback.
    *
    * @param jcb        Pointer to a function of type QueryEngineJobCallback
    */
    void  setJobQueryCallback(QueryEngineJobCallback jcb)               { _jCallback = jcb; }

154
155
156
157
158
159
160
161
162
163
164
    /**
    * @brief            Sets the internal callback to retrieve sensor metadata data
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
    *                   metadata and thus implement an abstraction layer. Behavior of the callback must be identical to
    *                   that specified in setQueryCallback.
    *
    * @param mcb        Pointer to a function of type QueryEngineMetadataCallback
    */
    void  setMetadataQueryCallback(QueryEngineMetadataCallback mcb)     { _mCallback = mcb; }

Alessio Netti's avatar
Alessio Netti committed
165
    /**
166
    * @brief            Returns the internal SensorNavigator object
Alessio Netti's avatar
Alessio Netti committed
167
    *
168
    * @return           Pointer to a SensorNavigator object
Alessio Netti's avatar
Alessio Netti committed
169
    */
170
    const shared_ptr<SensorNavigator> getNavigator()             { return _navigator; }
171
172
173
174
175
176
177
    
    /**
     * @brief           Returns the internal sensor map data structure
     * 
     * @return          Pointer to a sensor map 
     */
    const shared_ptr<map<string, SBasePtr>> getSensorMap()       { return _sensorMap; }
178
179
180
181
182
183
184

    /**
    * @brief            Returns the current sensor hierarchy
    *
    * @return           String containing the current sensor hierarchy
    */
    const string&  getSensorHierarchy()                          { return _sensorHierarchy; }
Alessio Netti's avatar
Alessio Netti committed
185

Alessio Netti's avatar
Alessio Netti committed
186
187
188
189
190
191
192
    /**
    * @brief            Returns the current sensor filter
    *
    * @return           String containing the current sensor filter
    */
    const string&  getFilter()                                   { return _filter; }

Alessio Netti's avatar
Alessio Netti committed
193
194
195
196
197
198
199
    /**
    * @brief            Returns the current job filter
    *
    * @return           String containing the current job filter
    */
    const string&  getJobFilter()                                   { return _jobFilter; }

Alessio Netti's avatar
Alessio Netti committed
200
201
202
203
204
205
206
    /**
    * @brief            Perform a sensor query
    *
    *                   This method allows to retrieve readings for a certain sensor in a given time range. The input
    *                   "buffer" vector allows to re-use memory over successive readings. Note that in order to use
    *                   this method, a callback must have been set through the setQueryCallback method. If not, this
    *                   method will throw an exception.
207
208
209
210
211
212
213
    *                   
    *                   The "rel" argument governs how the search is performed in local sensor caches: if set to true,
    *                   startTs and endTs indicate relative offsets against the most recent reading, and the returned
    *                   vector is a view of the cache whose range is computed statically in O(1), and therefore the
    *                   underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
    *                   false, startTs and endTs are interpreted as absolute timestamps, and the cache view is 
    *                   determined by performing binary search with O(log(n)) complexity, thus resulting in a accurate 
214
215
216
217
218
219
    *                   time range. Relative and absolute mode have different data guarantees:
    *                   
    *                  - Relative: returned data is guaranteed to not be stale, but it can extend outside of the
    *                      queried range by a factor proportional to sensor's sampling rate.
    *                  - Absolute: returned data is guaranteed to not be stale and to be strictly within the queried
    *                      time range.
Alessio Netti's avatar
Alessio Netti committed
220
221
222
223
    *
    * @param name       Name of the sensor to be queried
    * @param startTs    Start timestamp (in nanoseconds) of the time range for the query
    * @param endTs      End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
224
    * @param buffer     Reference to a vector in which readings must be stored.
Alessio Netti's avatar
Alessio Netti committed
225
    * @param rel        If true, the input timestamps are considered to be relative offset against "now"
226
    * @return           True if successful, false otherwise
Alessio Netti's avatar
Alessio Netti committed
227
    */
228
    bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
Alessio Netti's avatar
Alessio Netti committed
229
230
        if(!_callback)
            throw runtime_error("Query Engine: callback not set!");
Alessio Netti's avatar
Alessio Netti committed
231
        if((startTs > endTs && !rel) || (startTs < endTs && rel))
Alessio Netti's avatar
Alessio Netti committed
232
            throw invalid_argument("Query Engine: invalid time range!");
233
        return _callback(name, startTs, endTs, buffer, rel);
Alessio Netti's avatar
Alessio Netti committed
234
235
    }

Alessio Netti's avatar
Alessio Netti committed
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
    /**
    * @brief            Perform a job query
    *
    *                   This method allows to retrieve data for jobs running in a given time range. The input
    *                   "buffer" vector allows to re-use memory over successive readings. Note that in order to use
    *                   this method, a callback must have been set through the setJobQueryCallback method. If not, this
    *                   method will throw an exception.
    *                   
    *                   The "rel" argument governs how the search is performed in local sensor caches: if set to true,
    *                   startTs and endTs indicate relative offsets against the most recent reading, and the returned
    *                   vector is a view of the cache whose range is computed statically in O(1), and therefore the
    *                   underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
    *                   false, startTs and endTs are interpreted as absolute timestamps, and the cache view is 
    *                   determined by performing binary search with O(log(n)) complexity, thus resulting in a accurate 
    *                   time range. This parameter does not affect the query method when using the Cassandra datastore.
    *
    * @param jobId      ID of the job to be retrieved (only if range=false)
    * @param startTs    Start timestamp (in nanoseconds) of the time range for the query (only if range=true)
    * @param endTs      End timestamp (in nanoseconds) of the time range for the query. (only if range=true)
255
    * @param buffer     Reference to a vector in which job info must be stored.
Alessio Netti's avatar
Alessio Netti committed
256
257
    * @param rel        If true, the input timestamps are considered to be relative offset against "now"
    * @param range      If true, the jobId parameter is ignored, and all jobs in the given time range are returned 
258
    * @return           True if successful, false otherwise
Alessio Netti's avatar
Alessio Netti committed
259
    */
260
    bool queryJob(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) {
Alessio Netti's avatar
Alessio Netti committed
261
262
263
264
265
266
267
        if(!_jCallback)
            throw runtime_error("Query Engine: job callback not set!");
        if((startTs > endTs && !rel) || (startTs < endTs && rel))
            throw invalid_argument("Query Engine: invalid time range!");
        return _jCallback(jobId, startTs, endTs, buffer, rel, range);
    }

268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
    /**
    * @brief            Perform a sensor metadata query
    *
    *                   This method allows to retrieve the metadata of available sensors. The input "buffer" object
    *                   allows to re-use memory over successive readings. Note that in order to use this method, a 
    *                   callback must have been set through the setMetadataQueryCallback method. If not, this
    *                   method will throw an exception.
    *
    * @param name       Name of the sensor to be queried
    * @param buffer     SensorMetadata object in which to store the result
    * @return           True if successful, false otherwise
    */
    bool queryMetadata(const string& name, SensorMetadata& buffer) {
        if(!_mCallback)
            throw runtime_error("Query Engine: sensor metadata callback not set!");
        return _mCallback(name, buffer);
    }

286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
    /**
     * @brief           Locks access to the QueryEngine
     * 
     *                  Once this method returns, the invoking thread can safely update all internal data structures
     *                  of the QueryEngine. the unlock() method must be called afterwards.
     */
    void lock() {
        // Locking out new threads from the QueryEngine callbacks
        updating.store(true);
        // Waiting until all previous threads have finished using the QueryEngine
        while(access.load()>0) {}
    }
    
    /**
     * @brief           Unlocks access to the QueryEngine
     * 
     *                  Must be called after a lock() call.
     */
    void unlock() {
        access.store(0);
Alessio Netti's avatar
Alessio Netti committed
306
307
        updating.store(false);
    }
308
309
    
    //Internal atomic flags used for utility purposes
Alessio Netti's avatar
Alessio Netti committed
310
    atomic<bool> updating;
311
    atomic<int>  access;
312

Alessio Netti's avatar
Alessio Netti committed
313
314
315
316
317
318
319
private:

    /**
    * @brief            Private class constructor
    */
    QueryEngine() {
        _navigator = NULL;
320
        _sensorMap = NULL;
Alessio Netti's avatar
Alessio Netti committed
321
        _callback = NULL;
Alessio Netti's avatar
Alessio Netti committed
322
        _jCallback = NULL;
323
        _mCallback = NULL;
Alessio Netti's avatar
Alessio Netti committed
324
        updating.store(false);
325
        access.store(0);
Alessio Netti's avatar
Alessio Netti committed
326
327
328
329
330
331
332
333
334
335
336
337
338
    }

    /**
    * @brief            Copy constructor is not available
    */
    QueryEngine(QueryEngine const&)     = delete;

    /**
    * @brief            Assignment operator is not available
    */
    void operator=(QueryEngine const&)  = delete;

    // Internal pointer to a SensorNavigator
339
    shared_ptr<SensorNavigator> _navigator;
340
341
    // Internal pointer to a sensor map used in certain query callback implementations
    shared_ptr<map<string, SBasePtr>> _sensorMap;
Alessio Netti's avatar
Alessio Netti committed
342
343
    // Callback used to retrieve sensor data
    QueryEngineCallback _callback;
Alessio Netti's avatar
Alessio Netti committed
344
345
    // Callback used to retrieve job data
    QueryEngineJobCallback _jCallback;
346
347
    // Callback used to retrieve metadata
    QueryEngineMetadataCallback _mCallback;
348
349
    // String storing the current sensor hierarchy, used for convenience
    string _sensorHierarchy;
Alessio Netti's avatar
Alessio Netti committed
350
351
    // String storing the filter to be used when building a sensor navigator
    string _filter;
Alessio Netti's avatar
Alessio Netti committed
352
353
    // String storing the job filter to be used by job operators
    string _jobFilter;
Alessio Netti's avatar
Alessio Netti committed
354
355
356
};

#endif //PROJECT_QUERYENGINE_H