//================================================================================
// Name        : QueryEngine.h
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Class that grants query access to local and remote sensors.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#ifndef PROJECT_QUERYENGINE_H
#define PROJECT_QUERYENGINE_H

#include <atomic>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

#include "sensornavigator.h"
#include "sensorbase.h"
#include "metadatastore.h"

using namespace std;

/**
 * @brief Plain record describing a single job.
 *
 * @details Instances of this struct are the elements of the buffer that job
 *          query callbacks (see QueryEngineJobCallback) fill in.
 */
struct qeJobData {
    // Identifier of the job
    std::string jobId;
    // Identifier of the user associated with the job
    std::string userId;
    // Start timestamp of the job (NOTE(review): presumably nanoseconds, like query timestamps - confirm)
    uint64_t    startTime;
    // End timestamp of the job (NOTE(review): presumably nanoseconds, like query timestamps - confirm)
    uint64_t    endTime;
    // Names of the nodes associated with the job
    std::list<std::string> nodes;
};

Alessio Netti's avatar
Alessio Netti committed
46
//Typedef for the callback used to retrieve sensors
47
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
48
//Typedef for the callback used to retrieve sensors
49
typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
Alessio Netti's avatar
Alessio Netti committed
50
//Typedef for the job retrieval callback
51
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
52
53
//Typedef for the metadata retrieval callback
typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&);
Alessio Netti's avatar
Alessio Netti committed
54
55

/**
56
 * @brief Class that grants query access to local and remote sensors
Alessio Netti's avatar
Alessio Netti committed
57
 *
58
59
60
61
62
63
 * @details This class provides an abstraction layer to where the data analytics
 *          framework is executed: access interface to sensor data is the same
 *          on dcdbpusher and collectagent.
 *          This class is implemented according to the Singleton design pattern.
 *
 * @ingroup analytics
Alessio Netti's avatar
Alessio Netti committed
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
 */
class QueryEngine {

public:

    /**
    * @brief            Returns an instance to a QueryEngine object
    *
    *                   The QueryEngine class is implemented as a singleton: therefore, all entities calling the
    *                   getInstance method will share the same instance of the QueryEngine class.
    *
    * @returns          Reference to a QueryEngine object
    */
    static QueryEngine& getInstance() {
        static QueryEngine q;
        return q;
    }

    /**
    * @brief            Set SensorNavigator object
    *
    *                   This method sets the internal SensorNavigator object that can be used by other entities to
    *                   navigate the current sensor structure.
    *
    * @param navi       Pointer to a SensorNavigator object
    */
90
    void  setNavigator(shared_ptr<SensorNavigator> navi)         { _navigator = navi; }
91
92
93
94
95
96
97
98
99
100
101
    
    /**
     * @brief           Set internal map of sensors
     * 
     *                  In certain query callback implementations, a sensor map structure is used to make access to
     *                  sensor data more efficient. In this case, this method is to be used to set and expose such
     *                  structure correctly.
     * 
     * @param sMap      Pointer to a sensor map structure
     */
    void  setSensorMap(shared_ptr<map<string, SBasePtr>> sMap)    { _sensorMap = sMap; }
102
103
104
105
106
107
108
109
110
111

    /**
    * @brief            Set the current sensor hierarchy
    *
    *                   This method sets the internal string used to build the sensor tree in the Sensor Navigator.
    *                   This is used for convenience, so that access to the global settings is not necessary.
    *
    * @param hierarchy  String containing a sensor hierarchy
    */
    void  setSensorHierarchy(const string& hierarchy)            { _sensorHierarchy = hierarchy; }
Alessio Netti's avatar
Alessio Netti committed
112
113
114
115
116
117
118
119
120
121
    
