analyticscontroller.cpp 6.71 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//
// Created by Netti, Alessio on 11.04.19.
//

#include "analyticscontroller.h"

void AnalyticsController::start() {
    _keepRunning = true;
    _readingCtr = 0;
    _mainThread = boost::thread(bind(&AnalyticsController::run, this));
}

void AnalyticsController::stop() {
    _keepRunning = false;
    LOG(info) << "Stopping sensors...";
    _manager->stop();
    LOG(info) << "Stopping data analytics management thread...";
    _mainThread.join();
    LOG(info) << "Stopping worker threads...";
    _keepAliveWork.reset();
    _threads.join_all();
}

bool AnalyticsController::initialize(globalCA_t& settings, const string& configPath) {
    _settings = settings;
    _configPath = configPath;
    _navigator = make_shared<SensorNavigator>();
28

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    // A sensor navigator is only built if data analytics plugins are expected to be instantiated
    QueryEngine &_queryEngine = QueryEngine::getInstance();
    if(_manager->probe(_configPath, "collectagent.conf")) {
        vector <string> names, topics;
        list <DCDB::PublicSensor> publicSensors;
        // Fetching sensor names and topics from the Cassandra datastore
        _dcdbCfg->getPublicSensorsVerbose(publicSensors);
        for (const auto &s : publicSensors)
            if (!s.is_virtual) {
                names.push_back(s.name);
                topics.push_back(s.pattern);
            }
        publicSensors.clear();

        // Building the sensor navigator
        try {
            _navigator->buildTree(_settings.hierarchy, &names, &topics);
        } catch (const std::invalid_argument &e) {
            LOG(error) << e.what();
48
49
            LOG(error) << "Failed to build sensor hierarchy tree!";
            return false;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
        }
        LOG(info) << "Built a sensor hierarchy tree of size " << _navigator->getTreeSize() << " and depth "
                  << _navigator->getTreeDepth() << ".";
        names.clear();
        topics.clear();

        // Assigning the newly-built sensor navigator to the QueryEngine
        _queryEngine.setNavigator(_navigator);
        _queryEngine.triggerUpdate();
    }

    //TODO: find a better solution to disable the SensorBase default cache
    _settings.pluginSettings.cacheInterval = 0;
    if(!_manager->load(_configPath, "collectagent.conf", _settings.pluginSettings)) {
64
        LOG(fatal) << "Failed to load data analytics manager!";
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
        return false;
    }

    if(!_queryEngine.updated.is_lock_free())
        LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";

    LOG(info) << "Creating threads...";
    // Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
    // Inherited from DCDB Pusher
    _keepAliveWork = make_shared<boost::asio::io_service::work>(_io);
    // Create pool of threads which handle the sensors
    for(size_t i = 0; i < _settings.threads; i++) {
        _threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &_io));
    }
    LOG(info) << "Threads created!";
    
    _initialized = true;
    return true;
}

void AnalyticsController::run() {
    // If initialization of data analytics fails, the thread terminates
    if(!_initialized)
        return;

    LOG(info) << "Init analyzers...";
    _manager->init(_io);
    LOG(info) << "Starting analyzers...";
    _manager->start();
    LOG(info) << "Sensors started!";
    
    publishSensors();

    vector<an_dl_t>& _analyticsPlugins = _manager->getPlugins();
    DCDB::SensorId sid;
    list<DCDB::SensorDataStoreReading> readings;
    boost::lockfree::spsc_queue<reading_t> *sensorQueue;
    reading_t readingBuf;
    while (_keepRunning) {
        if (_doHalt) {
            _halted = true;
            sleep(2);
            continue;
        }
        _halted = false;
        // Push output analytics sensors
        for (auto &p : _analyticsPlugins) {
            if (_doHalt) break;
            for (const auto &a : p.configurator->getAnalyzers())
114
115
116
117
118
119
120
121
122
123
124
125
                if(a->getStreaming())
                    for (const auto &u : a->getUnits())
                        for (const auto &s : u->getBaseOutputs())
                            if (s->getSizeOfReadingQueue() >= a->getMinValues() && sid.mqttTopicConvert(s->getMqtt())) {
                                readings.clear();
                                sensorQueue = s->getReadingQueue();
                                while(sensorQueue->pop(readingBuf)) {
                                    readings.push_back(DCDB::SensorDataStoreReading(sid, readingBuf.timestamp, readingBuf.value));
                                    _sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
                                }
                                _dcdbStore->insertBatch(readings);
                                _readingCtr+=readings.size();
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
                            }
        }
        sleep(1);
    }
}

bool AnalyticsController::publishSensors() {
    if(_settings.pluginSettings.sensorPattern=="")
        return false;

    DCDB::SCError err;
    vector<an_dl_t>& _analyticsPlugins = _manager->getPlugins();
    bool failedPublish = false;
    uint64_t publishCtr = 0;
    for (auto &p : _analyticsPlugins)
        for (const auto &a : p.configurator->getAnalyzers())
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
            if(a->getStreaming())
                for (const auto &u : a->getUnits())
                    for (const auto &s : u->getBaseOutputs()) {
                        err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
                        switch (err) {
                            case DCDB::SC_OK:
                                publishCtr++;
                                break;
                            case DCDB::SC_INVALIDPATTERN:
                                LOG(error) << "Invalid sensor topic : " << s->getMqtt();
                                failedPublish = true;
                                break;
                            case DCDB::SC_INVALIDPUBLICNAME:
                                LOG(error) << "Invalid sensor public name: " << s->getName();
                                failedPublish = true;
                                break;
                            case DCDB::SC_INVALIDSESSION:
                                LOG(error) << "Cannot reach sensor data store.";
                                failedPublish = true;
                                break;
                            default:
                                break;
                        }
165
166
167
168
169
170
171
172
                    }

    if(failedPublish)
        LOG(error) << "Issues during sensor name auto-publish! Only " << publishCtr << " sensors were published.";
    else
        LOG(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
    return true;
}