Commit b9c4cf89 authored by Micha Müller's avatar Micha Müller
Browse files

Add full support for separate storage of Caliper Event data

-CollectAgent and Pusher adapted for special handling of event data
-Handful of other fixes and improvements
parent 1c33996d
......@@ -54,6 +54,7 @@
#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/jobdatastore.h>
#include <dcdb/calievtdatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include <dcdb/sensor.h>
......@@ -85,6 +86,7 @@ DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore;
DCDB::JobDataStore *myJobDataStore;
DCDB::SensorConfig *mySensorConfig;
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
MetadataStore *metadataStore;
DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
......@@ -248,6 +250,77 @@ int mqttCallback(SimpleMQTTMessage *msg)
default:
break;
}
} else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
/*
* Special message case. This message contains a Caliper Event data
* string that is encoded in the MQTT topic. Its payload consists of
* usual timestamp-value pairs.
* Data from this messages will be stored in a separate table managed
* by CaliEvtDataStore class.
*/
std::string topicStr(msg->getTopic());
mqttPayload buf, *payload;
len = msg->getPayloadLength();
//TODO support case that no timestamp is given?
//retrieve timestamp and value from the payload
if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
payload = (mqttPayload *) msg->getPayload();
} else {
//this message is malformed -> ignore
LOG(error) << "Message malformed";
return 1;
}
/*
* Decode message topic in actual sensor topic and string data.
* "/:/" is used as delimiter between topic and data.
*/
topicStr.erase(0, DCDB_CALIEVT_LEN);
size_t pos = topicStr.find("/:/");
if (pos == std::string::npos) {
// topic is malformed -> ignore
LOG(error) << "CaliEvt topic malformed";
return 1;
}
const std::string data(topicStr, pos+3);
topicStr.erase(pos);
/*
* We use the same MQTT-topic/SensorId infrastructure as actual sensor
* data readings to sort related events.
* Check if we can decode the event topic into a valid SensorId. If
* successful, store the record in the database.
*/
DCDB::SensorId sid;
if (sid.mqttTopicConvert(topicStr)) {
std::list<DCDB::CaliEvtData> events;
DCDB::CaliEvtData e;
e.eventId = sid;
e.event = data;
for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
e.timeStamp = payload[i].timestamp;
/**
* We want an exhaustive list of all events ordered by their
* time of occurrence. Payload values should always be
* one. Other values currently indicate a malformed message.
*
* In the future, the value field could be used to aggregate
* multiple equal events that occurred in the same plugin
* read cycle.
*/
if(payload[i].value != 1) {
LOG(error) << "CaliEvt message malformed. Value != 1";
return 1;
}
events.push_back(e);
}
myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
} else {
LOG(error) << "Topic could not be converted to SID";
}
} else {
mqttPayload buf, *payload;
......@@ -499,13 +572,17 @@ int main(int argc, char* const argv[]) {
mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
mySensorConfig = new DCDB::SensorConfig(dcdbConn);
myJobDataStore = new DCDB::JobDataStore(dcdbConn);
myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
/*
* Set TTL for data store inserts if TTL > 0.
*/
if (cassandraSettings.ttl > 0)
if (cassandraSettings.ttl > 0) {
mySensorDataStore->setTTL(cassandraSettings.ttl);
myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
}
mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
// Fetching public sensor information from the Cassandra datastore
list<DCDB::PublicSensor> publicSensors;
......@@ -656,6 +733,7 @@ int main(int argc, char* const argv[]) {
delete mySensorDataStore;
delete myJobDataStore;
delete mySensorConfig;
delete myCaliEvtDataStore;
dcdbConn->disconnect();
delete dcdbConn;
delete metadataStore;
......
......@@ -55,6 +55,8 @@
#define DCDB_MAP_LEN 10
#define DCDB_MET "/DCDB_MAP/METADATA/"
#define DCDB_MET_LEN 19
#define DCDB_CALIEVT "/DCDB_CE/"
#define DCDB_CALIEVT_LEN 9
#pragma pack(push,1)
......
......@@ -740,13 +740,13 @@ private:
std::string info;
if (!begin_evt_entry.is_empty()) {
info = "evt_begin#";
info = "evt_begin/";
trigger_id = set_evt_entry.value().to_id();
} else if (!set_evt_entry.is_empty()) {
info = "evt_set#";
info = "evt_set/";
trigger_id = begin_evt_entry.value().to_id();
} else if (!end_evt_entry.is_empty()) {
info = "evt_end#";
info = "evt_end/";
trigger_id = end_evt_entry.value().to_id();
} else {
//should not happen...
......@@ -760,7 +760,7 @@ private:
info += trigger_attribute.name() + "/" + trigger_entry.value().to_string();
++snapshots_event;
data_size = snprintf(data, max_dat_size, "cpu%d/e/%s", cpu, info.c_str());
data_size = snprintf(data, max_dat_size, "Ecpu%d/%s", cpu, info.c_str());
} else if (!sampler_pc_entry.is_empty()) {
//is a sampler-triggered snapshot
++snapshots_sampler;
......@@ -783,10 +783,10 @@ private:
auto f_it = a_it->symbols.lower_bound(f_tmp);
if (f_it != a_it->symbols.end() && pc >= f_it->start_addr && pc <= f_it->end_addr) {
data_size = snprintf(data, max_dat_size, "cpu%d/s/%s/%s", cpu, a_it->pathname.c_str(), f_it->name.c_str());
data_size = snprintf(data, max_dat_size, "Scpu%d/%s/%s", cpu, a_it->pathname.c_str(), f_it->name.c_str());
} else {
//It's OK if we found no symbol. There are possibly none associated to this range
data_size = snprintf(data, max_dat_size, "cpu%d/s/%s", cpu, a_it->pathname.c_str());
data_size = snprintf(data, max_dat_size, "Scpu%d/%s", cpu, a_it->pathname.c_str());
}
} else {
//PC was not within any range --> tell the update service to do his job
......
......@@ -204,12 +204,12 @@ int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& total
#endif
//try to send them to the broker
int rc;
if ((rc = mosquitto_publish(_mosq, NULL, (s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
if ((rc = mosquitto_publish(_mosq, NULL, s.getMqtt().c_str(), sizeof(reading_t)*count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
if (rc == MOSQ_ERR_NOMEM) {
LOGM(info) << "Can\'t queue additional messages";
} else {
LOGM(error) << "Could not send message! Trying again later";
LOGM(error) << "Could not send message: " << mosquitto_strerror(rc) << " Trying again later";
_connected = false;
}
s.pushReadingQueue(reads, count);
......
......@@ -27,6 +27,7 @@
#include "CaliperSensorGroup.h"
#include <algorithm>
#include <chrono>
#include <errno.h>
#include <stdio.h>
......@@ -212,7 +213,7 @@ void CaliperSensorGroup::read() {
}
//clean up sensors if required
//If they still contain unpushed values, the will get lost
//If they still contain unpushed values, they will get lost
if (_sensorIndex.size() > _maxSensorNum) {
_sensorIndex.clear();
acquireSensors();
......@@ -297,9 +298,16 @@ void CaliperSensorGroup::read() {
reading.timestamp = *(reinterpret_cast<uint64_t*>(&_buf[bufIdx]));
bufIdx += sizeof(uint64_t);
std::string data = std::string(&_buf[bufIdx]);
std::string data(&_buf[bufIdx]);
bufIdx += data.size() + 1;
const bool event = (data[0] == 'E' ? true : false);
data.erase(0, 1);
// '#' and '+' are not allowed in MQTT topics. Get rid of them here
std::replace(data.begin(), data.end(), '#', '/');
std::replace(data.begin(), data.end(), '+', '.');
std::string cpu = "/" + data.substr(0, data.find_first_of('/'));
std::string top = data.substr(data.find_first_of('/'));
......@@ -312,7 +320,15 @@ void CaliperSensorGroup::read() {
} else {
//unknown function or event --> create a new sensor
s = std::make_shared<CaliperSensorBase>(data);
s->setMqtt(_globalMqttPrefix + cpu + _mqttPart + top);
if (event) {
//Events shall be stored in a separate table. For
//identification they get a unique topic prefix. Also,
//data will be encoded in the topic but separated so it can
//be split again in the CollectAgent
s->setMqtt("/DCDB_CE/" + _globalMqttPrefix + cpu + _mqttPart + "/:/" + top);
} else {
s->setMqtt(_globalMqttPrefix + cpu + _mqttPart + top);
}
s->setName(s->getMqtt());
s->initSensor(_interval);
......@@ -324,15 +340,25 @@ void CaliperSensorGroup::read() {
}
// temporarily store and aggregate value in the cache
if (cache.find(data) == cache.end()) {
//no cache entry yet
cache[data] = std::make_pair(s, reading);
} else {
//update cache entry. Use timestamp of last aggregated value
cache[data].second.value += reading.value;
if (reading.timestamp > cache[data].second.timestamp) {
cache[data].second.timestamp = reading.timestamp;
// currently only done for Sample data
if (!event) {
if (cache.find(data) == cache.end()) {
//no cache entry yet
cache[data] = std::make_pair(s, reading);
} else {
//update cache entry. Use timestamp of last aggregated value
cache[data].second.value += reading.value;
if (reading.timestamp > cache[data].second.timestamp) {
cache[data].second.timestamp = reading.timestamp;
}
}
} else {
//NOTE: if event data shall ever be cached and aggregated, too,
//CollectAgent's message processing has to be adapted!
s->storeReading(reading);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << " (E) raw reading: \"" << reading.value << "\"";
#endif
}
}
......@@ -340,7 +366,7 @@ void CaliperSensorGroup::read() {
for (auto& it : cache) {
it.second.first->storeReading(it.second.second);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << it.second.first->getName() << " raw reading: \"" << it.second.second.value << "\"";
LOG(debug) << _groupName << "::" << it.second.first->getName() << " (S) raw reading: \"" << it.second.second.value << "\"";
#endif
}
}
......
......@@ -40,6 +40,24 @@
namespace DCDB {
//TODO Deduplicate (duplicate of sensordatastore_internal.h)
//Definition of callback function for Cassandra inserts
//prints debug output on insert failure
void CaliEvtDataStoreImpl_on_result(CassFuture_* future, void* data) {
/* This result will now return immediately */
static CassError rcPrev = CASS_OK;
static uint32_t ctr = 0;
CassError rc = cass_future_error_code(future);
if(rc != CASS_OK) {
if(rc != rcPrev) {
std::cout << "Cassandra Backend Error (CaliEvt): " << cass_error_desc(rc) << std::endl;
ctr = 0;
rcPrev = rc;
} else if(++ctr%10000 == 0)
std::cout << "Cassandra Backend Error (CaliEvt): " << cass_error_desc(rc) << " (" << ctr << " more)" << std::endl;
}
}
/**
* @brief The CaliEvtDataStoreImpl class contains all protected
* functions belonging to CaliEvtDataStore which are
......
//================================================================================
// Name : calievtdatastore.cpp
// Author : Axel Auweter, Micha Mueller
// Author : Micha Mueller
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : C++ API implementation for inserting and querying DCDB job data.
// Description : C++ API implementation for inserting and querying Caliper event data.
//================================================================================
//================================================================================
......@@ -62,7 +62,7 @@ void CaliEvtDataStoreImpl::prepareInsert(uint64_t ttl) {
}
query = "INSERT INTO " CED_KEYSPACE_NAME "." CF_CALIEVTDATA
" (name, ws, ts, value) VALUES (?, ?, ?, ?) USING TTL ? ;";
" (sid, ws, ts, value) VALUES (?, ?, ?, ?) USING TTL ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -79,7 +79,7 @@ void CaliEvtDataStoreImpl::prepareInsert(uint64_t ttl) {
cass_prepared_free(preparedInsert_noTTL);
}
query = "INSERT INTO " CED_KEYSPACE_NAME "." CF_CALIEVTDATA
" (name, ws, ts, value) VALUES (?, ?, ?, ?);";
" (sid, ws, ts, value) VALUES (?, ?, ?, ?);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -87,7 +87,7 @@ void CaliEvtDataStoreImpl::prepareInsert(uint64_t ttl) {
if (rc != CASS_OK) {
connection->printError(future);
} else {
preparedInsert = cass_future_get_prepared(future);
preparedInsert_noTTL = cass_future_get_prepared(future);
}
cass_future_free(future);
......@@ -164,10 +164,9 @@ void CaliEvtDataStoreImpl::insertBatch(std::list<CaliEvtData>& datas, int64_t tt
CassFuture *future = cass_session_execute_batch(session, batch);
cass_batch_free(batch);
//Requires definition of DataStoreImpl_on_result (see sensordatastore_internal.h)
// if(debugLog) {
// cass_future_set_callback(future, DataStoreImpl_on_result, NULL);
// }
if(debugLog) {
cass_future_set_callback(future, CaliEvtDataStoreImpl_on_result, NULL);
}
/* Don't wait for the future, just free it to make the call truly asynchronous */
cass_future_free(future);
......
......@@ -518,13 +518,13 @@ bool ConnectionImpl::initSchema() {
if (!existsColumnFamily(CF_CALIEVTDATA)) {
std::cout << "Creating Column Family " CF_CALIEVTDATA "...\n";
createColumnFamily(CF_CALIEVTDATA,
"name varchar, " /* Public name */
"sid varchar, " /* Public name/SensorID */
"ws smallint, " /* Weekstamp to ensure that the maximum number of
2^32 columns per key is not exceeded */
"ts bigint, " /* Timestamp of a Caliper Event */
"value varchar ", /* String representation of the Event */
"value varchar", /* String representation of the Event */
"name, ws, ts", /* Make the "name", "ws", and "ts" columns together
"sid, ws, ts", /* Make the "sid", "ws", and "ts" columns together
the primary key */
"COMPACT STORAGE" /* Enable compact storage */
); /* No further options required */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment