//================================================================================
// Name        : JobAnalyzerTemplate.h
// Author      : Alessio Netti
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features needed by Job Analyzers.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#ifndef PROJECT_JOBANALYZERTEMPLATE_H
#define PROJECT_JOBANALYZERTEMPLATE_H

#include "AnalyzerTemplate.h"

/**
 * Template that implements features needed by Job Analyzers and complying to AnalyzerInterface.
 *
 * This template is derived from AnalyzerTemplate, and is adjusted to simplify job-related computations.
 *
 */
template <typename S>
39
class JobAnalyzerTemplate : public AnalyzerTemplate<S> {
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    // The template shall only be instantiated for classes which derive from SensorBase
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");

protected:
    
    // For readability
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;

public:
    
    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
    JobAnalyzerTemplate(const string name) :
57
            AnalyzerTemplate<S>(name),
58
59
            _jobDataVec(nullptr) {
        
60
        _unitAccess.store(false);
61
        this->_dynamic = true;
62
63
64
65
66
67
68
    }

    /**
    * @brief            Copy constructor
    *
    */
    JobAnalyzerTemplate(const JobAnalyzerTemplate& other) :
69
            AnalyzerTemplate<S>(other),
70
71
            _jobDataVec(nullptr) {
        
72
        _unitAccess.store(false);
73
        this->_dynamic = true;
74
75
76
77
78
79
80
    }

    /**
    * @brief            Assignment operator
    *
    */
    JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
81
        AnalyzerTemplate<S>::operator=(other);
82
        _jobDataVec = nullptr;
83
        this->_dynamic = true;
84
85
86
87
88
89
90
91
92
    }
            
    /**
    * @brief            Class destructor
    */
    virtual ~JobAnalyzerTemplate() {
        if(_jobDataVec)
            delete _jobDataVec;
    }
93
    
94
95
96
97
98
    /**
    * @brief              Returns the units of this analyzer
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
    *                     derived type, are used internally. This type of analyzer employs dynamic units that are
99
100
    *                     generated at runtime: as such, an internal unit lock is acquired upon calling this method,
    *                     and must later be released through the releaseUnits() method.
101
102
103
    *
    * @return             The vector of UnitInterface objects of this analyzer
    */
104
105
106
107
108
109
110
111
112
113
114
115
116
    virtual vector<UnitPtr>& getUnits() override	{
        // Spinlock to regulate access to units - normally innocuous
        while(_unitAccess.exchange(true)) {}
        return this->_baseUnits;
    }
    
    /**
     * @brief             Releases the access lock to units
     * 
     *                    This method must be called anytime operations on units are performed through getUnits().
     */
    virtual void releaseUnits() {
        _unitAccess.store(false);
117
    }
118
    
119
120
121
122
123
    /**
    * @brief              Initializes this analyzer
    *
    * @param io           Boost ASIO service to be used
    */
124
    virtual void init(boost::asio::io_service& io) override { AnalyzerInterface::init(io); }
125
126
127
128
129
130
131
132
133
134
135
136
137
138

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
        map<string, reading_t> outMap;
139
        if( !this->_streaming ) {
140
141
            try {
                // Getting exclusive access to the analyzer
142
                while( this->_onDemandLock.exchange(true) ) {}
143
                uint32_t jobId = MQTTChecker::topicToJob(node);
144
                vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
145
146
147
148
149
150
151
152
153
154
                if(buf) _jobDataVec = buf;
                if(buf && !buf->empty()) {
                    U_Ptr jobUnit = jobDataToUnit(buf[0]);

                    compute(jobUnit);
                    for (const auto &o : jobUnit->getOutputs()) {
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                        o->clearReadingQueue();
                    }
                } else
155
                    throw std::runtime_error("Analyzer " + this->_name + ": cannot retrieve job data!");
156
            } catch(const exception& e) {
157
                this->_onDemandLock.store(false);
158
159
                throw;
            }
160
161
            this->_onDemandLock.store(false);
        } else if( this->_keepRunning ) {
162
            bool found = false;
163
164
            for(const auto& u : getUnits())
                if(u->getName() == node) {
165
                    found = true;
166
                    for(const auto& o : u->getBaseOutputs())
167
168
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                }
169
            releaseUnits();
170
171

            if(!found)
172
                throw std::domain_error("Job " + node + " does not belong to the domain of " + this->_name + "!");
173
        } else
174
            throw std::runtime_error("Analyzer " + this->_name + ": not available for on-demand query!");
175
176
        return outMap;
    }
177
    
178
protected:
179
    
180
181
182
183
184
185
186
187
188
189
190
191
192
193
    /**
     * @brief           This method encapsulates all logic to generate and manage job units
     * 
     *                  The algorithm implemented in this method is very similar to that used in computeOnDemand in
     *                  AnalyzerTemplate, and it is used to manage job units both in on-demand and streaming mode. The
     *                  internal unit cache is used to store recent job units. Moreover, the job data returned by the
     *                  QueryEngine is converted to a format compatible with the UnitGenerator.
     * 
     * @param jobData   a qeJobData struct containing job information
     * @return          A shared pointer to a job unit object
     */
    virtual U_Ptr jobDataToUnit(qeJobData& jobData) {
        string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
        U_Ptr jobUnit = nullptr;
194
195
        if(!this->_unitCache)
            throw std::runtime_error("Initialization error in analyzer " + this->_name + "!");
196

197
198
199
        if (this->_unitCache->count(jobTopic)) {
            jobUnit = this->_unitCache->at(jobTopic);
            LOG(debug) << "Analyzer " << this->_name << ": cache hit for unit " << jobTopic << ".";
200
201
            
        } else {
202
203
204
205
206
            if (!this->_unitCache->count(SensorNavigator::templateKey))
                throw std::runtime_error("No template unit in analyzer " + this->_name + "!");
            LOG(debug) << "Analyzer " << this->_name << ": cache miss for unit " << jobTopic << ".";
            U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
            shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
207
208
209
210
            UnitGenerator<S> unitGen(navi);
            vector<string> nodes;
            for (const auto &n : jobData.nodes)
                nodes.push_back(translateNodeName(n));
211
            jobUnit = unitGen.generateJobUnit(jobTopic, nodes, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
212
213
214
215

            // Initializing sensors if necessary
            for (const auto s : jobUnit->getOutputs())
                if (!s->isInit())
216
                    s->initSensor(this->_cacheSize);
217
                
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
            addToUnitCache(jobUnit);
        }
        return jobUnit;
    }
    
    /**
     * @brief             Translates a node name as returned by the resource manager to an internal representation
     * 
     *                    The external node name is usually just the hostname associated to the machine. This 
     *                    representation usually needs to be converted to an internal one that reflects the hierarchy
     *                    described by the sensor navigator. Since this logic is sytem-dependent, users can freely
     *                    override this method.
     * 
     * @param n           Raw node hostname
     * @return            Converted sensor navigator-friendly node name
     */
    virtual string translateNodeName(string n) { return n; }
    
    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *                     
    *                     In the case of job analyzers, this method will also automatically retrieve the list of jobs
    *                     that were running in the last interval. One unit for each of them is instantiated (or 
    *                     retrieved from the local unit cache, if available) and then the compute phase starts.
    *
    */
    virtual void computeAsync() override {
249
250
251
252
        if(this->_delayInterval > 0) {
            sleep(this->_delayInterval);
            this->_delayInterval = 0;
            LOG(info) << "Analyzer " + this->_name + ": starting computation after delayed start!";
253
254
255
        }

        try {
256
            vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
257
258
            if(buf) {
                _jobDataVec = buf;
259
260
                _tempUnits.clear();
                // Producing units from the job data, discarding invalid jobs in the process
261
                for(const auto& job : *_jobDataVec) {
262
263
264
                    try {
                        _tempUnits.push_back(jobDataToUnit(job));
                    } catch(const invalid_argument& e2) { continue; }
265
                }
266
267
268
269
270
271
272
273
274
275
276
                
                // Performing actual computation on each unit
                for(const auto& ju : _tempUnits)
                    compute(ju);
                // Acquiring the spinlock to refresh the exposed units
                while(_unitAccess.exchange(true)) {}
                this->clearUnits();
                for(const auto& ju : _tempUnits)
                    addUnit(ju);
                _unitAccess.store(false);
                _tempUnits.clear();
277
278
            }
            else
279
                LOG(error) << "Analyzer " + this->_name + ": cannot retrieve job data!";
280
        } catch(const exception& e) {
281
            LOG(error) << "Analyzer " + this->_name + ": internal error " + e.what() + " during computation!";
282
            _unitAccess.store(false);
283
284
        }

285
286
287
288
        if (this->_timer && this->_keepRunning) {
            this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
            this->_pendingTasks++;
            this->_timer->async_wait(bind(&JobAnalyzerTemplate::computeAsync, this));
289
        }
290
        this->_pendingTasks--;
291
    }
292
    
293
294
    // Vector of recently-modified units
    vector<U_Ptr> _tempUnits;
295
    // Spinlock used to regulate access to the internal units map, for "visualization" purposes
296
    atomic<bool> _unitAccess;
297
298
    // Vector of job data structures used to retrieve job data at runtime
    vector<qeJobData>* _jobDataVec;
299
300
    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
301
302
303
304
305
    

};

#endif //PROJECT_JOBANALYZERTEMPLATE_H