    /**
     * @brief           Set the current sensor filter
     * 
     *                  This method sets the internal filter string used to discard sensors when building a 
     *                  SensorNavigator object.
     * 
     * @param filter    String containing the new filter 
     */
    void  setFilter(const string& filter)                        { _filter = filter; }
Alessio Netti's avatar
Alessio Netti committed
122

Alessio Netti's avatar
Alessio Netti committed
123
124
125
126
127
128
129
130
131
132
    /**
     * @brief           Set the current job filter
     * 
     *                  This method sets the internal filter string used by job operators to identify
     *                  the set of jobs for which they are responsible, based on their nodelist.
     * 
     * @param filter    String containing the new job filter 
     */
    void  setJobFilter(const string& jfilter)                        { _jobFilter = jfilter; }

133
134
135
136
137
138
139
140
141
142
143
    /**
     * @brief           Set the current job ID filter
     * 
     *                  This method sets the internal filter string used by job operators to identify
     *                  the set of jobs for which they are responsible, based on their ID. All jobs whose ID does not
     *                  match this filter are excluded.
     * 
     * @param jidfilter    String containing the new job ID filter 
     */
    void  setJobIDFilter(const string& jidfilter)                        { _jobIdFilter = jidfilter; }

144
145
146
147
148
149
150
151
152
153
154
155
    /**
     * @brief           Set the current job match string
     * 
     *                  The job match string is used to check which jobs must be processed by job operators.
     *                  For each node in the nodelist of a job, its hostname is filtered through the job filter.
     *                  If the mode of the filtered hostnames corresponds with the job match string, the job
     *                  is assigned to the job operator.
     * 
     * @param match    String containing the new job match string 
     */
    void  setJobMatch(const string& jMatch)                        { _jobMatch = jMatch; }

Alessio Netti's avatar
Alessio Netti committed
156
157
158
159
160
161
162
163
164
165
    /**
    * @brief            Sets the internal callback to retrieve sensor data
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
    *                   data and thus implement an abstraction layer. The callback must store all values for the input
    *                   sensor name, in the given time range, into the "buffer" vector. If such vector has not been
    *                   supplied (NULL), the callback must allocate and return a new one.
    *
    * @param cb         Pointer to a function of type QueryEngineCallback
    */
166
    void  setQueryCallback(QueryEngineCallback cb)                      { _callback = cb; }
Alessio Netti's avatar
Alessio Netti committed
167

168
169
170
171
172
173
174
175
176
177
178
179
    /**
    * @brief            Sets the internal callback to retrieve sensor data in groups
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
    *                   data and thus implement an abstraction layer. The callback must store all values for the input
    *                   vector of sensor names, in the given time range, into the "buffer" vector. If such vector has 
    *                   not been supplied (NULL), the callback must allocate and return a new one.
    *
    * @param cb         Pointer to a function of type QueryEngineCallback
    */
    void  setGroupQueryCallback(QueryEngineGroupCallback cb)                      { _gCallback = cb; }

Alessio Netti's avatar
Alessio Netti committed
180
181
182
183
184
185
186
187
188
189
190
    /**
    * @brief            Sets the internal callback to retrieve job data
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve job
    *                   data and thus implement an abstraction layer. Behavior of the callback must be identical to
    *                   that specified in setQueryCallback.
    *
    * @param jcb        Pointer to a function of type QueryEngineJobCallback
    */
    void  setJobQueryCallback(QueryEngineJobCallback jcb)               { _jCallback = jcb; }

191
192
193
194
195
196
197
198
199
200
201
    /**
    * @brief            Sets the internal callback to retrieve sensor metadata data
    *
    *                   This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
    *                   metadata and thus implement an abstraction layer. Behavior of the callback must be identical to
    *                   that specified in setQueryCallback.
    *
    * @param mcb        Pointer to a function of type QueryEngineMetadataCallback
    */
    void  setMetadataQueryCallback(QueryEngineMetadataCallback mcb)     { _mCallback = mcb; }

Alessio Netti's avatar
Alessio Netti committed
202
    /**
203
    * @brief            Returns the internal SensorNavigator object
Alessio Netti's avatar
Alessio Netti committed
204
    *
205
    * @return           Pointer to a SensorNavigator object
Alessio Netti's avatar
Alessio Netti committed
206
    */
207
    const shared_ptr<SensorNavigator> getNavigator()             { return _navigator; }
208
209
210
211
212
213
214
    
    /**
     * @brief           Returns the internal sensor map data structure
     * 
     * @return          Pointer to a sensor map 
     */
    const shared_ptr<map<string, SBasePtr>> getSensorMap()       { return _sensorMap; }
215
216
217
218
219
220
221

    /**
    * @brief            Returns the current sensor hierarchy
    *
    * @return           String containing the current sensor hierarchy
    */
    const string&  getSensorHierarchy()                          { return _sensorHierarchy; }
Alessio Netti's avatar
Alessio Netti committed
222

Alessio Netti's avatar
Alessio Netti committed
223
224
225
226
227
228
229
    /**
    * @brief            Returns the current sensor filter
    *
    * @return           String containing the current sensor filter
    */
    const string&  getFilter()                                   { return _filter; }

Alessio Netti's avatar
Alessio Netti committed
230
231
232
233
234
235
236
    /**
    * @brief            Returns the current job filter
    *
    * @return           String containing the current job filter
    */
    const string&  getJobFilter()                                   { return _jobFilter; }

237
238
239
240
241
242
243
    /**
    * @brief            Returns the current job ID filter
    *
    * @return           String containing the current job ID filter
    */
    const string&  getJobIdFilter()                                   { return _jobIdFilter; }

244
245
246
247
248
249
250
    /**
    * @brief            Returns the current job match string
    *
    * @return           String containing the current job match string
    */
    const string&  getJobMatch()                                   { return _jobMatch; }

Alessio Netti's avatar
Alessio Netti committed
251
252
253
254
255
256
257
    /**
    * @brief            Perform a sensor query
    *
    *                   This method allows to retrieve readings for a certain sensor in a given time range. The input
    *                   "buffer" vector allows to re-use memory over successive readings. Note that in order to use
    *                   this method, a callback must have been set through the setQueryCallback method. If not, this
    *                   method will throw an exception.
258
259
260
261
262
263
264
    *                   
    *                   The "rel" argument governs how the search is performed in local sensor caches: if set to true,
    *                   startTs and endTs indicate relative offsets against the most recent reading, and the returned
    *                   vector is a view of the cache whose range is computed statically in O(1), and therefore the
    *                   underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
    *                   false, startTs and endTs are interpreted as absolute timestamps, and the cache view is 
    *                   determined by performing binary search with O(log(n)) complexity, thus resulting in a accurate 
265
266
267
268
269
270
    *                   time range. Relative and absolute mode have different data guarantees:
    *                   
    *                  - Relative: returned data is guaranteed to not be stale, but it can extend outside of the
    *                      queried range by a factor proportional to sensor's sampling rate.
    *                  - Absolute: returned data is guaranteed to not be stale and to be strictly within the queried
    *                      time range.
Alessio Netti's avatar
Alessio Netti committed
271
272
273
274
    *
    * @param name       Name of the sensor to be queried
    * @param startTs    Start timestamp (in nanoseconds) of the time range for the query
    * @param endTs      End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
275
    * @param buffer     Reference to a vector in which readings must be stored.
Alessio Netti's avatar
Alessio Netti committed
276
    * @param rel        If true, the input timestamps are considered to be relative offset against "now"
277
    * @param tol        Tolerance (in ns) for returned timestamps. Does not affect Cassandra range queries. 
278
    * @return           True if successful, false otherwise
Alessio Netti's avatar
Alessio Netti committed
279
    */
280
    bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true, const uint64_t tol=3600000000000) {
Alessio Netti's avatar
Alessio Netti committed
281
282
        if(!_callback)
            throw runtime_error("Query Engine: callback not set!");
Alessio Netti's avatar
Alessio Netti committed
283
        if((startTs > endTs && !rel) || (startTs < endTs && rel))
Alessio Netti's avatar
Alessio Netti committed
284
            throw invalid_argument("Query Engine: invalid time range!");
285
        return _callback(name, startTs, endTs, buffer, rel, tol);
Alessio Netti's avatar
Alessio Netti committed
286
287
    }

288
289
290
291
292
293
294
295
    /**
    * @brief            Perform a sensor query
    *
    *                   This is an overloaded version of the querySensor() method. It accepts a vector of sensor names
    *                   instead of a single sensor. These will be queried collectively, and the result is returned.
    *                   
    * @return           True if successful, false otherwise
    */
296
    bool querySensor(const vector<string>& names, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true, const uint64_t tol=3600000000000) {
297
298
299
300
        if(!_gCallback)
            throw runtime_error("Query Engine: callback not set!");
        if((startTs > endTs && !rel) || (startTs < endTs && rel))
            throw invalid_argument("Query Engine: invalid time range!");
301
        return _gCallback(names, startTs, endTs, buffer, rel, tol);
302
303
    }

Alessio Netti's avatar
Alessio Netti committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
    /**
    * @brief            Perform a job query
    *
    *                   This method allows to retrieve data for jobs running in a given time range. The input
    *                   "buffer" vector allows to re-use memory over successive readings. Note that in order to use
    *                   this method, a callback must have been set through the setJobQueryCallback method. If not, this
    *                   method will throw an exception.
    *                   
    *                   The "rel" argument governs how the search is performed in local sensor caches: if set to true,
    *                   startTs and endTs indicate relative offsets against the most recent reading, and the returned
    *                   vector is a view of the cache whose range is computed statically in O(1), and therefore the
    *                   underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
    *                   false, startTs and endTs are interpreted as absolute timestamps, and the cache view is 
    *                   determined by performing binary search with O(log(n)) complexity, thus resulting in a accurate 
    *                   time range. This parameter does not affect the query method when using the Cassandra datastore.
    *
    * @param jobId      ID of the job to be retrieved (only if range=false)
    * @param startTs    Start timestamp (in nanoseconds) of the time range for the query (only if range=true)
    * @param endTs      End timestamp (in nanoseconds) of the time range for the query. (only if range=true)
323
    * @param buffer     Reference to a vector in which job info must be stored.
Alessio Netti's avatar
Alessio Netti committed
324
325
    * @param rel        If true, the input timestamps are considered to be relative offset against "now"
    * @param range      If true, the jobId parameter is ignored, and all jobs in the given time range are returned 
326
    * @return           True if successful, false otherwise
Alessio Netti's avatar
Alessio Netti committed
327
    */
