2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

CARestAPI.cpp 12.3 KB
Newer Older
1
2
3
//================================================================================
// Name        : CARestAPI.cpp
// Author      : Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : RESTful API implementation for collectagent.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28

#include "CARestAPI.h"
Michael Ott's avatar
Michael Ott committed
29
30
31
#include <dcdb/version.h>
#include "version.h"

32
#include <boost/beast/http/field.hpp>
33
34
35
36

#define stdBind(fun) std::bind(&CARestAPI::fun, \
          this, \
          std::placeholders::_1, \
37
38
          std::placeholders::_2, \
	  std::placeholders::_3)
39

Alessio Netti's avatar
Alessio Netti committed
40
41
42
43
44
CARestAPI::CARestAPI(serverSettings_t settings, 
                     influx_t* influxSettings,
                     SensorCache* sensorCache, 
                     SensorDataStore* sensorDataStore,
                     SensorConfig *sensorConfig,
45
                     AnalyticsController* analyticsController,
Alessio Netti's avatar
Alessio Netti committed
46
47
48
                     SimpleMQTTServer* mqttServer,
                     boost::asio::io_context& io) :
                     RESTHttpsServer(settings, io),
49
                     _influxSettings(influxSettings),
50
                     _sensorCache(sensorCache),
51
                     _sensorDataStore(sensorDataStore),
Michael Ott's avatar
Michael Ott committed
52
                     _sensorConfig(sensorConfig),
53
54
                     _analyticsController(analyticsController),
                     _mqttServer(mqttServer) {
Michael Ott's avatar
Michael Ott committed
55
56
    _influxCounter = 0;
			 
57
    addEndpoint("/help",    {http::verb::get, stdBind(GET_help)});
Michael Ott's avatar
Michael Ott committed
58
59
    addEndpoint("/version", {http::verb::get, stdBind(GET_version)});
    addEndpoint("/hosts",   {http::verb::get, stdBind(GET_hosts)});
60
    addEndpoint("/average", {http::verb::get, stdBind(GET_average)});
Alessio Netti's avatar
Alessio Netti committed
61
    addEndpoint("/quit", {http::verb::put, stdBind(PUT_quit)});
62

63
64
65
66
    addEndpoint("/ping", {http::verb::get, stdBind(GET_ping)});
    addEndpoint("/query", {http::verb::post, stdBind(POST_query)});
    addEndpoint("/write", {http::verb::post, stdBind(POST_write)});

67
68
69
    _analyticsController->getManager()->addRestEndpoints(this);

    addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
70
71
    addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
    addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
72
    addEndpoint("/analytics/navigator", {http::verb::put, stdBind(PUT_analytics_navigator)});
73
74
75
76
77
78
79
}

void CARestAPI::GET_help(endpointArgs) {
    res.body() = caRestCheatSheet + _analyticsController->getManager()->restCheatSheet;
    res.result(http::status::ok);
}

Michael Ott's avatar
Michael Ott committed
80
81
82
83
84
void CARestAPI::GET_version(endpointArgs) {
    res.body() = "CollectAgent " + std::string(VERSION) + " (libdcdb " + DCDB::Version::getVersion() + ")";
    res.result(http::status::ok);
}

85
86
87
88
89
90
91
92
void CARestAPI::GET_hosts(endpointArgs) {
    if (!_mqttServer) {
        res.body() = "The MQTT server is not initialized!";
        res.result(http::status::internal_server_error);
        return;
    }
    std::ostringstream data;
    data << "address,clientID,lastSeen" << std::endl;
93
    std::map<std::string, hostInfo_t> hostsVec = _mqttServer->collectLastSeen();
94
    for(auto &el : hostsVec) {
95
        data << el.second.address <<  "," << el.second.clientId << "," << std::to_string(el.second.lastSeen) << std::endl;
96
97
98
99
100
    }
    res.body() = data.str();
    res.result(http::status::ok);
}

101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
void CARestAPI::GET_average(endpointArgs) {
    const std::string sensor = getQuery("sensor", queries);
    const std::string interval = getQuery("interval", queries);

    if (sensor == "") {
        res.body() = "Request malformed: sensor query missing";
        res.result(http::status::bad_request);
        return;
    }

    uint64_t time = 0;

    if (interval != "") {
        try {
            time = std::stoul(interval);
        } catch (const std::exception& e) {
            RESTAPILOG(warning) << "Bad interval query: " << e.what();
            res.body() = "Bad interval query!\n";
            res.result(http::status::bad_request);
            return;
        }
    }

    //try getting the latest value
    try {
        int64_t val = _sensorCache->getSensor(sensor, (uint64_t) time * 1000000000);
        res.body() = "collectagent::" + sensor + " Average of last " +
                     std::to_string(time) + " seconds is " + std::to_string(val);
        res.result(http::status::ok);
        //std::ostringstream data;
        //data << val << "\n";
        //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
        //res.body() = data.str();
    } catch (const std::invalid_argument &e) {
        res.body() = "Error: Sensor id not found.\n";
        res.result(http::status::not_found);
    } catch (const std::out_of_range &e) {
        res.body() = "Error: Sensor unavailable.\n";
        res.result(http::status::no_content);
    } catch (const std::exception &e) {
        res.body() = "Internal server error.\n";
        res.result(http::status::internal_server_error);
        RESTAPILOG(warning) << "Internal server error: " << e.what();
    }
}

147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
void CARestAPI::GET_ping(endpointArgs) {
    res.body() = "";
    res.result(http::status::ok);
}

void CARestAPI::POST_query(endpointArgs) {
    res.set(http::field::content_type, "application/json");
    res.body() = "{results: [{statement_id: 0}]}";
    res.result(http::status::ok);
}

void CARestAPI::POST_write(endpointArgs) {
    std::istringstream body(req.body());
    std::string line;
    while (std::getline(body, line)) {
162
	// Regex to split line into measurement, tags, fields, timestamp
163
	boost::regex r1("^([^,]*)(,[^ ]*)? ([^ ]*)( .*)?$", boost::regex::extended);
164
	// Regex to split comma-separated tags and fields into individual entries
165
166
167
168
169
	boost::regex r2(",?([^,=]*)=([^,]*)", boost::regex::extended);

	boost::smatch m1, m2;
	if (boost::regex_search(line, m1, r1)) {
	    std::string measurement = m1[1].str();
170
171
172
	    auto m = _influxSettings->measurements.find(measurement);
	    if (m != _influxSettings->measurements.end()) {
		influx_measurement_t influx = m->second;
173
174
		
		// Parse tags into a map
175
		std::map<std::string, std::string> tags;
176
177
		std::string tagList = m1[2].str();
		while (boost::regex_search(tagList, m2, r2)) {
178
		    tags[m2[1].str()] = m2[2].str();
179
		    tagList = m2.suffix().str();
180
181
		}

182
		auto t = tags.find(influx.tag);
183
		if (t != tags.end()) {
184
185
186
187
188
189
190
191
192
193
194
		    std::string tagName = t->second;
		    // Perform pattern filter or substitution via regex on tag
		    if (!influx.tagRegex.empty()) {
			std::string input(tagName);
			tagName = "";
			boost::regex_replace(std::back_inserter(tagName), input.begin(), input.end(), influx.tagRegex, influx.tagSubstitution.c_str(), boost::regex_constants::format_sed | boost::regex_constants::format_no_copy);
			if (tagName.size() == 0) {
			    // There was no match
			    break;
			}
		    }
195
		    std::map<std::string, std::string> fields;
196
197
		    std::string fieldList = m1[3].str();
		    while (boost::regex_search(fieldList, m2, r2)) {
198
			fields[m2[1].str()] = m2[2].str();
199
			fieldList = m2.suffix().str();
200
201
202
203
204
205
206
207
		    }
		    
		    DCDB::TimeStamp ts;
		    try {
			ts = TimeStamp(m1[4].str());
		    } catch (...) {
		    }

208
209
210
		    for (auto &f: fields) {
			// If no fields were defined, we take any field
			if (influx.fields.empty() || (influx.fields.find(f.first) != influx.fields.end())) {
211
			    std::string mqttTopic = _influxSettings->mqttPrefix + m->second.mqttPart + "/" + tagName + "/" + f.first;
212
213
			    uint64_t value = 0;
			    try {
214
				value = stoull(f.second);
215
216
217
218
219
220
221
			    } catch (...) {
				break;
			    }

			    DCDB::SensorId sid;
			    if (sid.mqttTopicConvert(mqttTopic)) {
				_sensorCache->storeSensor(sid, ts.getRaw(), value);
222
				_sensorDataStore->insert(sid, ts.getRaw(), value);
Michael Ott's avatar
Michael Ott committed
223
				_influxCounter++;
Michael Ott's avatar
Michael Ott committed
224
225
226
227
228
				
				if (_influxSettings->publish && (_influxSensors.find(sid.getId()) == _influxSensors.end())) {
				    _influxSensors.insert(sid.getId());
				    _sensorConfig->publishSensor(sid.getId().c_str(), sid.getId().c_str());
				}
229
			    }
230
231
232
#ifdef DEBUG
			    LOG(debug) << "influx insert: " << mqttTopic << " " << ts.getRaw() << " " << value;
#endif
233
234
235
			}
		    }
		}
236
237
	    } else {
		LOG(debug) << "influx: unknown measurement " << measurement;
Michael Ott's avatar
Michael Ott committed
238
239
240
#ifdef DEBUG
		LOG(debug) << "influx line: " << line;
#endif
241
242
243
244
245
246
247
	    }
	}
    }
    res.body() = "";
    res.result(http::status::no_content);
}

Alessio Netti's avatar
Alessio Netti committed
248
249
250
251
252
253
254
255
256
257
void CARestAPI::PUT_quit(endpointArgs) {
    int retCode = getQuery("code", queries)=="" ? 0 : std::stoi(getQuery("code", queries));
    if(retCode<0 || retCode>255)
        retCode = 0;
    _retCode = retCode;
    raise(SIGUSR1);
    res.body() = "Quitting with return code " + std::to_string(retCode) + ".\n";
    res.result(http::status::ok);
}

258
void CARestAPI::PUT_analytics_reload(endpointArgs) {
259
260
    if (_analyticsController->getManager()->getStatus() != OperatorManager::LOADED) {
        res.body() = "OperatorManager is not loaded!\n";
261
262
263
264
265
266
267
268
269
        res.result(http::status::internal_server_error);
        return;
    }

    const std::string plugin   = getQuery("plugin", queries);

    // Wait until controller is paused in order to reload plugins
    _analyticsController->halt(true);

270
    if (!_analyticsController->getManager()->reload(plugin)) {
271
272
        res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
        res.result(http::status::not_found);
273
    } else if (!_analyticsController->getManager()->start(plugin)) {
274
275
276
277
278
279
280
281
282
        res.body() = "Plugin cannot be restarted!\n";
        res.result(http::status::internal_server_error);
    } else {
        res.body() = "Plugin " + plugin + ": Sensors reloaded\n";
        res.result(http::status::ok);
    }

    _analyticsController->resume();
}
283
284
285
286
287
288
289
290
291
292
293
294
295
296

void CARestAPI::PUT_analytics_load(endpointArgs) {
    const std::string plugin = getQuery("plugin", queries);
    const std::string path   = getQuery("path", queries);
    const std::string config = getQuery("config", queries);

    if (!hasPlugin(plugin, res)) {
        return;
    }

    // Wait until controller is paused in order to reload plugins
    _analyticsController->halt(true);

    if(_analyticsController->getManager()->loadPlugin(plugin, path, config)) {
297
        res.body() = "Operator plugin " + plugin + " successfully loaded!\n";
298
299
        res.result(http::status::ok);

300
        _analyticsController->getManager()->init(plugin);
301
    } else {
302
        res.body() = "Failed to load operator plugin " + plugin + "!\n";
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
        res.result(http::status::internal_server_error);
    }

    _analyticsController->resume();
}

void CARestAPI::PUT_analytics_unload(endpointArgs) {
    const std::string plugin = getQuery("plugin", queries);

    if (!hasPlugin(plugin, res)) {
        return;
    }

    // Wait until controller is paused in order to reload plugins
    _analyticsController->halt(true);

    _analyticsController->getManager()->unloadPlugin(plugin);
320
    res.body() = "Operator plugin " + plugin + " unloaded.\n";
321
322
323
324
    res.result(http::status::ok);

    _analyticsController->resume();
}
325
326
327
328
329
330
331
332
333
334
335

void CARestAPI::PUT_analytics_navigator(endpointArgs) {
    if(!_analyticsController->rebuildSensorNavigator()) {
        res.body() = "Sensor hierarchy tree could not be rebuilt.\n";
        res.result(http::status::internal_server_error);
    } else {
        std::shared_ptr <SensorNavigator> navigator = QueryEngine::getInstance().getNavigator();
        res.body() = "Built a sensor hierarchy tree of size " + std::to_string(navigator->getTreeSize()) + " and depth " + std::to_string(navigator->getTreeDepth()) + ".\n";
        res.result(http::status::ok);
    }
}