Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
9.2.2023: Due to updates GitLab will be unavailable for some minutes between 9:00 and 11:00.
Open sidebar
dcdb
dcdb
Commits
2e60dcb7
Commit
2e60dcb7
authored
Apr 05, 2018
by
Daniele Tafani
Browse files
Merge branch 'master' of
https://gitlab.lrz.de/dcdb/dcdb
parents
3ed2bd1d
5adb6dd8
Changes
13
Hide whitespace changes
Inline
Side-by-side
CollectAgent/collectagent.cpp
View file @
2e60dcb7
...
...
@@ -50,6 +50,14 @@
using
namespace
std
;
#define LISTENHOST "localhost"
#define LISTENPORT "1883"
#define CASSANDRAHOST "127.0.0.1"
#define CASSANDRAPORT "9042"
#define RESTAPIHOST "0.0.0.0"
#define RESTAPIPORT "8080"
#define TTL "0"
int
keepRunning
;
bool
statistics
;
uint64_t
msgCtr
;
...
...
@@ -192,15 +200,27 @@ int mqttCallback(SimpleMQTTMessage *msg)
* Print usage information
*/
void
usage
()
{
printf
(
"Usage: collectagent [-D] [-s] [-l <host>] [-h <host>] [-t <ttl>]
\n
"
);
printf
(
"Collectagent will accept remote connections by listening to the
\n
"
);
printf
(
"specified listen address (-l <host>) at port 1883 (default MQTT port).
\n
"
);
printf
(
"It will also connect to cassandra to the specifiec addres (-h <host>).
\n
"
);
printf
(
"The default <host> is localhost/127.0.0.1.
\n
"
);
printf
(
"If the -t option is specified, data will be inserted with the specified
\n
"
);
printf
(
"TTL in seconds.
\n
"
);
printf
(
"If the -D option is specified, CollectAgent will run as daemon.
\n
"
);
printf
(
"With the -s option, CollectAgent will print message statistics.
\n\n
"
);
/*
1 2 3 4 5 6 7 8
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
cout
<<
"Usage:"
<<
endl
;
cout
<<
" collectagent [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-d] [-s]"
<<
endl
;
cout
<<
" collectagent -h"
<<
endl
;
cout
<<
endl
;
cout
<<
"Options:"
<<
endl
;
cout
<<
" -m<host> MQTT listen address [default: "
<<
LISTENHOST
<<
":"
<<
LISTENPORT
<<
"]"
<<
endl
;
cout
<<
" -r<host> REST API listen address [default: "
<<
RESTAPIHOST
<<
":"
<<
RESTAPIPORT
<<
"]"
<<
endl
;
cout
<<
" -c<host> Cassandra host [default: "
<<
CASSANDRAHOST
<<
":"
<<
CASSANDRAPORT
<<
"]"
<<
endl
;
cout
<<
" -u<username> Cassandra username [default: none]"
<<
endl
;
cout
<<
" -p<password> Cassandra password [default: none]"
<<
endl
;
cout
<<
" -t<ttl> Cassandra insert TTL [default: "
<<
TTL
<<
"]"
<<
endl
;
cout
<<
endl
;
cout
<<
" -d Daemonize"
<<
endl
;
cout
<<
" -s Print message statistics"
<<
endl
;
cout
<<
" -h This help page"
<<
endl
;
cout
<<
endl
;
}
int
main
(
int
argc
,
char
*
const
argv
[])
{
...
...
@@ -221,33 +241,49 @@ int main(int argc, char* const argv[]) {
/* Parse command line */
int
ret
;
std
::
string
listenHost
,
cassandraHost
,
restApiHost
,
ttl
;
std
::
string
cassandraUser
,
cassandraPassword
;
std
::
string
listenPort
,
cassandraPort
,
restApiPort
;
listenHost
=
"localhost"
;
cassandraHost
=
"127.0.0.1"
;
restApiHost
=
"0.0.0.0"
;
ttl
=
"0"
;
listenHost
=
LISTENHOST
;
cassandraHost
=
CASSANDRAHOST
;
restApiHost
=
RESTAPIHOST
;
ttl
=
"0"
;
statistics
=
false
;
while
((
ret
=
getopt
(
argc
,
argv
,
"h:l:r:t:Ds?"
))
!=-
1
)
{
while
((
ret
=
getopt
(
argc
,
argv
,
"m:r:c:u:p:t:dDsh"
))
!=-
1
)
{
switch
(
ret
)
{
case
'h'
:
cassandraHost
=
optarg
;
break
;
case
'l'
:
case
'm'
:
listenHost
=
optarg
;
break
;
case
'r'
:
restApiHost
=
optarg
;
break
;
case
'c'
:
cassandraHost
=
optarg
;
break
;
case
'u'
:
cassandraUser
=
optarg
;
break
;
case
'p'
:
{
cassandraPassword
=
optarg
;
size_t
pwdLen
=
strlen
(
optarg
);
memset
(
optarg
,
'x'
,
(
pwdLen
>=
3
)
?
3
:
pwdLen
);
if
(
pwdLen
>
3
)
{
memset
(
optarg
+
3
,
0
,
pwdLen
-
3
);
}
break
;
}
case
't'
:
ttl
=
optarg
;
break
;
case
'd'
:
case
'D'
:
dcdbdaemon
();
break
;
case
's'
:
statistics
=
true
;
break
;
case
'
?
'
:
case
'
h
'
:
default:
usage
();
exit
(
EXIT_FAILURE
);
...
...
@@ -262,21 +298,21 @@ int main(int argc, char* const argv[]) {
listenPort
=
listenHost
.
substr
(
pos
+
1
);
listenHost
.
erase
(
pos
);
}
else
{
listenPort
=
"1883"
;
listenPort
=
LISTENPORT
;
}
pos
=
cassandraHost
.
find
(
":"
);
if
(
pos
!=
string
::
npos
)
{
cassandraPort
=
cassandraHost
.
substr
(
pos
+
1
);
cassandraHost
.
erase
(
pos
);
}
else
{
cassandraPort
=
"9042"
;
cassandraPort
=
CASSANDRAPORT
;
}
pos
=
restApiHost
.
find
(
":"
);
if
(
pos
!=
string
::
npos
)
{
restApiPort
=
restApiHost
.
substr
(
pos
+
1
);
restApiHost
.
erase
(
pos
);
}
else
{
restApiPort
=
"8080"
;
restApiPort
=
RESTAPIPORT
;
}
...
...
@@ -284,7 +320,7 @@ int main(int argc, char* const argv[]) {
* Allocate and initialize connection to Cassandra.
*/
DCDB
::
Connection
*
dcdbConn
;
dcdbConn
=
new
DCDB
::
Connection
(
cassandraHost
,
atoi
(
cassandraPort
.
c_str
()));
dcdbConn
=
new
DCDB
::
Connection
(
cassandraHost
,
atoi
(
cassandraPort
.
c_str
())
,
cassandraUser
,
cassandraPassword
);
if
(
!
dcdbConn
->
connect
())
{
std
::
cout
<<
"Cannot connect to Cassandra!"
<<
std
::
endl
;
...
...
Makefile
View file @
2e60dcb7
...
...
@@ -45,6 +45,9 @@ endif
PUBHEADERS
=
pusherpqueue.h dcdbdaemon.h
FULL_CC
=
$(
shell
which
$(CC)
)
FULL_CXX
=
$(
shell
which
$(CXX)
)
.PHONY
:
info all clean cleanall distclean check-cross-compile deps depsinstall $(SUB_DIRS)
info
:
...
...
@@ -128,7 +131,7 @@ $(DCDBDEPSPATH)/mosquitto-$(MOSQUITTO_VERSION)/.built: $(DCDBDEPSPATH)/mosquitto
@
echo
"Building Mosquitto library..."
;
mkdir
-p
$(DCDBDEPSPATH)
/mosquitto_build
;
cd
$(DCDBDEPSPATH)
/mosquitto_build
&&
\
cmake
$(CMAKE_CROSS_FLAGS)
\
CC
=
$(FULL_CC)
CXX
=
$(FULL_CXX)
cmake
$(CMAKE_CROSS_FLAGS)
\
-DOPENSSL_ROOT_DIR
=
$(DCDBDEPLOYPATH)
/
\
-DWITH_SRV
=
no
\
-DWITH_TLS
=
yes
\
...
...
@@ -147,12 +150,12 @@ $(DCDBDEPSPATH)/boost_$(BOOST_VERSION_U)/.built: $(DCDBDEPSPATH)/boost_$(BOOST_V
@
if
[
"
$(ARCH)
"
=
"arm"
]
;
then
\
echo
" using gcc : arm :
$(CROSS_COMPILE)
g++ ; "
>
$
(
@D
)
/tools/build/src/user-config.jam
;
\
fi
cd
$
(
@D
)
&&
./bootstrap.sh
--prefix
=
$(DCDBDEPLOYPATH)
--with-libraries
=
atomic,chrono,date_time,exception,filesystem,program_options,random,regex,system,thread,timer
&&
\
./b2
cxxflags
=
"
$(CXX11FLAGS)
"
stage
&&
touch
$
(
@
)
cd
$
(
@D
)
&&
./bootstrap.sh
--prefix
=
$(DCDBDEPLOYPATH)
--with-libraries
=
atomic,chrono,date_time,exception,filesystem,
log,
program_options,random,regex,system,thread,timer
&&
\
./b2 stage
&&
touch
$
(
@
)
$(DCDBDEPSPATH)/boost_$(BOOST_VERSION_U)/.installed
:
$(DCDBDEPSPATH)/boost_$(BOOST_VERSION_U)/.built
cd
$
(
@D
)
&&
./b2
-j
$(MAKETHREADS)
cxxflags
=
"
$(CXX11FLAGS)
"
install
&&
touch
$
(
@
)
cd
$
(
@D
)
&&
./b2
-j
$(MAKETHREADS)
install
&&
touch
$
(
@
)
$(DCDBDEPSPATH)/libuv-v$(LIBUV_VERSION)/.built
:
$(DCDBDEPSPATH)/libuv-v$(LIBUV_VERSION)/.patched
@
echo
"Building libuv..."
...
...
@@ -174,8 +177,8 @@ $(DCDBDEPSPATH)/cpp-driver-$(CPPDRV_VERSION)/.built: $(DCDBDEPSPATH)/cpp-driver-
@
echo
"Building cpp-driver..."
mkdir
-p
$(DCDBDEPSPATH)
/cpp-driver_build
cd
$(DCDBDEPSPATH)
/cpp-driver_build
&&
\
cmake
$(CMAKE_CROSS_FLAGS)
\
-DCMAKE_CXX_FLAGS
=
"
$(CXX11FLAGS)
-Wno-unused-command-line-argument
-L
$(DCDBDEPLOYPATH)
/lib "
\
CC
=
$(FULL_CC)
CXX
=
$(FULL_CXX)
cmake
$(CMAKE_CROSS_FLAGS)
\
-DCMAKE_CXX_FLAGS
=
"-L
$(DCDBDEPLOYPATH)
/lib "
\
-DOPENSSL_ROOT_DIR
=
$(DCDBDEPSPATH)
/openssl-
$(OPENSSL_VERSION)
\
-DLIBUV_ROOT_DIR
=
$(DCDBDEPSPATH)
/libuv-v
$(LIBUV_VERSION)
\
-DCASS_BUILD_EXAMPLES
=
NO
\
...
...
@@ -194,8 +197,8 @@ $(DCDBDEPSPATH)/cpp-netlib-$(CPPNET_VERSION)/.built: $(DCDBDEPSPATH)/cpp-netlib-
@
echo
"Building cpp-netlib..."
mkdir
-p
$(DCDBDEPSPATH)
/cpp-netlib_build
cd
$(DCDBDEPSPATH)
/cpp-netlib_build
&&
\
cmake
$(CMAKE_CROSS_FLAGS)
\
-DCMAKE_CXX_FLAGS
=
"
$(CXX11FLAGS)
-Wno-unused-command-line-argument
-L
$(DCDBDEPLOYPATH)
/lib "
\
CC
=
$(FULL_CC)
CXX
=
$(FULL_CXX)
cmake
$(CMAKE_CROSS_FLAGS)
\
-DCMAKE_CXX_FLAGS
=
"-L
$(DCDBDEPLOYPATH)
/lib "
\
-DCPP-NETLIB_ENABLE_HTTPS
=
off
\
-DCPP-NETLIB_BUILD_TESTS
=
OFF
\
-DCPP-NETLIB_BUILD_EXAMPLES
=
OFF
\
...
...
common.mk
View file @
2e60dcb7
...
...
@@ -29,8 +29,7 @@ CMAKE_CROSS_FLAGS = ""
AUTOCONF_CROSS_FLAGS
=
""
endif
CFLAGS
+=
-I
$(DCDBDEPLOYPATH)
/include
-O2
-g
CXX11FLAGS
=
--std
=
c++11
-Wno-c99-extensions
-Wno-missing-field-initializers
-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
-Wno-unused-local-typedef
CFLAGS
+=
-I
$(DCDBDEPLOYPATH)
/include
-O2
-g
-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
LDFLAGS
+=
-L
$(DCDBDEPLOYPATH)
/lib
-O2
-g
.PHONY
:
all info
...
...
lib/include/dcdb/connection.h
View file @
2e60dcb7
...
...
@@ -90,6 +90,30 @@ public:
*/
uint16_t
getPort
();
/**
* @brief Set the username for the connection.
* @param username Username for connecting to the Cassandra front end node.
*/
void
setUsername
(
std
::
string
username
);
/**
* @brief Return the current username of the connection.
* @return The username for connecting to the Cassandra front end node.
*/
std
::
string
getUsername
();
/**
* @brief Set the password for the connection.
* @param password The password for connecting to the Cassandra front end node.
*/
void
setPassword
(
std
::
string
password
);
/**
* @brief Return the current password of the connection.
* @return The password for connecting to the Cassandra front end node.
*/
std
::
string
getPassword
();
/**
* @brief Establish a connection to the Cassandra database.
* @return True if the connection was successfully established, false otherwise.
...
...
@@ -128,10 +152,15 @@ public:
Connection
();
/**
* @brief Construct a Connection to the specific host and port
.
* @brief Construct a Connection to the specific host and port
without authentication
*/
Connection
(
std
::
string
hostname
,
uint16_t
port
);
/**
* @brief Construct a Connection to the specific host and port and authenticate with given username and password.
*/
Connection
(
std
::
string
hostname
,
uint16_t
port
,
std
::
string
username
,
std
::
string
password
);
/**
* @brief Standard destructor for Connections.
*/
...
...
lib/include/dcdb/sensor.h
View file @
2e60dcb7
...
...
@@ -20,7 +20,7 @@ namespace DCDB {
public:
Sensor
(
DCDB
::
Connection
*
connection
,
std
::
string
publicName
);
virtual
~
Sensor
();
void
query
(
std
::
list
<
SensorDataStoreReading
>&
reading
,
TimeStamp
&
start
,
TimeStamp
&
end
);
void
query
(
std
::
list
<
SensorDataStoreReading
>&
reading
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
=
AGGREGATE_NONE
);
double
getScalingFactor
()
const
{
return
scalingFactor
;
...
...
lib/include/dcdb/sensordatastore.h
View file @
2e60dcb7
...
...
@@ -50,6 +50,15 @@ typedef enum {
SDS_OK
,
SDS_EMPTYSET
}
SDSQueryResult
;
typedef
enum
{
AGGREGATE_NONE
=
0
,
AGGREGATE_MIN
,
AGGREGATE_MAX
,
AGGREGATE_AVG
,
AGGREGATE_SUM
,
AGGREGATE_COUNT
}
QueryAggregate
;
/* Forward-declaration of the implementation-internal classes */
class
SensorDataStoreImpl
;
...
...
@@ -107,7 +116,7 @@ public:
* @param start Start of the time series.
* @param end End of the time series.
*/
void
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
);
void
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
=
AGGREGATE_NONE
);
typedef
void
(
*
QueryCbFunc
)(
SensorDataStoreReading
&
reading
,
void
*
userData
);
/**
...
...
@@ -119,7 +128,7 @@ public:
* @param start Start of the time series.
* @param end End of the time series.
*/
void
queryCB
(
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
);
void
queryCB
(
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
=
AGGREGATE_NONE
);
/**
* @brief This function queries the integrated value (val * sec)
...
...
lib/include_internal/connection_internal.h
View file @
2e60dcb7
...
...
@@ -46,6 +46,8 @@ class ConnectionImpl
protected:
std
::
string
hostname_
;
/**< The hostname of a DB front-end node. */
uint16_t
port_
;
/**< The port of the DB front-end node. */
std
::
string
username_
;
/**< The username for connecting to the DB front-end. */
std
::
string
password_
;
/**< The password for connecting to the DB front-end. */
bool
connected
;
/**< Indicates whether a connection has been established. */
CassCluster
*
cluster
;
/**< The Cassandra Cluster object (contains hostname, port, etc) */
...
...
@@ -137,6 +139,26 @@ public:
*/
uint16_t
getPort
();
/**
* @brief The implementation function of Connection::setHostname().
*/
void
setUsername
(
std
::
string
username
);
/**
* @brief The implementation function of Connection::getHostname().
*/
std
::
string
getUsername
();
/**
* @brief The implementation function of Connection::setHostname().
*/
void
setPassword
(
std
::
string
password
);
/**
* @brief The implementation function of Connection::getHostname().
*/
std
::
string
getPassword
();
/**
* @brief The implementation function of Connection::connect().
*/
...
...
lib/include_internal/sensordatastore_internal.h
View file @
2e60dcb7
...
...
@@ -40,7 +40,8 @@
#include
"dcdb/connection.h"
namespace
DCDB
{
static
std
::
string
const
AggregateString
[]
=
{
""
,
"min"
,
"max"
,
"avg"
,
"sum"
,
"count"
};
/**
* @brief The SensorDataStoreImpl class contains all protected
* functions belonging to SensorDataStore which are
...
...
@@ -82,7 +83,7 @@ public:
* @param start Start of the time series.
* @param end End of the time series.
*/
void
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
);
void
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
);
/**
* @brief This function queries a sensor's values in
...
...
@@ -93,7 +94,7 @@ public:
* @param start Start of the time series.
* @param end End of the time series.
*/
void
queryCB
(
SensorDataStore
::
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
);
void
queryCB
(
SensorDataStore
::
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
);
/**
* @brief This function queries the integrated value
...
...
lib/src/connection.cpp
View file @
2e60dcb7
...
...
@@ -62,6 +62,22 @@ uint16_t Connection::getPort() {
return
impl
->
getPort
();
}
void
Connection
::
setUsername
(
std
::
string
username
)
{
impl
->
setUsername
(
username
);
}
std
::
string
Connection
::
getUsername
()
{
return
impl
->
getUsername
();
}
void
Connection
::
setPassword
(
std
::
string
password
)
{
impl
->
setHostname
(
password
);
}
std
::
string
Connection
::
getPassword
()
{
return
impl
->
getPassword
();
}
bool
Connection
::
connect
()
{
return
impl
->
connect
();
}
...
...
@@ -93,6 +109,14 @@ Connection::Connection(std::string hostname, uint16_t port) {
impl
->
setPort
(
port
);
}
Connection
::
Connection
(
std
::
string
hostname
,
uint16_t
port
,
std
::
string
username
,
std
::
string
password
)
{
impl
=
new
ConnectionImpl
();
impl
->
setHostname
(
hostname
);
impl
->
setPort
(
port
);
impl
->
setUsername
(
username
);
impl
->
setPassword
(
password
);
}
Connection
::~
Connection
()
{
delete
impl
;
}
...
...
@@ -277,6 +301,24 @@ uint16_t ConnectionImpl::getPort() {
return
port_
;
}
void
ConnectionImpl
::
setUsername
(
std
::
string
username
)
{
if
(
!
connected
)
username_
=
username
;
}
std
::
string
ConnectionImpl
::
getUsername
()
{
return
username_
;
}
void
ConnectionImpl
::
setPassword
(
std
::
string
password
)
{
if
(
!
connected
)
password_
=
password
;
}
std
::
string
ConnectionImpl
::
getPassword
()
{
return
password_
;
}
/**
* @details
* This function connects to the selected Cassandra
...
...
@@ -289,6 +331,9 @@ bool ConnectionImpl::connect() {
/* Set hostname and port */
cass_cluster_set_contact_points
(
cluster
,
hostname_
.
c_str
());
cass_cluster_set_port
(
cluster
,
port_
);
if
(
username_
.
size
()
&&
password_
.
size
())
{
cass_cluster_set_credentials
(
cluster
,
username_
.
c_str
(),
password_
.
c_str
());
}
/* Force protcol version to 1 */
cass_cluster_set_protocol_version
(
cluster
,
1
);
...
...
lib/src/sensor.cpp
View file @
2e60dcb7
...
...
@@ -38,7 +38,7 @@ namespace DCDB {
delete
sensorConfig
;
}
void
Sensor
::
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
TimeStamp
&
start
,
TimeStamp
&
end
)
{
void
Sensor
::
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
)
{
SensorDataStore
sensorDataStore
(
connection
);
if
(
publicSensor
.
is_virtual
)
{
...
...
@@ -61,7 +61,7 @@ namespace DCDB {
/* Iterate over the expanded list of sensorIds and output the results in CSV format */
for
(
std
::
list
<
SensorId
>::
iterator
sit
=
sensorIds
.
begin
();
sit
!=
sensorIds
.
end
();
sit
++
)
{
sensorDataStore
.
query
(
result
,
*
sit
,
start
,
end
);
sensorDataStore
.
query
(
result
,
*
sit
,
start
,
end
,
aggregate
);
}
if
(
scalingFactor
!=
1.0
||
publicSensor
.
scaling_factor
!=
1.0
)
{
...
...
lib/src/sensordatastore.cpp
View file @
2e60dcb7
...
...
@@ -145,7 +145,7 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value)
cass_statement_bind_bytes_by_name
(
statement
,
"sid"
,
(
cass_byte_t
*
)(
key
.
c_str
()),
16
);
cass_statement_bind_int64_by_name
(
statement
,
"ts"
,
ts
);
cass_statement_bind_int64_by_name
(
statement
,
"value"
,
value
);
future
=
cass_session_execute
(
session
,
statement
);
cass_future_wait
(
future
);
...
...
@@ -174,15 +174,21 @@ void SensorDataStoreImpl::setTTL(uint64_t ttl)
* and creates a SensorDataStoreReading object for each
* entry which is stored in the result list.
*/
void
SensorDataStoreImpl
::
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
)
void
SensorDataStoreImpl
::
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
)
{
CassError
rc
=
CASS_OK
;
CassStatement
*
statement
=
NULL
;
CassFuture
*
future
=
NULL
;
const
CassPrepared
*
prepared
=
nullptr
;
const
char
*
query
=
"SELECT * FROM "
KEYSPACE_NAME
"."
CF_SENSORDATA
" WHERE sid = ? AND ts >= ? AND ts <= ? ;"
;
future
=
cass_session_prepare
(
session
,
query
);
std
::
string
query
=
std
::
string
(
"SELECT ts,"
);
if
(
aggregate
==
AGGREGATE_NONE
)
{
query
.
append
(
"value"
);
}
else
{
query
.
append
(
AggregateString
[
aggregate
]
+
std
::
string
(
"(value) as value"
));
}
query
.
append
(
" FROM "
KEYSPACE_NAME
"."
CF_SENSORDATA
" WHERE sid = ? AND ts >= ? AND ts <= ? ;"
);
future
=
cass_session_prepare
(
session
,
query
.
c_str
());
cass_future_wait
(
future
);
rc
=
cass_future_error_code
(
future
);
...
...
@@ -198,7 +204,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
const
std
::
string
key
=
sid
.
serialize
();
#if 0
std::cout << "Query: " << query << std::endl << "sid: " <<
key
<< " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl;
std::cout << "Query: " << query << std::endl << "sid: " <<
sid.toString()
<< " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl;
#endif
statement
=
cass_prepared_bind
(
prepared
);
...
...
@@ -253,15 +259,21 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
* This function issues a regular query to the data store
* and calls cbFunc for every reading.
*/
void
SensorDataStoreImpl
::
queryCB
(
SensorDataStore
::
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
)
void
SensorDataStoreImpl
::
queryCB
(
SensorDataStore
::
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
)
{
CassError
rc
=
CASS_OK
;
CassStatement
*
statement
=
NULL
;
CassFuture
*
future
=
NULL
;
const
CassPrepared
*
prepared
=
nullptr
;
const
char
*
query
=
"SELECT * FROM "
KEYSPACE_NAME
"."
CF_SENSORDATA
" WHERE sid = ? AND ts >= ? AND ts <= ? ;"
;
future
=
cass_session_prepare
(
session
,
query
);
std
::
string
query
=
std
::
string
(
"SELECT ts,"
);
if
(
aggregate
==
AGGREGATE_NONE
)
{
query
.
append
(
"value"
);
}
else
{
query
.
append
(
AggregateString
[
aggregate
]
+
std
::
string
(
"(value) as value"
));
}
query
.
append
(
" FROM "
KEYSPACE_NAME
"."
CF_SENSORDATA
" WHERE sid = ? AND ts >= ? AND ts <= ? ;"
);
future
=
cass_session_prepare
(
session
,
query
.
c_str
());
cass_future_wait
(
future
);
rc
=
cass_future_error_code
(
future
);
...
...
@@ -323,7 +335,7 @@ SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId& sid, Tim
std
::
list
<
SensorDataStoreReading
>
queryResult
;
/* Issue a standard query */
query
(
queryResult
,
sid
,
start
,
end
);
query
(
queryResult
,
sid
,
start
,
end
,
AGGREGATE_NONE
);
/* Check if at least 2 readings in result */
if
(
queryResult
.
size
()
<
2
)
...
...
@@ -514,9 +526,9 @@ void SensorDataStore::setTTL(uint64_t ttl)
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void
SensorDataStore
::
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
)
void
SensorDataStore
::
query
(
std
::
list
<
SensorDataStoreReading
>&
result
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
)
{
impl
->
query
(
result
,
sid
,
start
,
end
);
impl
->
query
(
result
,
sid
,
start
,
end
,
aggregate
);
}
/**
...
...
@@ -525,9 +537,9 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId&
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void
SensorDataStore
::
queryCB
(
SensorDataStore
::
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
)
void
SensorDataStore
::
queryCB
(
SensorDataStore
::
QueryCbFunc
cbFunc
,
void
*
userData
,
SensorId
&
sid
,
TimeStamp
&
start
,
TimeStamp
&
end
,
QueryAggregate
aggregate
)
{
return
impl
->
queryCB
(
cbFunc
,
userData
,
sid
,
start
,
end
);
return
impl
->
queryCB
(
cbFunc
,
userData
,
sid
,
start
,
end
,
aggregate
);
}
/**
...
...
scripts/dcdb_init
View file @
2e60dcb7
...
...
@@ -90,7 +90,7 @@ case $1 in
#env LD_LIBRARY_PATH=$LD_LIBRARY_PATH collectagent -D -l 0.0.0.0 -h 127.0.0.1 -t 8640000
# Start CollectAgent (no TTL)
env
LD_LIBRARY_PATH
=
$LD_LIBRARY_PATH
collectagent
-D
-
l
$MQTT_HOST
:
$MQTT_PORT
-
h
$CASSANDRA_HOST
:
$CASSANDRA_PORT
-r
$HTTP_HOST
:
$HTTP_PORT
env
LD_LIBRARY_PATH
=
$LD_LIBRARY_PATH
collectagent
d
-
m
$MQTT_HOST
:
$MQTT_PORT
-
c
$CASSANDRA_HOST
:
$CASSANDRA_PORT
-r
$HTTP_HOST
:
$HTTP_PORT
# Wait till CollectAgent is up
wait_for_listen_port
$MQTT_PORT
...
...
tools/dcdbcsvimport/dcdbcsvimport.cpp
View file @
2e60dcb7
...
...
@@ -156,19 +156,19 @@ int main(int argc, char** argv)
int
col
=
0
;
for
(
boost
::
tokenizer
<
boost
::
escaped_list_separator
<
char
>
>::
iterator
i
=
tk
.
begin
();
i
!=
tk
.
end
();
++
i
)
{
if
(
i
==
tk
.
begin
()
)
{
continue
;
}
sensor_t
sensor
;
s
ensor
.
name
=
*
i
;
s
td
::
stringstream
ss
;
s
s
<<
std
::
setfill
(
'0'
)
<<
std
::
setw
(
suffixLen
)
<<
std
::
hex
<<
topics
;
;
sensor
.
topic
=
prefix
+
ss
.
str
(
);
sensor
.
publicName
=
prefix
+
"."
+
sensor
.
name
;
std
::
replace
(
sensor
.
publicName
.
begin
(),
sensor
.
publicName
.
end
(),
' '
,
'_'
)
;
sensors
.
insert
(
std
::
pair
<
int
,
sensor_t
>
(
col
,
sensor
));
topics
++
;
if
(
col
!=
tsColumn
)
{
sensor_t
sensor
;
sensor
.
name
=
*
i
;
s
td
::
stringstream
ss
;
ss
<<
std
::
setfill
(
'0'
)
<<
std
::
setw
(
suffixLen
)
<<
std
::
hex
<<
topics
;;
s
ensor
.
topic
=
prefix
+
ss
.
str
()
;
s
ensor
.
publicName
=
prefix
+
"."
+
sensor
.
name
;
std
::
replace
(
sensor
.
publicName
.
begin
(),
sensor
.
publicName
.
end
(),
' '
,
'_'
);
sensor
s
.
insert
(
std
::
pair
<
int
,
sensor_t
>
(
col
,
sensor
))
;
topics
++
;
}
col
++
;
}
/* Read actual sensor readings */
...
...
@@ -188,10 +188,10 @@ int main(int argc, char** argv)
col
=
0
;
for
(
boost
::
tokenizer
<
boost
::
escaped_list_separator
<
char
>
>::
iterator
i
=
tk
.
begin
();
i
!=
tk
.
end
();
++
i
)