328
    bool queryJob(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) {
Alessio Netti's avatar
Alessio Netti committed
329
330
331
332
333
334
335
        if(!_jCallback)
            throw runtime_error("Query Engine: job callback not set!");
        if((startTs > endTs && !rel) || (startTs < endTs && rel))
            throw invalid_argument("Query Engine: invalid time range!");
        return _jCallback(jobId, startTs, endTs, buffer, rel, range);
    }

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
    /**
    * @brief            Perform a sensor metadata query
    *
    *                   This method allows to retrieve the metadata of available sensors. The input "buffer" object
    *                   allows to re-use memory over successive readings. Note that in order to use this method, a 
    *                   callback must have been set through the setMetadataQueryCallback method. If not, this
    *                   method will throw an exception.
    *
    * @param name       Name of the sensor to be queried
    * @param buffer     SensorMetadata object in which to store the result
    * @return           True if successful, false otherwise
    */
    bool queryMetadata(const string& name, SensorMetadata& buffer) {
        if(!_mCallback)
            throw runtime_error("Query Engine: sensor metadata callback not set!");
        return _mCallback(name, buffer);
    }

354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
    /**
     * @brief           Locks access to the QueryEngine
     * 
     *                  Once this method returns, the invoking thread can safely update all internal data structures
     *                  of the QueryEngine. the unlock() method must be called afterwards.
     */
    void lock() {
        // Locking out new threads from the QueryEngine callbacks
        updating.store(true);
        // Waiting until all previous threads have finished using the QueryEngine
        while(access.load()>0) {}
    }
    
    /**
     * @brief           Unlocks access to the QueryEngine
     * 
     *                  Must be called after a lock() call.
     */
    void unlock() {
        access.store(0);
Alessio Netti's avatar
Alessio Netti committed
374
375
        updating.store(false);
    }
376
377
    
    //Internal atomic flags used for utility purposes
Alessio Netti's avatar
Alessio Netti committed
378
    atomic<bool> updating;
379
    atomic<int>  access;
380

Alessio Netti's avatar
Alessio Netti committed
381
382
383
384
385
386
387
private:

    /**
    * @brief            Private class constructor
    */
    QueryEngine() {
        _navigator = NULL;
388
        _sensorMap = NULL;
Alessio Netti's avatar
Alessio Netti committed
389
        _callback = NULL;
Alessio Netti's avatar
Alessio Netti committed
390
        _jCallback = NULL;
391
        _mCallback = NULL;
Alessio Netti's avatar
Alessio Netti committed
392
        updating.store(false);
393
        access.store(0);
Alessio Netti's avatar
Alessio Netti committed
394
395
396
397
398
399
400
401
402
403
404
405
406
    }

    /**
    * @brief            Copy constructor is not available
    */
    QueryEngine(QueryEngine const&)     = delete;

    /**
    * @brief            Assignment operator is not available
    */
    void operator=(QueryEngine const&)  = delete;

    // Internal pointer to a SensorNavigator
407
    shared_ptr<SensorNavigator> _navigator;
408
409
    // Internal pointer to a sensor map used in certain query callback implementations
    shared_ptr<map<string, SBasePtr>> _sensorMap;
Alessio Netti's avatar
Alessio Netti committed
410
411
    // Callback used to retrieve sensor data
    QueryEngineCallback _callback;
412
413
    // Callback used to retrieve sensor data in groups
    QueryEngineGroupCallback _gCallback;
Alessio Netti's avatar
Alessio Netti committed
414
415
    // Callback used to retrieve job data
    QueryEngineJobCallback _jCallback;
416
417
    // Callback used to retrieve metadata
    QueryEngineMetadataCallback _mCallback;
418
419
    // String storing the current sensor hierarchy, used for convenience
    string _sensorHierarchy;
Alessio Netti's avatar
Alessio Netti committed
420
421
    // String storing the filter to be used when building a sensor navigator
    string _filter;
Alessio Netti's avatar
Alessio Netti committed
422
423
    // String storing the job filter to be used by job operators
    string _jobFilter;
424
425
    // String storing the matching string resulting from the job filter for a job to be processed
    string _jobMatch;
426
427
    // String storing the job ID filter to be used by job operators
    string _jobIdFilter;
Alessio Netti's avatar
Alessio Netti committed
428
429
430
};

#endif //PROJECT_QUERYENGINE_H