Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
dcdb
dcdb
Commits
f8e02d05
Commit
f8e02d05
authored
Mar 31, 2013
by
Axel Auweter
Browse files
C++-ified my code a bit. Now using cout instead of printf... :(
parent
b1f4199a
Changes
1
Hide whitespace changes
Inline
Side-by-side
CollectAgent/CollectAgent.cpp
View file @
f8e02d05
...
...
@@ -6,7 +6,7 @@
// Description : As of now this is some rudimentary bad code to talk to the DB
//============================================================================
#include
<cstdio>
//
#include <cstdio>
#include
<cstdlib>
#include
<sstream>
...
...
@@ -28,6 +28,7 @@
#include
"mosquitto.h"
using
namespace
std
;
using
namespace
apache
::
thrift
;
using
namespace
apache
::
thrift
::
transport
;
using
namespace
apache
::
thrift
::
protocol
;
...
...
@@ -60,11 +61,11 @@ void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_mes
myClient
->
execute_cql3_query
(
res
,
query
,
Compression
::
NONE
,
ConsistencyLevel
::
ONE
);
}
catch
(
TTransportException
te
){
printf
(
"TP Exception:
%s [%d]
\n
"
,
te
.
what
(),
te
.
getType
()
)
;
cout
<<
"TP Exception:
"
<<
te
.
what
()
<<
"["
<<
te
.
getType
()
<<
"]
\n
"
;
}
catch
(
InvalidRequestException
ire
){
printf
(
"IRE Exception:
%s [%s]
\n
"
,
ire
.
what
(),
ire
.
why
.
c_str
())
;
cout
<<
"IRE Exception:
"
<<
ire
.
what
()
<<
"["
<<
ire
.
why
<<
"]
\n
"
;
}
catch
(
NotFoundException
nfe
){
printf
(
"NF Exception:
%s
\n
"
,
nfe
.
what
()
)
;
cout
<<
"NF Exception:
"
<<
nfe
.
what
()
<<
"
\n
"
;
}
}
...
...
@@ -86,15 +87,15 @@ int main(void) {
start:
myClient
->
describe_cluster_name
(
clusterName
);
printf
(
"Cluster name:
%s
\n
"
,
clusterName
.
c_str
())
;
cout
<<
"Cluster name:
"
<<
clusterName
<<
"
\n
"
;
int
dcdbKeyspace
=
-
1
;
printf
(
"Keyspaces:
\n
"
)
;
cout
<<
"Keyspaces:
\n
"
;
std
::
vector
<
KsDef
>
keySpaces
;
myClient
->
describe_keyspaces
(
keySpaces
);
for
(
unsigned
int
i
=
0
;
i
<
keySpaces
.
size
();
i
++
)
{
printf
(
" [%d]: %s
\n
"
,
i
,
keySpaces
[
i
].
name
.
c_str
())
;
cout
<<
" ["
<<
i
<<
"]: "
<<
keySpaces
[
i
].
name
<<
"
\n
"
;
if
(
keySpaces
[
i
].
name
==
"dcdb"
)
{
dcdbKeyspace
=
i
;
}
...
...
@@ -104,49 +105,49 @@ int main(void) {
std
::
string
query
;
if
(
dcdbKeyspace
<
0
)
{
printf
(
"Creating dcdb keyspace...
\n
"
)
;
cout
<<
"Creating dcdb keyspace...
\n
"
;
query
=
"CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};"
;
printf
(
"Sending CQL statement:
%s"
,
query
.
c_str
()
)
;
cout
<<
"Sending CQL statement:
"
<<
query
.
c_str
();
myClient
->
execute_cql3_query
(
res
,
query
,
Compression
::
NONE
,
ConsistencyLevel
::
ONE
);
printf
(
" --> Success!
\n
"
)
;
printf
(
"Starting over...
\n\n
"
)
;
cout
<<
" --> Success!
\n
"
;
cout
<<
"Starting over...
\n\n
"
;
goto
start
;
}
else
{
printf
(
"Using existing keyspace dcdb...
\n
"
)
;
cout
<<
"Using existing keyspace dcdb...
\n
"
;
}
query
=
"USE dcdb;"
;
printf
(
"Sending CQL statement:
%s"
,
query
.
c_str
())
;
cout
<<
"Sending CQL statement:
"
<<
query
;
myClient
->
execute_cql3_query
(
res
,
query
,
Compression
::
NONE
,
ConsistencyLevel
::
ONE
);
printf
(
" --> Success!
\n
"
)
;
cout
<<
" --> Success!
\n
"
;
int
sensordataCf
=
-
1
;
printf
(
"Column families in dcdb:
\n
"
)
;
cout
<<
"Column families in dcdb:
\n
"
;
for
(
unsigned
int
i
=
0
;
i
<
keySpaces
[
dcdbKeyspace
].
cf_defs
.
size
();
i
++
)
{
printf
(
" [%d]: %s
\n
"
,
i
,
keySpaces
[
dcdbKeyspace
].
cf_defs
[
i
].
name
.
c_str
())
;
cout
<<
" ["
<<
i
<<
"]: "
<<
keySpaces
[
dcdbKeyspace
].
cf_defs
[
i
].
name
<<
"
\n
"
;
if
(
keySpaces
[
dcdbKeyspace
].
cf_defs
[
i
].
name
==
"sensordata"
)
{
sensordataCf
=
i
;
}
}
if
(
sensordataCf
<
0
)
{
printf
(
"Creating sensordata column familiy...
\n
"
)
;
cout
<<
"Creating sensordata column familiy...
\n
"
;
query
=
"CREATE TABLE sensordata ( sid int, ts timestamp, value float, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;"
;
printf
(
"Sending CQL statement:
%s"
,
query
.
c_str
())
;
cout
<<
"Sending CQL statement:
"
<<
query
;
myClient
->
execute_cql3_query
(
res
,
query
,
Compression
::
NONE
,
ConsistencyLevel
::
ONE
);
printf
(
" --> Success!
\n
"
)
;
printf
(
"Starting over...
\n\n
"
)
;
cout
<<
" --> Success!
\n
"
;
cout
<<
"Starting over...
\n\n
"
;
goto
start
;
}
else
{
printf
(
"Using existing sensordata column familiy.
\n
"
)
;
cout
<<
"Using existing sensordata column familiy.
\n
"
;
}
/* Should have the keyspace and the column familiy in the system now, subscribe to local mqtt broker */
int
mosqMajor
,
mosqMinor
,
mosqRevision
;
mosquitto_lib_version
(
&
mosqMajor
,
&
mosqMinor
,
&
mosqRevision
);
printf
(
"Initializing Mosquitto Library Version
%d.%d.%d
\n
"
,
mosqMajor
,
mosqMinor
,
mosqRevision
)
;
cout
<<
"Initializing Mosquitto Library Version
"
<<
mosqMajor
<<
"."
<<
mosqMinor
<<
"."
<<
mosqRevision
<<
"
\n
"
;
mosquitto_lib_init
();
/* Init mosquitto struct */
...
...
@@ -158,40 +159,40 @@ int main(void) {
}
/* Connect to the broker */
printf
(
"Connecting to broker..."
)
;
f
flush
(
stdout
);
cout
<<
"Connecting to broker..."
;
cout
.
flush
();
if
(
mosquitto_connect
(
mosq
,
"localhost"
,
1883
,
1000
)
!=
MOSQ_ERR_SUCCESS
)
{
perror
(
"
\n
Could not connect to host"
);
exit
(
EXIT_FAILURE
);
}
printf
(
" Done.
\n
"
)
;
cout
<<
" Done.
\n
"
;
/* Catch SIGINT signals */
signal
(
SIGINT
,
sigHandler
);
/* Subscribe to anything */
printf
(
"Subscribing to anything on the broker (Pattern #)..."
)
;
f
flush
(
stdout
);
cout
<<
"Subscribing to anything on the broker (Pattern #)..."
;
cout
.
flush
();
if
(
mosquitto_subscribe
(
mosq
,
NULL
,
"#"
,
0
)
!=
MOSQ_ERR_SUCCESS
)
{
perror
(
"
\n
Could not subscribe"
);
exit
(
EXIT_FAILURE
);
}
printf
(
" Done.
\n
"
)
;
cout
<<
" Done.
\n
"
;
/* Set the callback for mosquitto */
printf
(
"Configuring message callback..."
)
;
f
flush
(
stdout
);
cout
<<
"Configuring message callback..."
;
cout
.
flush
();
mosquitto_message_callback_set
(
mosq
,
mqttOnMessage
);
printf
(
" Done.
\n
"
)
;
cout
<<
" Done.
\n
"
;
/* Here comes the main loop */
printf
(
"Starting mqtt loop thread..."
)
;
f
flush
(
stdout
);
cout
<<
"Starting mqtt loop thread..."
;
cout
.
flush
();
if
(
mosquitto_loop_start
(
mosq
)
!=
MOSQ_ERR_SUCCESS
)
{
perror
(
"
\n
Could not start mosquitto thread"
);
exit
(
EXIT_FAILURE
);
}
printf
(
" Done.
\n
"
)
;
cout
<<
" Done.
\n
"
;
keepRunning
=
1
;
timeval
start
,
end
;
...
...
@@ -204,11 +205,11 @@ int main(void) {
gettimeofday
(
&
end
,
NULL
);
elapsed
=
(
end
.
tv_sec
-
start
.
tv_sec
)
*
1000.0
;
elapsed
+=
(
end
.
tv_usec
-
start
.
tv_usec
)
/
1000.0
;
printf
(
"Message rate:
%f messages/second
\n
"
,
(
msgCtr
/
elapsed
)
*
1000.0
)
;
cout
<<
"Message rate:
"
<<
(
msgCtr
/
elapsed
)
*
1000.0
<<
" messages/second
\n
"
;
msgCtr
=
0
;
}
printf
(
"Cleaning up..."
)
;
cout
<<
"Cleaning up..."
;
/* Stop the mosquitto loop */
mosquitto_loop_stop
(
mosq
,
true
);
...
...
@@ -217,7 +218,7 @@ int main(void) {
/* Disconnect from Cassandra */
tr
->
close
();
printf
(
" Done.
\n
"
)
;
cout
<<
" Done.
\n
"
;
#if 0
int resB = 0;
...
...
@@ -247,11 +248,13 @@ int main(void) {
#endif
}
catch
(
TTransportException
te
){
printf
(
"TP Exception: %s [%d]
\n
"
,
te
.
what
(),
te
.
getType
());
}
catch
(
InvalidRequestException
ire
){
printf
(
"IRE Exception: %s [%s]
\n
"
,
ire
.
what
(),
ire
.
why
.
c_str
());
}
catch
(
NotFoundException
nfe
){
printf
(
"NF Exception: %s
\n
"
,
nfe
.
what
());
cout
<<
"TP Exception: "
<<
te
.
what
()
<<
"["
<<
te
.
getType
()
<<
"]
\n
"
;
}
catch
(
InvalidRequestException
ire
){
cout
<<
"IRE Exception: "
<<
ire
.
what
()
<<
"["
<<
ire
.
why
<<
"]
\n
"
;
}
catch
(
NotFoundException
nfe
){
cout
<<
"NF Exception: "
<<
nfe
.
what
()
<<
"
\n
"
;
}
return
EXIT_SUCCESS
;
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment