Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 31684fae authored by Micha Müller's avatar Micha Müller
Browse files

Insert Cali events individually into Cassandra. Batch inserts sometimes get rejected as too large

parent 1894f272
......@@ -326,7 +326,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
*/
DCDB::SensorId sid;
if (sid.mqttTopicConvert(topicStr)) {
std::list<DCDB::CaliEvtData> events;
//std::list<DCDB::CaliEvtData> events;
DCDB::CaliEvtData e;
e.eventId = sid;
e.event = data;
......@@ -347,9 +347,10 @@ int mqttCallback(SimpleMQTTMessage *msg)
return 1;
}
events.push_back(e);
myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
//events.push_back(e);
}
myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
//myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
} else {
LOG(error) << "Topic could not be converted to SID";
}
......
......@@ -50,7 +50,11 @@ void CaliEvtDataStoreImpl_on_result(CassFuture_* future, void* data) {
CassError rc = cass_future_error_code(future);
if(rc != CASS_OK) {
if(rc != rcPrev) {
std::cout << "Cassandra Backend Error (CaliEvt): " << cass_error_desc(rc) << std::endl;
const char* error_msg;
size_t error_msg_len;
cass_future_error_message(future, &error_msg, &error_msg_len);
std::string msg(error_msg, error_msg_len);
std::cout << "Cassandra Backend Error (CaliEvt): " << cass_error_desc(rc) << ": " << msg << std::endl;
ctr = 0;
rcPrev = rc;
} else if(++ctr%10000 == 0)
......
......@@ -123,9 +123,14 @@ void CaliEvtDataStoreImpl::insert(SensorId* sid, uint64_t ts, const std::string&
if(ttlReal>0)
cass_statement_bind_int32(statement, 4, ttlReal);
/* Execute statement */
CassFuture* future = cass_session_execute(session, statement);
cass_statement_free(statement);
if(debugLog) {
cass_future_set_callback(future, CaliEvtDataStoreImpl_on_result, NULL);
}
/* Don't wait for the future, just free it to make the call truly asynchronous */
cass_future_free(future);
}
......@@ -143,7 +148,7 @@ void CaliEvtDataStoreImpl::insertBatch(std::list<CaliEvtData>& datas, int64_t tt
int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
for (auto d: datas) {
for (auto& d: datas) {
/* Calculate and insert week number */
uint16_t week = d.timeStamp.getRaw() / 604800000000000;
d.eventId.setRsvd(week);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment