Commit 8996f185 authored by Daniele Tafani's avatar Daniele Tafani
Browse files

Merge branch 'master' of ssh://deep-ras.srv.lrz.de/git/dcdb

parents 739421e0 f1c1f940
......@@ -28,7 +28,7 @@ int keepRunning;
uint64_t msgCtr;
uint64_t pmsgCtr;
SensorDataStore *mySensorDataStore;
std::string listenHost, cassandraHost;
std::string listenHost, cassandraHost, ttl;
/* Normal termination (SIGINT, CTRL+C) */
void sigHandler(int sig)
......@@ -122,11 +122,13 @@ void mqttCallback(SimpleMQTTMessage *msg)
* Print usage information
*/
void usage() {
printf("Usage: collectagent [-D] [-l <host>] [-h <host>]\n");
printf("Usage: collectagent [-D] [-l <host>] [-h <host>] [-t <ttl>]\n");
printf("Collectagent will accept remote connections by listening to the\n");
printf("specified listen address (-l <host>) at port 1883 (default MQTT port).\n");
printf("It will also connect to cassandra to the specifiec addres (-h <host>).\n");
printf("The default <host> is localhost/127.0.0.1.\n");
printf("If the -t option is specified, data will be inserted with the specified\n");
printf("TTL in seconds.\n");
printf("If the -D option is specified, CollectAgent will run as daemon.\n\n");
}
......@@ -149,7 +151,8 @@ int main(int argc, char* const argv[]) {
int ret;
listenHost="localhost";
cassandraHost="127.0.0.1";
while ((ret=getopt(argc, argv, "h:l:D?"))!=-1) {
ttl="0";
while ((ret=getopt(argc, argv, "h:l:t:D?"))!=-1) {
switch(ret) {
case 'h':
cassandraHost = optarg;
......@@ -157,6 +160,9 @@ int main(int argc, char* const argv[]) {
case 'l':
listenHost = optarg;
break;
case 't':
ttl = optarg;
break;
case 'D':
daemon(1,1);
break;
......@@ -170,8 +176,14 @@ int main(int argc, char* const argv[]) {
/*
* Allocate and initialize sensor data store.
*/
uint64_t ttlInt;
std::istringstream ttlParser(ttl);
if (!(ttlParser >> ttlInt)) {
std::cout << "Invalid TTL!" << std::endl;
exit(EXIT_FAILURE);
}
std::string sdHost(cassandraHost);
mySensorDataStore = new SensorDataStore(sdHost, 9042);
mySensorDataStore = new SensorDataStore(sdHost, 9042, ttlInt);
/*
* Start the MQTT Message Server.
......
......@@ -103,8 +103,9 @@ public:
* connection to the database.
* @param hostname The hostname or IP address of the database node to connect to.
* @param port The port on which the database node is listening.
* @param ttl The TTL for data inserted into the data store (set to 0 for unlimited)
*/
void init(std::string hostname, int port);
void init(std::string hostname, int port, uint64_t ttl);
/**
* @brief This function populates a preallocated SensorId object
......@@ -135,8 +136,9 @@ public:
* @brief The standard constructor for a SensorDataStore object.
* @param hostname A string containing the hostname or IP address of the database server.
* @param port A integer containing the port number on which the database server is listening.
* @param ttl A uint64_t containing the TTL for inserted data items.
*/
SensorDataStore(std::string hostname, int port);
SensorDataStore(std::string hostname, int port, uint64_t ttl);
/**
* @brief The standard destructor for a SensorDatStore object.
......
......@@ -140,7 +140,7 @@ public:
/**
* @brief Prepare for insertions
*/
void prepareInsert();
void prepareInsert(uint64_t ttl);
/* Class constructor / desctructor */
......
......@@ -50,8 +50,9 @@ public:
* @brief This function connects to the database and initializes keyspaces and column families
* @param hostname The hostname of a Cassandra front-end node
* @param port The port number of the Cassandra front-end node
* @param ttl The TTL for inserted data (set to 0 to insert data without TTL)
*/
void init(std::string hostname, int port);
void init(std::string hostname, int port, uint64_t ttl);
/**
* @brief This function converts a MQTT topic string to a SensorId object
......
......@@ -10,6 +10,11 @@
#include <boost/lexical_cast.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <cinttypes>
#include "cassandraBackend.h"
/**
......@@ -292,11 +297,22 @@ void CassandraBackend::insert(std::string key, uint64_t ts, uint64_t value)
* insert CQL query in advance and only bind it on the actual
* insert.
*/
void CassandraBackend::prepareInsert()
void CassandraBackend::prepareInsert(uint64_t ttl)
{
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassString query = cass_string_init("INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);");
CassString query;
char *queryBuf = NULL;
if (ttl == 0) {
query = cass_string_init("INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);");
}
else {
queryBuf = (char*)malloc(256);
snprintf(queryBuf, 256, "INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?) USING TTL %" PRIu64 " ;", ttl);
query = cass_string_init(queryBuf);
}
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -309,6 +325,9 @@ void CassandraBackend::prepareInsert()
}
cass_future_free(future);
if (queryBuf) {
free(queryBuf);
}
}
CassandraBackend::CassandraBackend()
......
......@@ -98,7 +98,7 @@ bool SensorDataStoreImpl::topicToSid(SensorId* sid, std::string topic)
* Applications should not call this function directly, but
* use the init function provideed by the SensorDataStore class.
*/
void SensorDataStoreImpl::init(std::string hostname, int port) {
void SensorDataStoreImpl::init(std::string hostname, int port, uint64_t ttl) {
/*
* Open the connection to the Cassandra database and
......@@ -152,7 +152,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) {
}
/* Prepare for optimized insertions */
csBackend->prepareInsert();
csBackend->prepareInsert(ttl);
}
/**
......@@ -208,7 +208,7 @@ SensorDataStoreImpl::~SensorDataStoreImpl()
* Once this is ensured, the actual initialization work is
* performed by the init function of SensorDataStoreImpl.
*/
void SensorDataStore::init(std::string hostname, int port)
void SensorDataStore::init(std::string hostname, int port, uint64_t ttl)
{
/* Allocate new SensorDataStoreImpl Object if necessary */
if (!impl) {
......@@ -216,7 +216,7 @@ void SensorDataStore::init(std::string hostname, int port)
}
/* Call the Impl class init function */
impl->init(hostname, port);
impl->init(hostname, port, ttl);
}
/**
......@@ -251,7 +251,7 @@ SensorDataStore::SensorDataStore()
{
csBackend = new CassandraBackend();
impl = nullptr;
init("localhost", 9042);
init("localhost", 9042, 0);
}
/**
......@@ -264,11 +264,11 @@ SensorDataStore::SensorDataStore()
* object that is then used by SensorDataStoreImpl for doing
* the raw database accesses.
*/
SensorDataStore::SensorDataStore(std::string hostname, int port)
SensorDataStore::SensorDataStore(std::string hostname, int port, uint64_t ttl)
{
csBackend = new CassandraBackend();
impl = nullptr;
init(hostname, port);
init(hostname, port, ttl);
}
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment