Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
dcdb
dcdb
Commits
f1c1f940
Commit
f1c1f940
authored
May 06, 2015
by
Axel Auweter
Browse files
Fix ticket #35 "CollectAgent should support insertions with TTL".
parent
947b0d85
Changes
6
Hide whitespace changes
Inline
Side-by-side
CollectAgent/collectagent.cpp
View file @
f1c1f940
...
...
@@ -28,7 +28,7 @@ int keepRunning;
uint64_t
msgCtr
;
uint64_t
pmsgCtr
;
SensorDataStore
*
mySensorDataStore
;
std
::
string
listenHost
,
cassandraHost
;
std
::
string
listenHost
,
cassandraHost
,
ttl
;
/* Normal termination (SIGINT, CTRL+C) */
void
sigHandler
(
int
sig
)
...
...
@@ -122,11 +122,13 @@ void mqttCallback(SimpleMQTTMessage *msg)
* Print usage information
*/
void
usage
()
{
printf
(
"Usage: collectagent [-D] [-l <host>] [-h <host>]
\n
"
);
printf
(
"Usage: collectagent [-D] [-l <host>] [-h <host>]
[-t <ttl>]
\n
"
);
printf
(
"Collectagent will accept remote connections by listening to the
\n
"
);
printf
(
"specified listen address (-l <host>) at port 1883 (default MQTT port).
\n
"
);
printf
(
"It will also connect to cassandra to the specifiec addres (-h <host>).
\n
"
);
printf
(
"The default <host> is localhost/127.0.0.1.
\n
"
);
printf
(
"If the -t option is specified, data will be inserted with the specified
\n
"
);
printf
(
"TTL in seconds.
\n
"
);
printf
(
"If the -D option is specified, CollectAgent will run as daemon.
\n\n
"
);
}
...
...
@@ -149,7 +151,8 @@ int main(int argc, char* const argv[]) {
int
ret
;
listenHost
=
"localhost"
;
cassandraHost
=
"127.0.0.1"
;
while
((
ret
=
getopt
(
argc
,
argv
,
"h:l:D?"
))
!=-
1
)
{
ttl
=
"0"
;
while
((
ret
=
getopt
(
argc
,
argv
,
"h:l:t:D?"
))
!=-
1
)
{
switch
(
ret
)
{
case
'h'
:
cassandraHost
=
optarg
;
...
...
@@ -157,6 +160,9 @@ int main(int argc, char* const argv[]) {
case
'l'
:
listenHost
=
optarg
;
break
;
case
't'
:
ttl
=
optarg
;
break
;
case
'D'
:
daemon
(
1
,
1
);
break
;
...
...
@@ -170,8 +176,14 @@ int main(int argc, char* const argv[]) {
/*
* Allocate and initialize sensor data store.
*/
uint64_t
ttlInt
;
std
::
istringstream
ttlParser
(
ttl
);
if
(
!
(
ttlParser
>>
ttlInt
))
{
std
::
cout
<<
"Invalid TTL!"
<<
std
::
endl
;
exit
(
EXIT_FAILURE
);
}
std
::
string
sdHost
(
cassandraHost
);
mySensorDataStore
=
new
SensorDataStore
(
sdHost
,
9042
);
mySensorDataStore
=
new
SensorDataStore
(
sdHost
,
9042
,
ttlInt
);
/*
* Start the MQTT Message Server.
...
...
DCDBLib/include/sensordatastore.h
View file @
f1c1f940
...
...
@@ -103,8 +103,9 @@ public:
* connection to the database.
* @param hostname The hostname or IP address of the database node to connect to.
* @param port The port on which the database node is listening.
* @param ttl The TTL for data inserted into the data store (set to 0 for unlimited)
*/
void
init
(
std
::
string
hostname
,
int
port
);
void
init
(
std
::
string
hostname
,
int
port
,
uint64_t
ttl
);
/**
* @brief This function populates a preallocated SensorId object
...
...
@@ -135,8 +136,9 @@ public:
* @brief The standard constructor for a SensorDataStore object.
* @param hostname A string containing the hostname or IP address of the database server.
* @param port A integer containing the port number on which the database server is listening.
* @param ttl A uint64_t containing the TTL for inserted data items.
*/
SensorDataStore
(
std
::
string
hostname
,
int
port
);
SensorDataStore
(
std
::
string
hostname
,
int
port
,
uint64_t
ttl
);
/**
* @brief The standard destructor for a SensorDatStore object.
...
...
DCDBLib/include_internal/cassandraBackend.h
View file @
f1c1f940
...
...
@@ -140,7 +140,7 @@ public:
/**
* @brief Prepare for insertions
*/
void
prepareInsert
();
void
prepareInsert
(
uint64_t
ttl
);
/* Class constructor / desctructor */
...
...
DCDBLib/include_internal/sensordatastore_internal.h
View file @
f1c1f940
...
...
@@ -50,8 +50,9 @@ public:
* @brief This function connects to the database and initializes keyspaces and column families
* @param hostname The hostname of a Cassandra front-end node
* @param port The port number of the Cassandra front-end node
* @param ttl The TTL for inserted data (set to 0 to insert data without TTL)
*/
void
init
(
std
::
string
hostname
,
int
port
);
void
init
(
std
::
string
hostname
,
int
port
,
uint64_t
ttl
);
/**
* @brief This function converts a MQTT topic string to a SensorId object
...
...
DCDBLib/src/cassandraBackend.cpp
View file @
f1c1f940
...
...
@@ -10,6 +10,11 @@
#include
<boost/lexical_cast.hpp>
#include
<cstdio>
#include
<cstdlib>
#include
<cstdint>
#include
<cinttypes>
#include
"cassandraBackend.h"
/**
...
...
@@ -292,11 +297,22 @@ void CassandraBackend::insert(std::string key, uint64_t ts, uint64_t value)
* insert CQL query in advance and only bind it on the actual
* insert.
*/
void
CassandraBackend
::
prepareInsert
()
void
CassandraBackend
::
prepareInsert
(
uint64_t
ttl
)
{
CassError
rc
=
CASS_OK
;
CassFuture
*
future
=
NULL
;
CassString
query
=
cass_string_init
(
"INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);"
);
CassString
query
;
char
*
queryBuf
=
NULL
;
if
(
ttl
==
0
)
{
query
=
cass_string_init
(
"INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);"
);
}
else
{
queryBuf
=
(
char
*
)
malloc
(
256
);
snprintf
(
queryBuf
,
256
,
"INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?) USING TTL %"
PRIu64
" ;"
,
ttl
);
query
=
cass_string_init
(
queryBuf
);
}
future
=
cass_session_prepare
(
session
,
query
);
cass_future_wait
(
future
);
...
...
@@ -309,6 +325,9 @@ void CassandraBackend::prepareInsert()
}
cass_future_free
(
future
);
if
(
queryBuf
)
{
free
(
queryBuf
);
}
}
CassandraBackend
::
CassandraBackend
()
...
...
DCDBLib/src/sensordatastore.cpp
View file @
f1c1f940
...
...
@@ -98,7 +98,7 @@ bool SensorDataStoreImpl::topicToSid(SensorId* sid, std::string topic)
* Applications should not call this function directly, but
* use the init function provideed by the SensorDataStore class.
*/
void
SensorDataStoreImpl
::
init
(
std
::
string
hostname
,
int
port
)
{
void
SensorDataStoreImpl
::
init
(
std
::
string
hostname
,
int
port
,
uint64_t
ttl
)
{
/*
* Open the connection to the Cassandra database and
...
...
@@ -152,7 +152,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) {
}
/* Prepare for optimized insertions */
csBackend
->
prepareInsert
();
csBackend
->
prepareInsert
(
ttl
);
}
/**
...
...
@@ -208,7 +208,7 @@ SensorDataStoreImpl::~SensorDataStoreImpl()
* Once this is ensured, the actual initialization work is
* performed by the init function of SensorDataStoreImpl.
*/
void
SensorDataStore
::
init
(
std
::
string
hostname
,
int
port
)
void
SensorDataStore
::
init
(
std
::
string
hostname
,
int
port
,
uint64_t
ttl
)
{
/* Allocate new SensorDataStoreImpl Object if necessary */
if
(
!
impl
)
{
...
...
@@ -216,7 +216,7 @@ void SensorDataStore::init(std::string hostname, int port)
}
/* Call the Impl class init function */
impl
->
init
(
hostname
,
port
);
impl
->
init
(
hostname
,
port
,
ttl
);
}
/**
...
...
@@ -251,7 +251,7 @@ SensorDataStore::SensorDataStore()
{
csBackend
=
new
CassandraBackend
();
impl
=
nullptr
;
init
(
"localhost"
,
9042
);
init
(
"localhost"
,
9042
,
0
);
}
/**
...
...
@@ -264,11 +264,11 @@ SensorDataStore::SensorDataStore()
* object that is then used by SensorDataStoreImpl for doing
* the raw database accesses.
*/
SensorDataStore
::
SensorDataStore
(
std
::
string
hostname
,
int
port
)
SensorDataStore
::
SensorDataStore
(
std
::
string
hostname
,
int
port
,
uint64_t
ttl
)
{
csBackend
=
new
CassandraBackend
();
impl
=
nullptr
;
init
(
hostname
,
port
);
init
(
hostname
,
port
,
ttl
);
}
/**
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment