JobAnalyzerTemplate.h 12.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//================================================================================
// Name        : JobAnalyzerTemplate.h
// Author      : Alessio Netti
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features needed by Analyzers.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#ifndef PROJECT_JOBANALYZERTEMPLATE_H
#define PROJECT_JOBANALYZERTEMPLATE_H

30
#include "AnalyzerTemplate.h"
31
32
33
34
35
36
37
38

/**
 * Template that implements features needed by Job Analyzers and complying to AnalyzerInterface.
 *
 * This template is derived from AnalyzerTemplate, and is adjusted to simplify job-related computations.
 *
 */
template <typename S>
39
class JobAnalyzerTemplate : public AnalyzerTemplate<S> {
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    // The template shall only be instantiated for classes which derive from SensorBase
    static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");

protected:
    
    // For readability
    using S_Ptr = shared_ptr<S>;
    using U_Ptr = shared_ptr< UnitTemplate<S> >;

public:
    
    /**
    * @brief            Class constructor
    *
    * @param name       Name of the analyzer
    */
    JobAnalyzerTemplate(const string name) :
57
            AnalyzerTemplate<S>(name),
58
59
            _jobDataVec(nullptr) {
        
60
        _unitAccess.store(false);
61
62
63
64
65
66
67
    }

    /**
    * @brief            Copy constructor
    *
    */
    JobAnalyzerTemplate(const JobAnalyzerTemplate& other) :
68
            AnalyzerTemplate<S>(other),
69
70
            _jobDataVec(nullptr) {
        
71
        _unitAccess.store(false);
72
73
74
75
76
77
78
    }

    /**
    * @brief            Assignment operator
    *
    */
    JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
79
        AnalyzerTemplate<S>::operator=(other);
80
81
82
83
84
85
86
87
88
89
        _jobDataVec = nullptr;
    }
            
    /**
    * @brief            Class destructor
    */
    virtual ~JobAnalyzerTemplate() {
        if(_jobDataVec)
            delete _jobDataVec;
    }
90
    
91
92
93
94
95
    /**
    * @brief              Returns the units of this analyzer
    *
    *                     The units returned by this method are of the UnitInterface type. The actual units, in their
    *                     derived type, are used internally. This type of analyzer employs dynamic units that are
96
97
    *                     generated at runtime: as such, an internal unit lock is acquired upon calling this method,
    *                     and must later be released through the releaseUnits() method.
98
99
100
    *
    * @return             The vector of UnitInterface objects of this analyzer
    */
101
102
103
104
105
106
107
108
109
110
111
112
113
    virtual vector<UnitPtr>& getUnits() override	{
        // Spinlock to regulate access to units - normally innocuous
        while(_unitAccess.exchange(true)) {}
        return this->_baseUnits;
    }
    
    /**
     * @brief             Releases the access lock to units
     * 
     *                    This method must be called anytime operations on units are performed through getUnits().
     */
    virtual void releaseUnits() {
        _unitAccess.store(false);
114
    }
115
    
116
117
118
119
120
    /**
    * @brief              Initializes this analyzer
    *
    * @param io           Boost ASIO service to be used
    */
121
    virtual void init(boost::asio::io_service& io) override { AnalyzerInterface::init(io); }
122
123
124
125
126
127
128
129
130
131
132
133
134
135

    /**
    * @brief              Performs an on-demand compute task
    *
    *                     Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
    *                     perform data analytics queries on the analyzer, which must have the _streaming attribute set
    *                     to false. A unit is generated on the fly, corresponding to the input node given as input,
    *                     and results are returned in the form of a map.
    *
    * @param node         Unit name for which the query must be performed
    * @return             a map<string, reading_t> containing the output of the query
    */
    virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
        map<string, reading_t> outMap;
136
        if( !this->_streaming ) {
137
138
            try {
                // Getting exclusive access to the analyzer
139
                while( this->_onDemandLock.exchange(true) ) {}
140
                uint32_t jobId = MQTTChecker::topicToJob(node);
141
                vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
142
143
144
145
146
147
148
149
150
151
                if(buf) _jobDataVec = buf;
                if(buf && !buf->empty()) {
                    U_Ptr jobUnit = jobDataToUnit(buf[0]);

                    compute(jobUnit);
                    for (const auto &o : jobUnit->getOutputs()) {
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                        o->clearReadingQueue();
                    }
                } else
152
                    throw std::runtime_error("Analyzer " + this->_name + ": cannot retrieve job data!");
153
            } catch(const exception& e) {
154
                this->_onDemandLock.store(false);
155
156
                throw;
            }
157
158
            this->_onDemandLock.store(false);
        } else if( this->_keepRunning ) {
159
            bool found = false;
160
161
            for(const auto& u : getUnits())
                if(u->getName() == node) {
162
                    found = true;
163
                    for(const auto& o : u->getBaseOutputs())
164
165
                        outMap.insert(make_pair(o->getName(), o->getLatestValue()));
                }
166
            releaseUnits();
167
168

            if(!found)
169
                throw std::domain_error("Job " + node + " does not belong to the domain of " + this->_name + "!");
170
        } else
171
            throw std::runtime_error("Analyzer " + this->_name + ": not available for on-demand query!");
172
173
        return outMap;
    }
174
    
175
protected:
176
    
177
178
179
180
181
182
183
184
185
186
187
188
189
190
    /**
     * @brief           This method encapsulates all logic to generate and manage job units
     * 
     *                  The algorithm implemented in this method is very similar to that used in computeOnDemand in
     *                  AnalyzerTemplate, and it is used to manage job units both in on-demand and streaming mode. The
     *                  internal unit cache is used to store recent job units. Moreover, the job data returned by the
     *                  QueryEngine is converted to a format compatible with the UnitGenerator.
     * 
     * @param jobData   a qeJobData struct containing job information
     * @return          A shared pointer to a job unit object
     */
    virtual U_Ptr jobDataToUnit(qeJobData& jobData) {
        string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
        U_Ptr jobUnit = nullptr;
191
192
        if(!this->_unitCache)
            throw std::runtime_error("Initialization error in analyzer " + this->_name + "!");
193

194
195
196
        if (this->_unitCache->count(jobTopic)) {
            jobUnit = this->_unitCache->at(jobTopic);
            LOG(debug) << "Analyzer " << this->_name << ": cache hit for unit " << jobTopic << ".";
197
198
            
        } else {
199
200
201
202
203
            if (!this->_unitCache->count(SensorNavigator::templateKey))
                throw std::runtime_error("No template unit in analyzer " + this->_name + "!");
            LOG(debug) << "Analyzer " << this->_name << ": cache miss for unit " << jobTopic << ".";
            U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
            shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
204
205
206
207
            UnitGenerator<S> unitGen(navi);
            vector<string> nodes;
            for (const auto &n : jobData.nodes)
                nodes.push_back(translateNodeName(n));
208
            jobUnit = unitGen.generateJobUnit(jobTopic, nodes, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
209
210
211
212

            // Initializing sensors if necessary
            for (const auto s : jobUnit->getOutputs())
                if (!s->isInit())
213
                    s->initSensor(this->_cacheSize);
214
                
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
            addToUnitCache(jobUnit);
        }
        return jobUnit;
    }
    
    /**
     * @brief             Translates a node name as returned by the resource manager to an internal representation
     * 
     *                    The external node name is usually just the hostname associated to the machine. This 
     *                    representation usually needs to be converted to an internal one that reflects the hierarchy
     *                    described by the sensor navigator. Since this logic is sytem-dependent, users can freely
     *                    override this method.
     * 
     * @param n           Raw node hostname
     * @return            Converted sensor navigator-friendly node name
     */
    virtual string translateNodeName(string n) { return n; }
    
    /**
    * @brief              Performs a compute task
    *
    *                     This method is tasked with scheduling the next compute task, and invoking the internal
    *                     compute() method, which encapsulates the real logic of the analyzer. The compute method
    *                     is automatically called over units as required by the Analyzer's configuration.
    *                     
    *                     In the case of job analyzers, this method will also automatically retrieve the list of jobs
    *                     that were running in the last interval. One unit for each of them is instantiated (or 
    *                     retrieved from the local unit cache, if available) and then the compute phase starts.
    *
    */
    virtual void computeAsync() override {
246
247
248
249
        if(this->_delayInterval > 0) {
            sleep(this->_delayInterval);
            this->_delayInterval = 0;
            LOG(info) << "Analyzer " + this->_name + ": starting computation after delayed start!";
250
251
252
        }

        try {
253
            vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
254
255
            if(buf) {
                _jobDataVec = buf;
256
257
                _tempUnits.clear();
                // Producing units from the job data, discarding invalid jobs in the process
258
                for(const auto& job : *_jobDataVec) {
259
260
261
                    try {
                        _tempUnits.push_back(jobDataToUnit(job));
                    } catch(const invalid_argument& e2) { continue; }
262
                }
263
264
265
266
267
268
269
270
271
272
273
                
                // Performing actual computation on each unit
                for(const auto& ju : _tempUnits)
                    compute(ju);
                // Acquiring the spinlock to refresh the exposed units
                while(_unitAccess.exchange(true)) {}
                this->clearUnits();
                for(const auto& ju : _tempUnits)
                    addUnit(ju);
                _unitAccess.store(false);
                _tempUnits.clear();
274
275
            }
            else
276
                LOG(error) << "Analyzer " + this->_name + ": cannot retrieve job data!";
277
        } catch(const exception& e) {
278
            LOG(error) << "Analyzer " + this->_name + ": internal error " + e.what() + " during computation!";
279
            _unitAccess.store(false);
280
281
        }

282
283
284
285
        if (this->_timer && this->_keepRunning) {
            this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
            this->_pendingTasks++;
            this->_timer->async_wait(bind(&JobAnalyzerTemplate::computeAsync, this));
286
        }
287
        this->_pendingTasks--;
288
    }
289
    
290
291
    // Vector of recently-modified units
    vector<U_Ptr> _tempUnits;
292
    // Spinlock used to regulate access to the internal units map, for "visualization" purposes
293
    atomic<bool> _unitAccess;
294
295
    // Vector of job data structures used to retrieve job data at runtime
    vector<qeJobData>* _jobDataVec;
296
297
    // Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
298
299
300
301
302
    

};

#endif //PROJECT_JOBANALYZERTEMPLATE_H