analyticscontroller.cpp 8.48 KB
Newer Older
1
2
3
//================================================================================
// Name        : analyticscontroller.cpp
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright   : Leibniz Supercomputing Centre
// Description : Wrapper class implementation for AnalyticsManager.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30
31
32
33
34
35
36

#include "analyticscontroller.h"

void AnalyticsController::start() {
    _keepRunning = true;
    _readingCtr = 0;
    _mainThread = boost::thread(bind(&AnalyticsController::run, this));
}

void AnalyticsController::stop() {
Alessio Netti's avatar
Alessio Netti committed
37
    LOG(info) << "Stopping data analytics management thread...";
38
    _keepRunning = false;
Alessio Netti's avatar
Alessio Netti committed
39
    _mainThread.join();
40
41
    LOG(info) << "Stopping sensors...";
    _manager->stop();
42
    _manager->clear();
43
44
45
    LOG(info) << "Stopping worker threads...";
    _keepAliveWork.reset();
    _threads.join_all();
46
    _initialized = false;
47
48
}

Alessio Netti's avatar
Alessio Netti committed
49
bool AnalyticsController::initialize(Configuration& settings, const string& configPath) {
50
51
52
    _settings = settings;
    _configPath = configPath;
    _navigator = make_shared<SensorNavigator>();
53

54
    // A sensor navigator is only built if operator plugins are expected to be instantiated
55
56
    QueryEngine &_queryEngine = QueryEngine::getInstance();
    if(_manager->probe(_configPath, "collectagent.conf")) {
Alessio Netti's avatar
Alessio Netti committed
57
        vector <string> topics;
58
59
60
        list <DCDB::PublicSensor> publicSensors;
        // Fetching sensor names and topics from the Cassandra datastore
        _dcdbCfg->getPublicSensorsVerbose(publicSensors);
Alessio Netti's avatar
Alessio Netti committed
61
        std::string patternBuf;
62
63
        for (const auto &s : publicSensors)
            if (!s.is_virtual) {
Alessio Netti's avatar
Alessio Netti committed
64
65
66
                patternBuf = s.pattern;
                boost::algorithm::trim(patternBuf);
                topics.push_back(patternBuf);
67
68
69
70
71
            }
        publicSensors.clear();

        // Building the sensor navigator
        try {
72
            _navigator->setFilter(_settings.analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
73
            _navigator->buildTree(_settings.analyticsSettings.hierarchy, &topics);
74
75
        } catch (const std::invalid_argument &e) {
            LOG(error) << e.what();
76
77
            LOG(error) << "Failed to build sensor hierarchy tree!";
            return false;
78
79
80
81
82
83
84
85
86
87
88
89
        }
        LOG(info) << "Built a sensor hierarchy tree of size " << _navigator->getTreeSize() << " and depth "
                  << _navigator->getTreeDepth() << ".";
        topics.clear();

        // Assigning the newly-built sensor navigator to the QueryEngine
        _queryEngine.setNavigator(_navigator);
    }

    //TODO: find a better solution to disable the SensorBase default cache
    _settings.pluginSettings.cacheInterval = 0;
    if(!_manager->load(_configPath, "collectagent.conf", _settings.pluginSettings)) {
90
        LOG(fatal) << "Failed to load data analytics manager!";
91
92
93
        return false;
    }

94
    if(!_queryEngine.updating.is_lock_free())
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
        LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";

    LOG(info) << "Creating threads...";
    // Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
    // Inherited from DCDB Pusher
    _keepAliveWork = make_shared<boost::asio::io_service::work>(_io);
    // Create pool of threads which handle the sensors
    for(size_t i = 0; i < _settings.threads; i++) {
        _threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &_io));
    }
    LOG(info) << "Threads created!";
    
    _initialized = true;
    return true;
}

void AnalyticsController::run() {
    // If initialization of data analytics fails, the thread terminates
    if(!_initialized)
        return;

116
    LOG(info) << "Init operators...";
117
    _manager->init(_io);
118
    LOG(info) << "Starting operators...";
119
120
121
122
123
    _manager->start();
    LOG(info) << "Sensors started!";
    
    publishSensors();

124
    vector<op_dl_t>& _analyticsPlugins = _manager->getPlugins();
125
126
127
128
129
130
131
132
133
134
135
136
137
138
    DCDB::SensorId sid;
    list<DCDB::SensorDataStoreReading> readings;
    boost::lockfree::spsc_queue<reading_t> *sensorQueue;
    reading_t readingBuf;
    while (_keepRunning) {
        if (_doHalt) {
            _halted = true;
            sleep(2);
            continue;
        }
        _halted = false;
        // Push output analytics sensors
        for (auto &p : _analyticsPlugins) {
            if (_doHalt) break;
139
140
141
            for (const auto &op : p.configurator->getOperators())
                if(op->getStreaming()) {
                    for (const auto &u : op->getUnits())
142
                        for (const auto &s : u->getBaseOutputs())
143
                            if (s->getSizeOfReadingQueue() >= op->getMinValues() && sid.mqttTopicConvert(s->getMqtt())) {
144
145
                                readings.clear();
                                sensorQueue = s->getReadingQueue();
146
                                while (sensorQueue->pop(readingBuf)) {
147
148
149
                                    readings.push_back(DCDB::SensorDataStoreReading(sid, readingBuf.timestamp, readingBuf.value));
                                    _sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
                                }
Alessio Netti's avatar
Alessio Netti committed
150
                                _sensorCache->getSensorMap()[sid].updateBatchSize(op->getMinValues());
151
                                _dcdbStore->insertBatch(readings);
152
                                _readingCtr += readings.size();
153
                            }
154
                    op->releaseUnits();
155
                }
156
157
158
159
160
161
        }
        sleep(1);
    }
}

bool AnalyticsController::publishSensors() {
Alessio Netti's avatar
Alessio Netti committed
162
    // Performing auto-publish (if required) for the sensors instantiated by the data analytics framework
163
    if(!_settings.pluginSettings.autoPublish) return false;
164
165

    DCDB::SCError err;
166
    vector<op_dl_t>& _analyticsPlugins = _manager->getPlugins();
167
168
169
    bool failedPublish = false;
    uint64_t publishCtr = 0;
    for (auto &p : _analyticsPlugins)
170
171
172
        for (const auto &op : p.configurator->getOperators())
            if(op->getStreaming() && !op->getDynamic()) {
                for (const auto &u : op->getUnits())
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
                    for (const auto &s : u->getBaseOutputs()) {
                        err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
                        switch (err) {
                            case DCDB::SC_OK:
                                publishCtr++;
                                break;
                            case DCDB::SC_INVALIDPATTERN:
                                LOG(error) << "Invalid sensor topic : " << s->getMqtt();
                                failedPublish = true;
                                break;
                            case DCDB::SC_INVALIDPUBLICNAME:
                                LOG(error) << "Invalid sensor public name: " << s->getName();
                                failedPublish = true;
                                break;
                            case DCDB::SC_INVALIDSESSION:
                                LOG(error) << "Cannot reach sensor data store.";
                                failedPublish = true;
                                break;
                            default:
                                break;
                        }
194
                    }
195
                op->releaseUnits();
196
            }
197
198
199
200
201
202
203

    if(failedPublish)
        LOG(error) << "Issues during sensor name auto-publish! Only " << publishCtr << " sensors were published.";
    else
        LOG(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
    return true;
}