//================================================================================
// Name        : JobOperatorTemplate.h
// Author      : Alessio Netti
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features needed by Job Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#ifndef PROJECT_JOBOPERATORTEMPLATE_H
#define PROJECT_JOBOPERATORTEMPLATE_H

#include "OperatorTemplate.h"

/**
 * @brief Template that implements features needed by Job Operators and
 *        complying with OperatorInterface.
 *
 * @details This template is derived from OperatorTemplate, and is adjusted to
 *          simplify job-related computations.
 *
 * @ingroup operator
 */
template <typename S>
class JobOperatorTemplate : virtual public OperatorTemplate<S> {
    // The template shall only be instantiated for classes which derive from SensorBase;
    // any other template argument is rejected at compile time with the message below
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");

protected:
    
    // For readability: shared pointers to a sensor of type S and to a unit template over S
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;

public:
    
    /**
    * @brief            Class constructor
    *
58
    * @param name       Name of the operator
59
    */
60
    JobOperatorTemplate(const string name) :
61
            OperatorTemplate<S>(name) {
62
        
63
        _unitAccess.store(false);
64
        this->_dynamic = true;
Alessio Netti's avatar
Alessio Netti committed
65
        this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
66
        this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
Alessio Netti's avatar
Alessio Netti committed
67
        this->_jobFilter = boost::regex(this->_jobFilterStr);
68
69
70
71
72
73
    }

    /**
    * @brief            Copy constructor
    *
    */
74
    JobOperatorTemplate(const JobOperatorTemplate& other) :
75
            OperatorTemplate<S>(other) {
76
        
77
        _unitAccess.store(false);
78
        this->_dynamic = true;
Alessio Netti's avatar
Alessio Netti committed
79
        this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
80
        this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
Alessio Netti's avatar
Alessio Netti committed
81
        this->_jobFilter = boost::regex(this->_jobFilterStr);
82
83
84
85
86
87
    }

    /**
    * @brief            Assignment operator
    *
    */
88
89
    JobOperatorTemplate& operator=(const JobOperatorTemplate& other) {
        OperatorTemplate<S>::operator=(other);
90
        this->_dynamic = true;
Alessio Netti's avatar
Alessio Netti committed
91
        this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
92
        this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
Alessio Netti's avatar
Alessio Netti committed
93
        this->_jobFilter = boost::regex(this->_jobFilterStr);
94
        return *this;
95
96
97
98
99
    }
            
    /**
    * @brief            Class destructor
    */
100
    virtual ~JobOperatorTemplate() {}
101
    
102
    /**
103
    * @brief              Returns the units of this operator
104
105
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
106
    *                     derived type, are used internally. This type of operator employs dynamic units that are
107
108
    *                     generated at runtime: as such, an internal unit lock is acquired upon calling this method,
    *                     and must later be released through the releaseUnits() method.
109
    *
110
    * @return             The vector of UnitInterface objects of this operator
111
    */
112
113
114
115
116
117
118
119
120
121
122
    virtual vector<UnitPtr>& getUnits() override	{
        // Spinlock to regulate access to units - normally innocuous
        while(_unitAccess.exchange(true)) {}
        return this->_baseUnits;
    }
    
    /**
     * @brief             Releases the access lock to units
     * 
     *                    This method must be called anytime operations on units are performed through getUnits().
     */
123
    virtual void releaseUnits() override {
124
        _unitAccess.store(false);
125
    }
126
    
127
128
129
130
    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the operator. If the _streaming attribute is false, the job
    *                     identified by the input topic is looked up through the QueryEngine, a unit is generated on
    *                     the fly for it and compute() is invoked. If the operator is streaming and running, the
    *                     latest outputs of the matching unit are returned instead, without triggering a computation.
    *                     Both locks used here are released on every exit path, including exceptions.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    * @throws std::runtime_error  if job data cannot be retrieved, the job is rejected by the operator,
    *                             or the operator is streaming but not running
    * @throws std::domain_error   if, in streaming mode, no unit with the given name exists
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
        map<string, reading_t> outMap;
        if( !this->_streaming ) {
            // Getting exclusive access to the operator
            while( this->_onDemandLock.exchange(true) ) {}
            try {
                std::string jobId = MQTTChecker::topicToJob(node);
                _jobDataVec.clear();
                if(!this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false) || _jobDataVec.empty())
                    throw std::runtime_error("Operator " + this->_name + ": cannot retrieve job data!");

                // A null unit means that the job was filtered out by filterJob()
                U_Ptr jobUnit = jobDataToUnit(_jobDataVec[0]);
                if(!jobUnit)
                    throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
                this->compute(jobUnit, _jobDataVec[0]);
                this->retrieveAndFlush(outMap, jobUnit);
            } catch(...) {
                // Whatever was thrown, the operator must not stay locked
                this->_onDemandLock.store(false);
                throw;
            }
            this->_onDemandLock.store(false);
        } else if( this->_keepRunning ) {
            bool found = false;
            //Spinning explicitly as we need to iterate on the derived Unit objects
            while(_unitAccess.exchange(true)) {}
            try {
                for(const auto& u : this->_units)
                    if(u->getName() == node) {
                        found = true;
                        this->retrieveAndFlush(outMap, u, false);
                    }
            } catch(...) {
                // Releasing the unit spinlock before propagating, or later getUnits() calls would spin forever
                releaseUnits();
                throw;
            }
            releaseUnits();

            if(!found)
                throw std::domain_error("Job " + node + " does not belong to the domain of " + this->_name + "!");
        } else
            throw std::runtime_error("Operator " + this->_name + ": not available for on-demand query!");
        return outMap;
    }
    
protected:
178
    
179
    using OperatorTemplate<S>::compute;
180
181
182
183
184
    
    /**
    * @brief              Data analytics (job) computation logic
    *
    *                     This method contains the actual logic used by the analyzed, and is automatically called by
185
186
    *                     the computeAsync method. This variant of the compute() method defined in OperatorTemplate also
    *                     includes a job data structure in its list of arguments, and is specialized for job operators.
187
188
189
190
191
192
    *
    * @param unit         Shared pointer to unit to be processed
    * @param jobData      Job data structure 
    */
    virtual void compute(U_Ptr unit, qeJobData& jobData) = 0;
    
193
194
195
196
    /**
     * @brief           This method encapsulates all logic to generate and manage job units
     * 
     *                  The algorithm implemented in this method is very similar to that used in computeOnDemand in
197
     *                  OperatorTemplate, and it is used to manage job units both in on-demand and streaming mode. The
198
199
200
201
202
203
     *                  internal unit cache is used to store recent job units. Moreover, the job data returned by the
     *                  QueryEngine is converted to a format compatible with the UnitGenerator.
     * 
     * @param jobData   a qeJobData struct containing job information
     * @return          A shared pointer to a job unit object
     */
Alessio Netti's avatar
Alessio Netti committed
204
    virtual U_Ptr jobDataToUnit(qeJobData& jobData) {
205
206
        string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
        U_Ptr jobUnit = nullptr;
207
        if(!this->_unitCache)
208
            throw std::runtime_error("Initialization error in operator " + this->_name + "!");
209

210
211
        if (this->_unitCache->count(jobTopic)) {
            jobUnit = this->_unitCache->at(jobTopic);
212
            if(!this->_streaming)
213
                LOG(debug) << "Operator " << this->_name << ": cache hit for unit " << jobTopic << ".";
214
215
            
        } else {
216
            if (!this->_unitCache->count(SensorNavigator::templateKey))
217
                throw std::runtime_error("No template unit in operator " + this->_name + "!");
218
            if(!this->_streaming)
219
                LOG(debug) << "Operator " << this->_name << ": cache miss for unit " << jobTopic << ".";
Alessio Netti's avatar
Alessio Netti committed
220
221
            if(!this->filterJob(jobData))
                return nullptr;
222
223
            U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
            shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
224
            UnitGenerator<S> unitGen(navi);
225
226
            // The job unit is generated as a hierarchical unit
            jobUnit = unitGen.generateFromTemplate(uTemplate, jobTopic, jobData.nodes, this->_mqttPart, this->_enforceTopics, this->_relaxed);
227
            // Initializing sensors if necessary
228
            jobUnit->init(this->_interval, this->_queueSize);
229
            this->addToUnitCache(jobUnit);
230
231
232
233
234
        }
        return jobUnit;
    }
    
    /**
Alessio Netti's avatar
Alessio Netti committed
235
     * @brief             Tests the job against the internal filter
236
     * 
Alessio Netti's avatar
Alessio Netti committed
237
238
239
240
     *                    This method is used to filter out jobs for which this operator is not responsible. By default,
     *                    the operator checks the first node in the nodelist of the job, and if its hostname matches
     *                    with the internal job filter regex, the job is accepted. This method can be overridden to
     *                    implement more complex filtering policies.
241
     * 
Alessio Netti's avatar
Alessio Netti committed
242
243
     * @param jobData     a qeJobData struct containing job information
     * @return            True if the job should be processed, false otherwise
244
     */
Alessio Netti's avatar
Alessio Netti committed
245
    virtual bool filterJob(qeJobData& jobData) {
246
        // Job with no nodes - a unit cannot be built
Alessio Netti's avatar
Alessio Netti committed
247
248
        if(jobData.nodes.empty())
            return false;
249
        // Filtering and formatting the node list
Alessio Netti's avatar
Alessio Netti committed
250
251
        for(auto& nodeName : jobData.nodes)
            nodeName = MQTTChecker::formatTopic(nodeName) + std::string(1, MQTT_SEP);
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
        
        // No filter was set - every job is accepted
        if(_jobFilterStr=="" || _jobMatchStr=="")
            return true;
        
        // Counting the different matches to the job filter - e.g., different racks, islands, etc.
        std::map<std::string, uint64_t> matchCtr;
        for(const auto& nodeName : jobData.nodes) {
            if(boost::regex_search(nodeName.c_str(), _match, _jobFilter)) {
                ++matchCtr[_match.str(0)];
            }
        }
        
        // Computing the actual mode - the filtered node name acts as a tie breaker
        std::pair<std::string, uint64_t> mode = {"", 0};
        for(const auto& kv : matchCtr) {
            if (kv.second > mode.second || (kv.second == mode.second && kv.first > mode.first)) {
                mode = kv;
            }
        }
        
        // If the mode corresponds to the job match string, the check is successful.
        return mode.first == _jobMatchStr;
Alessio Netti's avatar
Alessio Netti committed
275
    }
276
277
278
279
280
    
    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
281
282
    *                     compute() method, which encapsulates the real logic of the operator. The compute method
    *                     is automatically called over units as required by the operator's configuration.
283
    *                     
284
    *                     In the case of job operators, this method will also automatically retrieve the list of jobs
285
286
287
288
289
290
    *                     that were running in the last interval. One unit for each of them is instantiated (or 
    *                     retrieved from the local unit cache, if available) and then the compute phase starts.
    *
    */
    virtual void computeAsync() override {
        try {
291
            _jobDataVec.clear();
292
            if(this->_queryEngine.queryJob("", this->_interval * 1000000, 0, _jobDataVec, true, true)) {
293
294
                _tempUnits.clear();
                // Producing units from the job data, discarding invalid jobs in the process
295
                for(auto& job : _jobDataVec) {
296
297
                    try {
                        _tempUnits.push_back(jobDataToUnit(job));
298
                    } catch(const invalid_argument& e2) { 
299
                        LOG(debug) << e2.what(); 
300
301
                        _tempUnits.push_back(nullptr); 
                        continue; }
302
                }
303
304
                
                // Performing actual computation on each unit
305
306
307
308
309
310
311
312
313
314
                for(size_t idx=0; idx<_tempUnits.size(); idx++) {
                    if (_tempUnits[idx]) {
                        try {
                            this->compute(_tempUnits[idx], _jobDataVec[idx]);
                        } catch(const exception& e) {
                            LOG(error) << e.what();
                            continue;
                        }
                    }
                }
315
316
317
318
                // Acquiring the spinlock to refresh the exposed units
                while(_unitAccess.exchange(true)) {}
                this->clearUnits();
                for(const auto& ju : _tempUnits)
319
                    if(ju)
320
                        this->addUnit(ju);
321
322
                _unitAccess.store(false);
                _tempUnits.clear();
323
324
            }
            else
325
                LOG(error) << "Operator " + this->_name + ": cannot retrieve job data!";
326
        } catch(const exception& e) {
327
            LOG(error) << "Operator " + this->_name + ": internal error " + e.what() + " during computation!";
328
            _unitAccess.store(false);
329
330
        }

331
332
333
        if (this->_timer && this->_keepRunning) {
            this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
            this->_pendingTasks++;
334
            this->_timer->async_wait(bind(&JobOperatorTemplate::computeAsync, this));
335
        }
336
        this->_pendingTasks--;
337
    }
338
    
339
340
    // Vector of recently-modified units
    vector<U_Ptr> _tempUnits;
341
    // Spinlock used to regulate access to the internal units map, for "visualization" purposes
342
    atomic<bool> _unitAccess;
343
    // Vector of job data structures used to retrieve job data at runtime
344
    vector<qeJobData> _jobDataVec;
Alessio Netti's avatar
Alessio Netti committed
345
346
    // Regex object used to filter out jobs
    string _jobFilterStr;
347
    string _jobMatchStr;
Alessio Netti's avatar
Alessio Netti committed
348
349
    boost::regex _jobFilter;
    boost::cmatch _match;
350
351
    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
352
353
354
355
    

};

#endif //PROJECT_JOBOPERATORTEMPLATE_H