Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
9.2.2023: Due to updates GitLab will be unavailable for some minutes between 9:00 and 11:00.
Open sidebar
dcdb
dcdb
Commits
1a57111c
Commit
1a57111c
authored
Sep 29, 2016
by
Michael Ott
Browse files
Allow specification of host interfaces and ports for all servers (MQTT, HTTP, Cassandra)
parent
0c265b79
Changes
1
Hide whitespace changes
Inline
Side-by-side
CollectAgent/collectagent.cpp
View file @
1a57111c
...
...
@@ -54,7 +54,6 @@ bool statistics;
uint64_t
msgCtr
;
uint64_t
pmsgCtr
;
DCDB
::
SensorDataStore
*
mySensorDataStore
;
std
::
string
listenHost
,
cassandraHost
,
ttl
;
DCDB
::
SensorCache
mySensorCache
;
/* Normal termination (SIGINT, CTRL+C) */
...
...
@@ -69,55 +68,44 @@ void abrtHandler(int sig)
abrt
(
EXIT_FAILURE
,
SIGNAL
);
}
namespace
http
=
boost
::
network
::
http
;
struct
httpHandler_t
;
typedef
boost
::
network
::
http
::
server
<
httpHandler_t
>
httpServer_t
;
struct
httpHandler_t
{
void
operator
()(
httpServer_t
::
request
const
&
request
,
httpServer_t
::
connection_ptr
connection
)
{
httpServer_t
::
string_type
ip
=
source
(
request
);
/*<< Defines the server. >>*/
struct
http_handler
;
typedef
http
::
server
<
http_handler
>
server
;
/*<< Defines the request handler. It's a class that defines two
functions, `operator()` and `log()` >>*/
struct
http_handler
{
/*<< This is the function that handles the incoming request. >>*/
void
operator
()(
server
::
request
const
&
request
,
server
::
connection_ptr
connection
)
{
server
::
string_type
ip
=
source
(
request
);
DCDB
::
SensorId
sid
;
sid
.
mqttTopicConvert
(
request
.
destination
);
std
::
ostringstream
data
;
static
s
erver
::
response_header
headers
[]
=
{
{
"Connection"
,
"close"
},
{
"Content-Type"
,
"text/plain"
}
};
static
httpS
erver
_t
::
response_header
headers
[]
=
{
{
"Connection"
,
"close"
},
{
"Content-Type"
,
"text/plain"
}
};
//try getting the latest value
try
{
uint64_t
val
=
mySensorCache
.
getSensor
(
sid
);
data
<<
val
<<
"
\n
"
;
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
connection
->
set_status
(
server
::
connection
::
ok
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
data
.
str
());
}
catch
(
const
std
::
invalid_argument
&
e
)
{
connection
->
set_status
(
server
::
connection
::
not_found
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
"Error: Sensor id not found.
\n
"
);
}
catch
(
const
std
::
out_of_range
&
e
)
{
connection
->
set_status
(
server
::
connection
::
no_content
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
"Error: Sensor unavailable.
\n
"
);
}
catch
(
const
std
::
exception
&
e
)
{
connection
->
set_status
(
server
::
connection
::
internal_server_error
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
"Server error.
\n
"
);
}
uint64_t
val
=
mySensorCache
.
getSensor
(
sid
);
data
<<
val
<<
"
\n
"
;
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
connection
->
set_status
(
httpServer_t
::
connection
::
ok
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
data
.
str
());
}
catch
(
const
std
::
invalid_argument
&
e
)
{
connection
->
set_status
(
httpServer_t
::
connection
::
not_found
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
"Error: Sensor id not found.
\n
"
);
}
catch
(
const
std
::
out_of_range
&
e
)
{
connection
->
set_status
(
httpServer_t
::
connection
::
no_content
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
"Error: Sensor unavailable.
\n
"
);
}
catch
(
const
std
::
exception
&
e
)
{
connection
->
set_status
(
httpServer_t
::
connection
::
internal_server_error
);
connection
->
set_headers
(
boost
::
make_iterator_range
(
headers
,
headers
+
2
));
connection
->
write
(
"Server error.
\n
"
);
}
}
};
...
...
@@ -235,11 +223,14 @@ int main(int argc, char* const argv[]) {
/* Parse command line */
int
ret
;
std
::
string
listenHost
,
cassandraHost
,
restApiHost
,
ttl
;
std
::
string
listenPort
,
cassandraPort
,
restApiPort
;
listenHost
=
"localhost"
;
cassandraHost
=
"127.0.0.1"
;
restApiHost
=
"0:0:0:0"
;
ttl
=
"0"
;
statistics
=
false
;
while
((
ret
=
getopt
(
argc
,
argv
,
"h:l:t:Ds?"
))
!=-
1
)
{
while
((
ret
=
getopt
(
argc
,
argv
,
"h:l:
r:
t:Ds?"
))
!=-
1
)
{
switch
(
ret
)
{
case
'h'
:
cassandraHost
=
optarg
;
...
...
@@ -247,6 +238,9 @@ int main(int argc, char* const argv[]) {
case
'l'
:
listenHost
=
optarg
;
break
;
case
'r'
:
restApiHost
=
optarg
;
break
;
case
't'
:
ttl
=
optarg
;
break
;
...
...
@@ -263,12 +257,37 @@ int main(int argc, char* const argv[]) {
}
}
/*
* Parse hostnames for port specifications
*/
size_t
pos
=
listenHost
.
find
(
":"
);
if
(
pos
!=
string
::
npos
)
{
listenPort
=
listenHost
.
substr
(
pos
+
1
);
listenHost
.
erase
(
pos
);
}
else
{
listenPort
=
"1883"
;
}
pos
=
cassandraHost
.
find
(
":"
);
if
(
pos
!=
string
::
npos
)
{
cassandraPort
=
cassandraHost
.
substr
(
pos
+
1
);
cassandraHost
.
erase
(
pos
);
}
else
{
cassandraPort
=
"9042"
;
}
pos
=
restApiHost
.
find
(
":"
);
if
(
pos
!=
string
::
npos
)
{
restApiPort
=
restApiHost
.
substr
(
pos
+
1
);
restApiHost
.
erase
(
pos
);
}
else
{
restApiPort
=
"8080"
;
}
/*
* Allocate and initialize connection to Cassandra.
*/
std
::
string
sdHost
=
cassandraHost
;
DCDB
::
Connection
*
dcdbConn
;
dcdbConn
=
new
DCDB
::
Connection
(
sdHost
,
9042
);
dcdbConn
=
new
DCDB
::
Connection
(
cassandraHost
,
atoi
(
cassandraPort
.
c_str
())
);
if
(
!
dcdbConn
->
connect
())
{
std
::
cout
<<
"Cannot connect to Cassandra!"
<<
std
::
endl
;
...
...
@@ -301,28 +320,27 @@ int main(int argc, char* const argv[]) {
/*
* Start the MQTT Message Server.
*/
SimpleMQTTServer
ms
(
listenHost
,
"1883"
);
SimpleMQTTServer
ms
(
listenHost
,
listenPort
);
ms
.
setMessageCallback
(
mqttCallback
);
ms
.
start
();
cout
<<
"MQTT Server running..."
<<
std
::
endl
;
std
::
thread
t1
;
/*<< Creates the request handler. >>*/
http_handler
handler
;
/*<< Creates the server. >>*/
server
::
options
options
(
h
andler
)
;
options
.
reuse_address
(
true
);
options
.
thread_pool
(
std
::
make_shared
<
boost
::
network
::
utils
::
thread_pool
>
()
);
server
server_
(
options
.
address
(
"127.0.0.1"
).
port
(
"8080"
));
/*<< Runs the server. >>*/
t1
=
std
::
thread
([
&
server_
]
{
server_
.
run
();
});
cout
<<
"HTTP Server running..."
<<
std
::
endl
;
/*
* Start the HTTP Server for the REST API
*/
std
::
thread
httpThread
;
httpHandler_t
httpH
andler
;
httpServer_t
::
options
httpOptions
(
httpHandler
);
httpOptions
.
reuse_address
(
true
);
httpOptions
.
thread_pool
(
std
::
make_shared
<
boost
::
network
::
utils
::
thread_pool
>
(
));
httpServer_t
httpServer
(
httpOptions
.
address
(
restApiHost
).
port
(
restApiPort
));
httpThread
=
std
::
thread
([
&
httpServer
]
{
httpServer
.
run
();
});
cout
<<
"HTTP Server running..."
<<
std
::
endl
;
/*
* Run (hopefully) forever...
*/
...
...
@@ -351,8 +369,8 @@ int main(int argc, char* const argv[]) {
ms
.
stop
();
cout
<<
"MQTT Server stopped..."
<<
std
::
endl
;
s
erver
_
.
stop
();
t1
.
join
();
httpS
erver
.
stop
();
httpThread
.
join
();
cout
<<
"HTTP Server stopped..."
<<
std
::
endl
;
delete
mySensorDataStore
;
dcdbConn
->
disconnect
();
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment