//================================================================================
// Name        : JobAnalyzerTemplate.h
// Author      : Alessio Netti
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features needed by Job Analyzers.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#ifndef PROJECT_JOBANALYZERTEMPLATE_H
#define PROJECT_JOBANALYZERTEMPLATE_H

#include "AnalyzerTemplate.h"

/**
 * Template that implements features needed by Job Analyzers and complying to AnalyzerInterface.
 *
 * This template is derived from AnalyzerTemplate, and is adjusted to simplify job-related computations.
 *
 */
template <typename S>
39
class JobAnalyzerTemplate : virtual public AnalyzerTemplate<S> {
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    // The template shall only be instantiated for classes which derive from SensorBase
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");

protected:
    
    // For readability
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;

public:
    
    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
    JobAnalyzerTemplate(const string name) :
57
            AnalyzerTemplate<S>(name),
58
59
            _jobDataVec(nullptr) {
        
60
        _unitAccess.store(false);
61
        this->_dynamic = true;
62
63
64
65
66
67
68
    }

    /**
    * @brief            Copy constructor
    *
    */
    JobAnalyzerTemplate(const JobAnalyzerTemplate& other) :
69
            AnalyzerTemplate<S>(other),
70
71
            _jobDataVec(nullptr) {
        
72
        _unitAccess.store(false);
73
        this->_dynamic = true;
74
75
76
77
78
79
80
    }

    /**
    * @brief            Assignment operator
    *
    */
    JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
81
        AnalyzerTemplate<S>::operator=(other);
82
        _jobDataVec = nullptr;
83
        this->_dynamic = true;
84
        return *this;
85
86
87
88
89
90
91
92
93
    }
            
    /**
    * @brief            Class destructor
    */
    virtual ~JobAnalyzerTemplate() {
        if(_jobDataVec)
            delete _jobDataVec;
    }
94
    
95
96
97
98
99
    /**
    * @brief              Returns the units of this analyzer
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
    *                     derived type, are used internally. This type of analyzer employs dynamic units that are
100
101
    *                     generated at runtime: as such, an internal unit lock is acquired upon calling this method,
    *                     and must later be released through the releaseUnits() method.
102
103
104
    *
    * @return             The vector of UnitInterface objects of this analyzer
    */
105
106
107
108
109
110
111
112
113
114
115
    virtual vector<UnitPtr>& getUnits() override	{
        // Spinlock to regulate access to units - normally innocuous
        while(_unitAccess.exchange(true)) {}
        return this->_baseUnits;
    }
    
    /**
     * @brief             Releases the access lock to units
     * 
     *                    This method must be called anytime operations on units are performed through getUnits().
     */
116
    virtual void releaseUnits() override {
117
        _unitAccess.store(false);
118
    }
119
    
120
121
122
123
124
    /**
    * @brief              Initializes this analyzer
    *
    * @param io           Boost ASIO service to be used
    */
125
    virtual void init(boost::asio::io_service& io) override { AnalyzerInterface::init(io); }
126
127
128
129
130
131
132
133
134
135
136
137
138
139

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
        map<string, reading_t> outMap;
140
        if( !this->_streaming ) {
141
142
            try {
                // Getting exclusive access to the analyzer
143
                while( this->_onDemandLock.exchange(true) ) {}
144
                uint32_t jobId = MQTTChecker::topicToJob(node);
145
                vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
146
147
                if(buf) _jobDataVec = buf;
                if(buf && !buf->empty()) {
148
                    U_Ptr jobUnit = jobDataToUnit(buf->at(0));
149

150
                    this->compute(jobUnit);
151
152
153
154
155
                    for (const auto &o : jobUnit->getOutputs()) {
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                        o->clearReadingQueue();
                    }
                } else
156
                    throw std::runtime_error("Analyzer " + this->_name + ": cannot retrieve job data!");
157
            } catch(const exception& e) {
158
                this->_onDemandLock.store(false);
159
160
                throw;
            }
161
162
            this->_onDemandLock.store(false);
        } else if( this->_keepRunning ) {
163
            bool found = false;
164
165
            for(const auto& u : getUnits())
                if(u->getName() == node) {
166
                    found = true;
167
                    for(const auto& o : u->getBaseOutputs())
168
169
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                }
170
            releaseUnits();
171
172

            if(!found)
173
                throw std::domain_error("Job " + node + " does not belong to the domain of " + this->_name + "!");
174
        } else
175
            throw std::runtime_error("Analyzer " + this->_name + ": not available for on-demand query!");
176
177
        return outMap;
    }
178
    
179
protected:
180
    
181
182
183
184
185
186
187
188
189
190
191
    /**
     * @brief           This method encapsulates all logic to generate and manage job units
     * 
     *                  The algorithm implemented in this method is very similar to that used in computeOnDemand in
     *                  AnalyzerTemplate, and it is used to manage job units both in on-demand and streaming mode. The
     *                  internal unit cache is used to store recent job units. Moreover, the job data returned by the
     *                  QueryEngine is converted to a format compatible with the UnitGenerator.
     * 
     * @param jobData   a qeJobData struct containing job information
     * @return          A shared pointer to a job unit object
     */
192
    virtual U_Ptr jobDataToUnit(const qeJobData& jobData) {
193
194
        string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
        U_Ptr jobUnit = nullptr;
195
196
        if(!this->_unitCache)
            throw std::runtime_error("Initialization error in analyzer " + this->_name + "!");
197

198
199
200
        if (this->_unitCache->count(jobTopic)) {
            jobUnit = this->_unitCache->at(jobTopic);
            LOG(debug) << "Analyzer " << this->_name << ": cache hit for unit " << jobTopic << ".";
201
202
            
        } else {
203
204
205
206
207
            if (!this->_unitCache->count(SensorNavigator::templateKey))
                throw std::runtime_error("No template unit in analyzer " + this->_name + "!");
            LOG(debug) << "Analyzer " << this->_name << ": cache miss for unit " << jobTopic << ".";
            U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
            shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
208
209
210
211
            UnitGenerator<S> unitGen(navi);
            vector<string> nodes;
            for (const auto &n : jobData.nodes)
                nodes.push_back(translateNodeName(n));
212
            jobUnit = unitGen.generateJobUnit(jobTopic, nodes, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
213
214
215
216

            // Initializing sensors if necessary
            for (const auto s : jobUnit->getOutputs())
                if (!s->isInit())
217
                    s->initSensor(this->_cacheSize);
218
                
219
            this->addToUnitCache(jobUnit);
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
        }
        return jobUnit;
    }
    
    /**
     * @brief             Translates a node name as returned by the resource manager to an internal representation
     * 
     *                    The external node name is usually just the hostname associated to the machine. This 
     *                    representation usually needs to be converted to an internal one that reflects the hierarchy
     *                    described by the sensor navigator. Since this logic is sytem-dependent, users can freely
     *                    override this method.
     * 
     * @param n           Raw node hostname
     * @return            Converted sensor navigator-friendly node name
     */
    virtual string translateNodeName(string n) { return n; }
    
    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *                     
    *                     In the case of job analyzers, this method will also automatically retrieve the list of jobs
    *                     that were running in the last interval. One unit for each of them is instantiated (or 
    *                     retrieved from the local unit cache, if available) and then the compute phase starts.
    *
    */
    virtual void computeAsync() override {
250
251
252
253
        if(this->_delayInterval > 0) {
            sleep(this->_delayInterval);
            this->_delayInterval = 0;
            LOG(info) << "Analyzer " + this->_name + ": starting computation after delayed start!";
254
255
256
        }

        try {
257
            vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
258
259
            if(buf) {
                _jobDataVec = buf;
260
261
                _tempUnits.clear();
                // Producing units from the job data, discarding invalid jobs in the process
262
                for(const auto& job : *_jobDataVec) {
263
264
265
                    try {
                        _tempUnits.push_back(jobDataToUnit(job));
                    } catch(const invalid_argument& e2) { continue; }
266
                }
267
268
269
                
                // Performing actual computation on each unit
                for(const auto& ju : _tempUnits)
270
                    this->compute(ju);
271
272
273
274
                // Acquiring the spinlock to refresh the exposed units
                while(_unitAccess.exchange(true)) {}
                this->clearUnits();
                for(const auto& ju : _tempUnits)
275
                    this->addUnit(ju);
276
277
                _unitAccess.store(false);
                _tempUnits.clear();
278
279
            }
            else
280
                LOG(error) << "Analyzer " + this->_name + ": cannot retrieve job data!";
281
        } catch(const exception& e) {
282
            LOG(error) << "Analyzer " + this->_name + ": internal error " + e.what() + " during computation!";
283
            _unitAccess.store(false);
284
285
        }

286
287
288
289
        if (this->_timer && this->_keepRunning) {
            this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
            this->_pendingTasks++;
            this->_timer->async_wait(bind(&JobAnalyzerTemplate::computeAsync, this));
290
        }
291
        this->_pendingTasks--;
292
    }
293
    
294
295
    // Vector of recently-modified units
    vector<U_Ptr> _tempUnits;
296
    // Spinlock used to regulate access to the internal units map, for "visualization" purposes
297
    atomic<bool> _unitAccess;
298
299
    // Vector of job data structures used to retrieve job data at runtime
    vector<qeJobData>* _jobDataVec;
300
301
    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
302
303
304
305
306
    

};

#endif //PROJECT_JOBANALYZERTEMPLATE_H