Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Open sidebar
dcdb
dcdb
Commits
76633c62
Commit
76633c62
authored
Oct 30, 2019
by
Carla Guillen Carias
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'development' of
https://gitlab.lrz.de/dcdb/dcdb
into development
parents
05b7e277
509656b2
Changes
92
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
75 changed files
with
6199 additions
and
6161 deletions
+6199
-6161
.clang-format
.clang-format
+9
-0
dcdbpusher/Caliper/dcdbpusher/DcdbPusher.cpp
dcdbpusher/Caliper/dcdbpusher/DcdbPusher.cpp
+755
-759
dcdbpusher/Configuration.cpp
dcdbpusher/Configuration.cpp
+14
-14
dcdbpusher/Configuration.h
dcdbpusher/Configuration.h
+12
-13
dcdbpusher/MQTTPusher.cpp
dcdbpusher/MQTTPusher.cpp
+105
-104
dcdbpusher/MQTTPusher.h
dcdbpusher/MQTTPusher.h
+26
-24
dcdbpusher/PluginManager.cpp
dcdbpusher/PluginManager.cpp
+237
-237
dcdbpusher/PluginManager.h
dcdbpusher/PluginManager.h
+46
-50
dcdbpusher/README.md
dcdbpusher/README.md
+1
-2
dcdbpusher/RestAPI.cpp
dcdbpusher/RestAPI.cpp
+409
-410
dcdbpusher/RestAPI.h
dcdbpusher/RestAPI.h
+65
-66
dcdbpusher/config/snmp.conf
dcdbpusher/config/snmp.conf
+0
-1
dcdbpusher/dcdbpusher.cpp
dcdbpusher/dcdbpusher.cpp
+114
-113
dcdbpusher/includes/ConfiguratorInterface.h
dcdbpusher/includes/ConfiguratorInterface.h
+158
-155
dcdbpusher/includes/ConfiguratorTemplate.h
dcdbpusher/includes/ConfiguratorTemplate.h
+464
-468
dcdbpusher/includes/EntityInterface.h
dcdbpusher/includes/EntityInterface.h
+89
-87
dcdbpusher/includes/PluginDefinitions.h
dcdbpusher/includes/PluginDefinitions.h
+5
-5
dcdbpusher/includes/SensorGroupInterface.h
dcdbpusher/includes/SensorGroupInterface.h
+148
-147
dcdbpusher/includes/SensorGroupTemplate.h
dcdbpusher/includes/SensorGroupTemplate.h
+255
-259
dcdbpusher/sensors/bacnet/BACnetClient.cpp
dcdbpusher/sensors/bacnet/BACnetClient.cpp
+75
-77
dcdbpusher/sensors/bacnet/BACnetClient.h
dcdbpusher/sensors/bacnet/BACnetClient.h
+40
-43
dcdbpusher/sensors/bacnet/BACnetConfigurator.cpp
dcdbpusher/sensors/bacnet/BACnetConfigurator.cpp
+19
-14
dcdbpusher/sensors/bacnet/BACnetConfigurator.h
dcdbpusher/sensors/bacnet/BACnetConfigurator.h
+8
-8
dcdbpusher/sensors/bacnet/BACnetSensorBase.h
dcdbpusher/sensors/bacnet/BACnetSensorBase.h
+43
-42
dcdbpusher/sensors/bacnet/BACnetSensorGroup.cpp
dcdbpusher/sensors/bacnet/BACnetSensorGroup.cpp
+13
-13
dcdbpusher/sensors/bacnet/BACnetSensorGroup.h
dcdbpusher/sensors/bacnet/BACnetSensorGroup.h
+8
-8
dcdbpusher/sensors/caliper/CaliperConfigurator.cpp
dcdbpusher/sensors/caliper/CaliperConfigurator.cpp
+7
-7
dcdbpusher/sensors/caliper/CaliperConfigurator.h
dcdbpusher/sensors/caliper/CaliperConfigurator.h
+6
-6
dcdbpusher/sensors/caliper/CaliperSensorBase.h
dcdbpusher/sensors/caliper/CaliperSensorBase.h
+19
-20
dcdbpusher/sensors/caliper/CaliperSensorGroup.cpp
dcdbpusher/sensors/caliper/CaliperSensorGroup.cpp
+309
-309
dcdbpusher/sensors/caliper/CaliperSensorGroup.h
dcdbpusher/sensors/caliper/CaliperSensorGroup.h
+46
-48
dcdbpusher/sensors/gpfsmon/GpfsmonConfigurator.cpp
dcdbpusher/sensors/gpfsmon/GpfsmonConfigurator.cpp
+13
-14
dcdbpusher/sensors/gpfsmon/GpfsmonConfigurator.h
dcdbpusher/sensors/gpfsmon/GpfsmonConfigurator.h
+7
-7
dcdbpusher/sensors/gpfsmon/GpfsmonSensorBase.h
dcdbpusher/sensors/gpfsmon/GpfsmonSensorBase.h
+55
-56
dcdbpusher/sensors/gpfsmon/GpfsmonSensorGroup.cpp
dcdbpusher/sensors/gpfsmon/GpfsmonSensorGroup.cpp
+35
-31
dcdbpusher/sensors/gpfsmon/GpfsmonSensorGroup.h
dcdbpusher/sensors/gpfsmon/GpfsmonSensorGroup.h
+9
-11
dcdbpusher/sensors/ipmi/IPMIConfigurator.cpp
dcdbpusher/sensors/ipmi/IPMIConfigurator.cpp
+11
-13
dcdbpusher/sensors/ipmi/IPMIConfigurator.h
dcdbpusher/sensors/ipmi/IPMIConfigurator.h
+11
-11
dcdbpusher/sensors/ipmi/IPMIHost.cpp
dcdbpusher/sensors/ipmi/IPMIHost.cpp
+91
-91
dcdbpusher/sensors/ipmi/IPMIHost.h
dcdbpusher/sensors/ipmi/IPMIHost.h
+42
-42
dcdbpusher/sensors/ipmi/IPMISensorBase.h
dcdbpusher/sensors/ipmi/IPMISensorBase.h
+80
-75
dcdbpusher/sensors/ipmi/IPMISensorGroup.cpp
dcdbpusher/sensors/ipmi/IPMISensorGroup.cpp
+46
-47
dcdbpusher/sensors/ipmi/IPMISensorGroup.h
dcdbpusher/sensors/ipmi/IPMISensorGroup.h
+11
-11
dcdbpusher/sensors/ipmi/LenovoXCC.cpp
dcdbpusher/sensors/ipmi/LenovoXCC.cpp
+167
-169
dcdbpusher/sensors/ipmi/LenovoXCC.h
dcdbpusher/sensors/ipmi/LenovoXCC.h
+19
-20
dcdbpusher/sensors/msr/MSRConfigurator.cpp
dcdbpusher/sensors/msr/MSRConfigurator.cpp
+150
-151
dcdbpusher/sensors/msr/MSRConfigurator.h
dcdbpusher/sensors/msr/MSRConfigurator.h
+7
-7
dcdbpusher/sensors/msr/MSRSensorBase.h
dcdbpusher/sensors/msr/MSRSensorBase.h
+16
-17
dcdbpusher/sensors/msr/MSRSensorGroup.cpp
dcdbpusher/sensors/msr/MSRSensorGroup.cpp
+72
-74
dcdbpusher/sensors/msr/MSRSensorGroup.h
dcdbpusher/sensors/msr/MSRSensorGroup.h
+42
-44
dcdbpusher/sensors/msr/Types.h
dcdbpusher/sensors/msr/Types.h
+28
-32
dcdbpusher/sensors/opa/OpaConfigurator.cpp
dcdbpusher/sensors/opa/OpaConfigurator.cpp
+32
-32
dcdbpusher/sensors/opa/OpaConfigurator.h
dcdbpusher/sensors/opa/OpaConfigurator.h
+7
-7
dcdbpusher/sensors/opa/OpaSensorBase.h
dcdbpusher/sensors/opa/OpaSensorBase.h
+130
-130
dcdbpusher/sensors/opa/OpaSensorGroup.cpp
dcdbpusher/sensors/opa/OpaSensorGroup.cpp
+112
-112
dcdbpusher/sensors/opa/OpaSensorGroup.h
dcdbpusher/sensors/opa/OpaSensorGroup.h
+14
-14
dcdbpusher/sensors/pdu/PDUConfigurator.cpp
dcdbpusher/sensors/pdu/PDUConfigurator.cpp
+3
-3
dcdbpusher/sensors/pdu/PDUConfigurator.h
dcdbpusher/sensors/pdu/PDUConfigurator.h
+11
-11
dcdbpusher/sensors/pdu/PDUSensorBase.h
dcdbpusher/sensors/pdu/PDUSensorBase.h
+30
-30
dcdbpusher/sensors/pdu/PDUSensorGroup.cpp
dcdbpusher/sensors/pdu/PDUSensorGroup.cpp
+25
-25
dcdbpusher/sensors/pdu/PDUSensorGroup.h
dcdbpusher/sensors/pdu/PDUSensorGroup.h
+10
-10
dcdbpusher/sensors/pdu/PDUUnit.cpp
dcdbpusher/sensors/pdu/PDUUnit.cpp
+21
-22
dcdbpusher/sensors/pdu/PDUUnit.h
dcdbpusher/sensors/pdu/PDUUnit.h
+12
-12
dcdbpusher/sensors/perfevent/PerfSensorBase.h
dcdbpusher/sensors/perfevent/PerfSensorBase.h
+44
-44
dcdbpusher/sensors/perfevent/PerfSensorGroup.cpp
dcdbpusher/sensors/perfevent/PerfSensorGroup.cpp
+219
-220
dcdbpusher/sensors/perfevent/PerfSensorGroup.h
dcdbpusher/sensors/perfevent/PerfSensorGroup.h
+30
-30
dcdbpusher/sensors/perfevent/PerfeventConfigurator.cpp
dcdbpusher/sensors/perfevent/PerfeventConfigurator.cpp
+170
-170
dcdbpusher/sensors/perfevent/PerfeventConfigurator.h
dcdbpusher/sensors/perfevent/PerfeventConfigurator.h
+10
-10
dcdbpusher/sensors/procfs/ProcfsConfigurator.cpp
dcdbpusher/sensors/procfs/ProcfsConfigurator.cpp
+102
-98
dcdbpusher/sensors/procfs/ProcfsConfigurator.h
dcdbpusher/sensors/procfs/ProcfsConfigurator.h
+17
-17
dcdbpusher/sensors/procfs/ProcfsParser.cpp
dcdbpusher/sensors/procfs/ProcfsParser.cpp
+491
-459
dcdbpusher/sensors/procfs/ProcfsParser.h
dcdbpusher/sensors/procfs/ProcfsParser.h
+136
-128
dcdbpusher/sensors/procfs/ProcfsSensorBase.h
dcdbpusher/sensors/procfs/ProcfsSensorBase.h
+39
-36
dcdbpusher/sensors/procfs/ProcfsSensorGroup.cpp
dcdbpusher/sensors/procfs/ProcfsSensorGroup.cpp
+66
-66
dcdbpusher/sensors/procfs/ProcfsSensorGroup.h
dcdbpusher/sensors/procfs/ProcfsSensorGroup.h
+51
-43
No files found.
.clang-format
0 → 100644
View file @
76633c62
BasedOnStyle: LLVM
IndentWidth: 8
UseTab: ForContinuationAndIndentation
BreakBeforeBraces: Attach
AlignConsecutiveDeclarations: true
AllowShortBlocksOnASingleLine: true
AllowShortFunctionsOnASingleLine: Inline
ColumnLimit: 0
IndentCaseLabels: true
dcdbpusher/Caliper/dcdbpusher/DcdbPusher.cpp
View file @
76633c62
This diff is collapsed.
Click to expand it.
dcdbpusher/Configuration.cpp
View file @
76633c62
...
...
@@ -35,10 +35,10 @@ bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_typ
// ----- READING ADDITIONAL GLOBAL SETTINGS -----
if
(
boost
::
iequals
(
global
.
first
,
"mqttBroker"
))
{
brokerHost
=
parseNetworkHost
(
global
.
second
.
data
());
brokerPort
=
parseNetworkPort
(
global
.
second
.
data
())
==
""
?
BROKERPORT
:
stoi
(
parseNetworkPort
(
global
.
second
.
data
()));
brokerPort
=
parseNetworkPort
(
global
.
second
.
data
())
==
""
?
BROKERPORT
:
stoi
(
parseNetworkPort
(
global
.
second
.
data
()));
}
else
if
(
boost
::
iequals
(
global
.
first
,
"qosLevel"
))
{
qosLevel
=
stoi
(
global
.
second
.
data
());
if
(
qosLevel
>
2
||
qosLevel
<
0
)
if
(
qosLevel
>
2
||
qosLevel
<
0
)
qosLevel
=
1
;
}
else
if
(
boost
::
iequals
(
global
.
first
,
"maxInflightMsgNum"
))
{
maxInflightMsgNum
=
stoull
(
global
.
second
.
data
());
...
...
@@ -52,32 +52,32 @@ bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_typ
return
true
;
}
bool
Configuration
::
readPlugins
(
PluginManager
&
pluginManager
)
{
bool
Configuration
::
readPlugins
(
PluginManager
&
pluginManager
)
{
std
::
string
globalConfig
=
_cfgFilePath
;
globalConfig
.
append
(
_cfgFileName
);
boost
::
property_tree
::
iptree
cfg
;
try
{
boost
::
property_tree
::
read_info
(
globalConfig
,
cfg
);
}
catch
(
boost
::
property_tree
::
info_parser_error
&
e
)
{
}
catch
(
boost
::
property_tree
::
info_parser_error
&
e
)
{
LOG
(
error
)
<<
"Error when reading plugins from "
<<
_cfgFileName
<<
": "
<<
e
.
what
();
return
false
;
}
pluginManager
.
setCfgFilePath
(
_cfgFilePath
);
//read plugins
BOOST_FOREACH
(
boost
::
property_tree
::
iptree
::
value_type
&
plugin
,
cfg
.
get_child
(
"plugins"
))
{
BOOST_FOREACH
(
boost
::
property_tree
::
iptree
::
value_type
&
plugin
,
cfg
.
get_child
(
"plugins"
))
{
if
(
boost
::
iequals
(
plugin
.
first
,
"plugin"
))
{
if
(
!
plugin
.
second
.
data
().
empty
())
{
std
::
string
pluginName
=
plugin
.
second
.
data
();
std
::
string
pluginConfig
=
""
;
// path to config file for plugin
std
::
string
pluginPath
=
""
;
// path to plugin
std
::
string
pluginName
=
plugin
.
second
.
data
();
std
::
string
pluginConfig
=
""
;
// path to config file for plugin
std
::
string
pluginPath
=
""
;
// path to plugin
LOG
(
info
)
<<
"Read plugin "
<<
pluginName
<<
"..."
;
BOOST_FOREACH
(
boost
::
property_tree
::
iptree
::
value_type
&
val
,
plugin
.
second
)
{
BOOST_FOREACH
(
boost
::
property_tree
::
iptree
::
value_type
&
val
,
plugin
.
second
)
{
if
(
boost
::
iequals
(
val
.
first
,
"path"
))
{
pluginPath
=
val
.
second
.
data
();
pluginPath
=
val
.
second
.
data
();
}
else
if
(
boost
::
iequals
(
val
.
first
,
"config"
))
{
pluginConfig
=
val
.
second
.
data
();
}
else
{
...
...
@@ -85,10 +85,10 @@ bool Configuration::readPlugins(PluginManager& pluginManager) {
}
}
if
(
!
pluginManager
.
loadPlugin
(
pluginName
,
pluginPath
,
pluginConfig
))
{
LOG
(
error
)
<<
"Could not load plugin "
<<
pluginName
;
pluginManager
.
unloadPlugin
();
return
false
;
if
(
!
pluginManager
.
loadPlugin
(
pluginName
,
pluginPath
,
pluginConfig
))
{
LOG
(
error
)
<<
"Could not load plugin "
<<
pluginName
;
pluginManager
.
unloadPlugin
();
return
false
;
}
}
}
...
...
dcdbpusher/Configuration.h
View file @
76633c62
...
...
@@ -43,17 +43,17 @@
* @ingroup pusher
*/
class
Configuration
:
public
GlobalConfiguration
{
public:
public:
/**
* Create new Configuration. Sets global config file to read from to cfgFile.
*
* @param cfgFilePath Path to where all config-files are located
* @param cfgFileName Name of the file containing the config
*/
Configuration
(
const
std
::
string
&
cfgFilePath
,
const
std
::
string
&
cfgFileName
)
:
GlobalConfiguration
(
cfgFilePath
,
cfgFileName
)
{}
Configuration
(
const
std
::
string
&
cfgFilePath
,
const
std
::
string
&
cfgFileName
)
:
GlobalConfiguration
(
cfgFilePath
,
cfgFileName
)
{}
virtual
~
Configuration
()
{}
/**
...
...
@@ -64,18 +64,17 @@ public:
*
* @return true on success, false otherwise
*/
bool
readPlugins
(
PluginManager
&
pluginManager
);
bool
readPlugins
(
PluginManager
&
pluginManager
);
// Additional configuration parameters to be parsed and stored in the global block
int
qosLevel
=
1
;
int
qosLevel
=
1
;
unsigned
int
maxInflightMsgNum
=
20
;
unsigned
int
maxQueuedMsgNum
=
0
;
int
brokerPort
=
BROKERPORT
;
std
::
string
brokerHost
=
BROKERHOST
;
int
maxMsgNum
=
0
;
protected:
int
brokerPort
=
BROKERPORT
;
std
::
string
brokerHost
=
BROKERHOST
;
int
maxMsgNum
=
0
;
protected:
bool
readAdditionalValues
(
boost
::
property_tree
::
iptree
::
value_type
&
global
)
override
;
};
...
...
dcdbpusher/MQTTPusher.cpp
View file @
76633c62
...
...
@@ -26,27 +26,27 @@
//================================================================================
#include "MQTTPusher.h"
#include "timestamp.h"
#include <iostream>
#include <string>
#include <unistd.h>
#include "timestamp.h"
MQTTPusher
::
MQTTPusher
(
int
brokerPort
,
const
std
::
string
&
brokerHost
,
const
bool
autoPublish
,
int
qosLevel
,
pusherPluginStorage_t
&
plugins
,
op_pluginVector_t
&
oPlugins
,
int
maxNumberOfMessages
,
unsigned
int
maxInflightMsgNum
,
unsigned
int
maxQueuedMsgNum
)
:
_qosLevel
(
qosLevel
),
_brokerPort
(
brokerPort
),
_brokerHost
(
brokerHost
),
_autoPublish
(
autoPublish
),
_plugins
(
plugins
),
_operatorPlugins
(
oPlugins
),
_connected
(
false
),
_keepRunning
(
true
),
_msgCap
(
DISABLED
),
_doHalt
(
false
),
_halted
(
false
),
_maxNumberOfMessages
(
maxNumberOfMessages
),
_maxInflightMsgNum
(
maxInflightMsgNum
),
_maxQueuedMsgNum
(
maxQueuedMsgNum
)
{
MQTTPusher
::
MQTTPusher
(
int
brokerPort
,
const
std
::
string
&
brokerHost
,
const
bool
autoPublish
,
int
qosLevel
,
pusherPluginStorage_t
&
plugins
,
op_pluginVector_t
&
oPlugins
,
int
maxNumberOfMessages
,
unsigned
int
maxInflightMsgNum
,
unsigned
int
maxQueuedMsgNum
)
:
_qosLevel
(
qosLevel
),
_brokerPort
(
brokerPort
),
_brokerHost
(
brokerHost
),
_autoPublish
(
autoPublish
),
_plugins
(
plugins
),
_operatorPlugins
(
oPlugins
),
_connected
(
false
),
_keepRunning
(
true
),
_msgCap
(
DISABLED
),
_doHalt
(
false
),
_halted
(
false
),
_maxNumberOfMessages
(
maxNumberOfMessages
),
_maxInflightMsgNum
(
maxInflightMsgNum
),
_maxQueuedMsgNum
(
maxQueuedMsgNum
)
{
//first print some info
int
mosqMajor
,
mosqMinor
,
mosqRevision
;
...
...
@@ -75,7 +75,7 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const bool
}
MQTTPusher
::~
MQTTPusher
()
{
if
(
_connected
)
{
if
(
_connected
)
{
mosquitto_disconnect
(
_mosq
);
}
mosquitto_destroy
(
_mosq
);
...
...
@@ -83,8 +83,8 @@ MQTTPusher::~MQTTPusher() {
}
void
MQTTPusher
::
push
()
{
int
mosqErr
;
uint64_t
idleTime
=
0
;
int
mosqErr
;
uint64_t
idleTime
=
0
;
//connect to broker (if necessary)
while
(
_keepRunning
&&
!
_connected
)
{
if
(
mosquitto_connect
(
_mosq
,
_brokerHost
.
c_str
(),
_brokerPort
,
1000
)
!=
MOSQ_ERR_SUCCESS
)
{
...
...
@@ -98,10 +98,10 @@ void MQTTPusher::push() {
//Performing auto-publish if necessary
sendMappings
();
computeMsgRate
();
//collect sensor-data
reading_t
*
reads
=
new
reading_t
[
SensorBase
::
QUEUE_MAXLIMIT
];
reading_t
*
reads
=
new
reading_t
[
SensorBase
::
QUEUE_MAXLIMIT
];
std
::
size_t
totalCount
=
0
;
//number of messages
while
(
_keepRunning
||
totalCount
)
{
if
(
_doHalt
)
{
...
...
@@ -110,7 +110,7 @@ void MQTTPusher::push() {
continue
;
}
_halted
=
false
;
//there was a (unintended) disconnect in the meantime --> reconnect
if
(
!
_connected
)
{
LOGM
(
error
)
<<
"Lost connection. Reconnecting..."
;
...
...
@@ -124,41 +124,41 @@ void MQTTPusher::push() {
}
if
(
_connected
)
{
if
(
getTimestamp
()
-
idleTime
>=
PUSHER_IDLETIME
)
{
idleTime
=
getTimestamp
();
totalCount
=
0
;
// Push sensor data
for
(
auto
&
p
:
_plugins
)
{
if
(
_doHalt
)
{
//for faster response
break
;
}
for
(
const
auto
&
g
:
p
.
configurator
->
getSensorGroups
())
{
for
(
const
auto
&
s
:
g
->
acquireSensors
())
{
if
(
s
->
getSizeOfReadingQueue
()
>=
g
->
getMinValues
())
{
if
(
_msgCap
==
DISABLED
||
totalCount
<
(
unsigned
)
_maxNumberOfMessages
)
{
if
(
sendReadings
(
*
s
,
reads
,
totalCount
)
>
0
)
{
break
;
}
}
else
{
break
;
//ultimately we will go to sleep 1 second
}
}
}
g
->
releaseSensors
();
}
}
// Push output analytics sensors
for
(
auto
&
p
:
_operatorPlugins
)
{
if
(
_doHalt
)
{
break
;
}
for
(
const
auto
&
op
:
p
.
configurator
->
getOperators
())
{
if
(
op
->
getStreaming
())
{
if
(
getTimestamp
()
-
idleTime
>=
PUSHER_IDLETIME
)
{
idleTime
=
getTimestamp
();
totalCount
=
0
;
// Push sensor data
for
(
auto
&
p
:
_plugins
)
{
if
(
_doHalt
)
{
//for faster response
break
;
}
for
(
const
auto
&
g
:
p
.
configurator
->
getSensorGroups
())
{
for
(
const
auto
&
s
:
g
->
acquireSensors
())
{
if
(
s
->
getSizeOfReadingQueue
()
>=
g
->
getMinValues
())
{
if
(
_msgCap
==
DISABLED
||
totalCount
<
(
unsigned
)
_maxNumberOfMessages
)
{
if
(
sendReadings
(
*
s
,
reads
,
totalCount
)
>
0
)
{
break
;
}
}
else
{
break
;
//ultimately we will go to sleep 1 second
}
}
}
g
->
releaseSensors
();
}
}
// Push output analytics sensors
for
(
auto
&
p
:
_operatorPlugins
)
{
if
(
_doHalt
)
{
break
;
}
for
(
const
auto
&
op
:
p
.
configurator
->
getOperators
())
{
if
(
op
->
getStreaming
())
{
for
(
const
auto
&
u
:
op
->
getUnits
())
{
for
(
const
auto
&
s
:
u
->
getBaseOutputs
())
{
if
(
s
->
getSizeOfReadingQueue
()
>=
op
->
getMinValues
())
{
if
(
_msgCap
==
DISABLED
||
totalCount
<
(
unsigned
)
_maxNumberOfMessages
)
{
if
(
_msgCap
==
DISABLED
||
totalCount
<
(
unsigned
)
_maxNumberOfMessages
)
{
if
(
sendReadings
(
*
s
,
reads
,
totalCount
)
>
0
)
{
break
;
}
...
...
@@ -170,9 +170,9 @@ void MQTTPusher::push() {
}
op
->
releaseUnits
();
}
}
}
}
}
}
}
if
((
mosqErr
=
mosquitto_loop
(
_mosq
,
-
1
,
1
))
!=
MOSQ_ERR_SUCCESS
)
{
if
(
mosqErr
==
MOSQ_ERR_CONN_LOST
)
{
...
...
@@ -188,23 +188,23 @@ void MQTTPusher::push() {
mosquitto_disconnect
(
_mosq
);
}
int
MQTTPusher
::
sendReadings
(
SensorBase
&
s
,
reading_t
*
reads
,
std
::
size_t
&
totalCount
)
{
int
MQTTPusher
::
sendReadings
(
SensorBase
&
s
,
reading_t
*
reads
,
std
::
size_t
&
totalCount
)
{
//get all sensor values out of its queue
std
::
size_t
count
=
s
.
popReadingQueue
(
reads
,
SensorBase
::
QUEUE_MAXLIMIT
);
//totalCount+= count;
totalCount
+=
1
;
totalCount
+=
1
;
#ifdef DEBUG
LOGM
(
debug
)
<<
"Sending "
<<
count
<<
" values from "
<<
s
.
getName
();
#endif
#if DEBUG
for
(
std
::
size_t
i
=
0
;
i
<
count
;
i
++
)
{
for
(
std
::
size_t
i
=
0
;
i
<
count
;
i
++
)
{
LOG
(
debug
)
<<
" "
<<
reads
[
i
].
timestamp
<<
" "
<<
reads
[
i
].
value
;
}
#endif
//try to send them to the broker
int
rc
;
if
((
rc
=
mosquitto_publish
(
_mosq
,
NULL
,
s
.
getMqtt
().
c_str
(),
sizeof
(
reading_t
)
*
count
,
reads
,
_qosLevel
,
false
))
!=
MOSQ_ERR_SUCCESS
)
{
if
((
rc
=
mosquitto_publish
(
_mosq
,
NULL
,
s
.
getMqtt
().
c_str
(),
sizeof
(
reading_t
)
*
count
,
reads
,
_qosLevel
,
false
))
!=
MOSQ_ERR_SUCCESS
)
{
//could not send them --> push the sensor values back into the queue
if
(
rc
==
MOSQ_ERR_NOMEM
)
{
LOGM
(
info
)
<<
"Can
\'
t queue additional messages"
;
...
...
@@ -221,15 +221,16 @@ int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& total
}
bool
MQTTPusher
::
sendMappings
()
{
if
(
!
_autoPublish
)
return
false
;
if
(
!
_autoPublish
)
return
false
;
std
::
string
topic
,
payload
;
unsigned
int
publishCtr
=
0
;
std
::
string
topic
,
payload
;
unsigned
int
publishCtr
=
0
;
// Performing auto-publish for sensors
for
(
auto
&
p
:
_plugins
)
{
for
(
auto
&
g
:
p
.
configurator
->
getSensorGroups
())
{
for
(
auto
&
s
:
g
->
acquireSensors
())
{
if
(
s
->
getPublish
())
{
for
(
auto
&
p
:
_plugins
)
{
for
(
auto
&
g
:
p
.
configurator
->
getSensorGroups
())
{
for
(
auto
&
s
:
g
->
acquireSensors
())
{
if
(
s
->
getPublish
())
{
if
(
!
s
->
getMetadata
())
{
topic
=
std
::
string
(
DCDB_MAP
)
+
s
->
getMqtt
();
payload
=
s
->
getName
();
...
...
@@ -257,11 +258,11 @@ bool MQTTPusher::sendMappings() {
}
// Performing auto-publish for analytics output sensors
for
(
auto
&
p
:
_operatorPlugins
)
for
(
auto
&
op
:
p
.
configurator
->
getOperators
())
if
(
op
->
getStreaming
()
&&
!
op
->
getDynamic
())
{
for
(
auto
&
u
:
op
->
getUnits
())
for
(
auto
&
s
:
u
->
getBaseOutputs
())
{
for
(
auto
&
p
:
_operatorPlugins
)
for
(
auto
&
op
:
p
.
configurator
->
getOperators
())
if
(
op
->
getStreaming
()
&&
!
op
->
getDynamic
())
{
for
(
auto
&
u
:
op
->
getUnits
())
for
(
auto
&
s
:
u
->
getBaseOutputs
())
{
if
(
s
->
getPublish
())
{
if
(
!
s
->
getMetadata
())
{
topic
=
std
::
string
(
DCDB_MAP
)
+
s
->
getMqtt
();
...
...
@@ -292,47 +293,47 @@ bool MQTTPusher::sendMappings() {
}
bool
MQTTPusher
::
halt
(
unsigned
short
timeout
)
{
_doHalt
=
true
;
_doHalt
=
true
;
for
(
unsigned
short
i
=
1
;
i
<=
timeout
;
i
++
)
{
if
(
_halted
)
{
return
true
;
}
else
{
LOGM
(
info
)
<<
"Waiting for push cycle to pause... ("
<<
i
<<
"/"
<<
timeout
<<
")"
;
sleep
(
1
);
}
}
for
(
unsigned
short
i
=
1
;
i
<=
timeout
;
i
++
)
{
if
(
_halted
)
{
return
true
;
}
else
{
LOGM
(
info
)
<<
"Waiting for push cycle to pause... ("
<<
i
<<
"/"
<<
timeout
<<
")"
;
sleep
(
1
);
}
}
cont
();
LOGM
(
info
)
<<
"Timeout: push cycle did not pause. Continuing..."
;
return
false
;
cont
();
LOGM
(
info
)
<<
"Timeout: push cycle did not pause. Continuing..."
;
return
false
;
}
void
MQTTPusher
::
computeMsgRate
()
{
// Computing number of sent MQTT messages per second
float
msgRate
=
0
;
bool
dynWarning
=
false
;
for
(
auto
&
p
:
_plugins
)
{
for
(
const
auto
&
g
:
p
.
configurator
->
getSensorGroups
())
{
msgRate
+=
(
float
)
g
->
acquireSensors
().
size
()
*
(
1000.0
f
/
(
float
)
g
->
getInterval
()
)
/
(
float
)
g
->
getMinValues
();
bool
dynWarning
=
false
;
for
(
auto
&
p
:
_plugins
)
{
for
(
const
auto
&
g
:
p
.
configurator
->
getSensorGroups
())
{
msgRate
+=
(
float
)
g
->
acquireSensors
().
size
()
*
(
1000.0
f
/
(
float
)
g
->
getInterval
())
/
(
float
)
g
->
getMinValues
();
g
->
releaseSensors
();
}
}
for
(
auto
&
p
:
_operatorPlugins
)
for
(
const
auto
&
op
:
p
.
configurator
->
getOperators
())
{
if
(
op
->
getStreaming
()
&&
!
op
->
getDynamic
())
{
for
(
const
auto
&
u
:
op
->
getUnits
())
msgRate
+=
(
float
)
u
->
getBaseOutputs
().
size
()
*
(
1000.0
f
/
(
float
)
op
->
getInterval
())
/
(
float
)
op
->
getMinValues
();
op
->
releaseUnits
();
}
else
if
(
op
->
getDynamic
())
dynWarning
=
true
;
}
for
(
auto
&
p
:
_operatorPlugins
)
for
(
const
auto
&
op
:
p
.
configurator
->
getOperators
())
{
if
(
op
->
getStreaming
()
&&
!
op
->
getDynamic
())
{
for
(
const
auto
&
u
:
op
->
getUnits
())
msgRate
+=
(
float
)
u
->
getBaseOutputs
().
size
()
*
(
1000.0
f
/
(
float
)
op
->
getInterval
())
/
(
float
)
op
->
getMinValues
();
op
->
releaseUnits
();
}
else
if
(
op
->
getDynamic
())
dynWarning
=
true
;
}
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
if
(
_maxNumberOfMessages
>=
0
&&
_msgCap
!=
MINIMUM
)
{
if
(
_maxNumberOfMessages
>=
0
&&
_msgCap
!=
MINIMUM
)
{
_msgCap
=
_maxNumberOfMessages
==
0
||
msgRate
>
_maxNumberOfMessages
?
DISABLED
:
ENABLED
;
if
(
_msgCap
==
DISABLED
&&
_maxNumberOfMessages
>
0
)
LOGM
(
warning
)
<<
"Cannot enforce max rate of "
<<
_maxNumberOfMessages
<<
" msg/s lower than actual "
<<
msgRate
<<
" msg/s!"
;
else
if
(
_maxNumberOfMessages
>
0
)
else
if
(
_maxNumberOfMessages
>
0
)
LOGM
(
info
)
<<
"Enforcing message cap of "
<<
_maxNumberOfMessages
<<
" msg/s against actual "
<<
msgRate
<<
" msg/s."
;
else
LOGM
(
info
)
<<
"No message cap enforced. Predicted message rate is "
<<
msgRate
<<
" msg/s."
;
...
...
@@ -341,6 +342,6 @@ void MQTTPusher::computeMsgRate() {
_maxNumberOfMessages
=
msgRate
+
10
;
LOGM
(
info
)
<<
"Enforcing message cap of "
<<
_maxNumberOfMessages
<<
" msg/s against actual "
<<
msgRate
<<
" msg/s."
;
}
if
(
_msgCap
!=
DISABLED
&&
dynWarning
)
LOGM
(
warning
)
<<
"Attention! The computed message rate does not account for analyzers with dynamically-generated sensors."
;
if
(
_msgCap
!=
DISABLED
&&
dynWarning
)
LOGM
(
warning
)
<<
"Attention! The computed message rate does not account for analyzers with dynamically-generated sensors."
;
}
dcdbpusher/MQTTPusher.h
View file @
76633c62
...
...
@@ -32,13 +32,15 @@
#define DCDB_MET "/DCDB_MAP/METADATA/"
#define PUSHER_IDLETIME 1000000000
#include <mosquitto.h>
#include <map>
#include "../analytics/OperatorManager.h"
#include "PluginManager.h"
#include "sensorbase.h"
#include "../analytics/OperatorManager.h"
#include <map>
#include <mosquitto.h>
enum
msgCap_t
{
DISABLED
=
1
,
ENABLED
=
2
,
MINIMUM
=
3
};
enum
msgCap_t
{
DISABLED
=
1
,
ENABLED
=
2
,
MINIMUM
=
3
};
/**
* @brief Collects values from the sensors and pushes them to the database.
...
...
@@ -46,9 +48,9 @@ enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};
* @ingroup pusher
*/
class
MQTTPusher
{
public:
MQTTPusher
(
int
brokerPort
,
const
std
::
string
&
brokerHost
,
const
bool
autoPublish
,
int
qosLevel
,
pusherPluginStorage_t
&
plugins
,
op_pluginVector_t
&
oPlugins
,
int
maxNumberOfMessages
,
unsigned
int
maxInflightMsgNum
,
unsigned
int
maxQueuedMsgNum
);
public:
MQTTPusher
(
int
brokerPort
,
const
std
::
string
&
brokerHost
,
const
bool
autoPublish
,
int
qosLevel
,
pusherPluginStorage_t
&
plugins
,
op_pluginVector_t
&
oPlugins
,
int
maxNumberOfMessages
,
unsigned
int
maxInflightMsgNum
,
unsigned
int
maxQueuedMsgNum
);
virtual
~
MQTTPusher
();
/**
...
...
@@ -104,25 +106,25 @@ public:
_doHalt
=
false
;
}
private:
int
sendReadings
(
SensorBase
&
s
,
reading_t
*
reads
,
std
::
size_t
&
totalCount
);
private:
int
sendReadings
(
SensorBase
&
s
,
reading_t
*
reads
,
std
::
size_t
&
totalCount
);
void
computeMsgRate
();
int
_qosLevel
;
int
_brokerPort
;
std
::
string
_brokerHost
;
bool
_autoPublish
;
pusherPluginStorage_t
&
_plugins
;
op_pluginVector_t
&
_operatorPlugins
;
struct
mosquitto
*
_mosq
;
bool
_connected
;
bool
_keepRunning
;
msgCap_t
_msgCap
;
bool
_doHalt
;
bool
_halted
;
int
_maxNumberOfMessages
;
unsigned
int
_maxInflightMsgNum
;
unsigned
int
_maxQueuedMsgNum
;
int
_qosLevel
;
int
_brokerPort
;
std
::
string
_brokerHost
;
bool
_autoPublish
;
pusherPluginStorage_t
&
_plugins
;
op_pluginVector_t
&
_operatorPlugins
;
struct
mosquitto
*
_mosq
;
bool
_connected
;
bool
_keepRunning
;
msgCap_t
_msgCap
;
bool
_doHalt
;
bool
_halted
;
int
_maxNumberOfMessages
;
unsigned
int
_maxInflightMsgNum
;
unsigned
int
_maxQueuedMsgNum
;
boost
::
log
::
sources
::
severity_logger
<
boost
::
log
::
trivial
::
severity_level
>
lg
;
};
...
...
dcdbpusher/PluginManager.cpp
View file @
76633c62
...
...
@@ -33,266 +33,266 @@
using
namespace
std
;
PluginManager
::
PluginManager
(
const
pluginSettings_t
&
pluginSettings
)
:
_pluginSettings
(
pluginSettings
),
_cfgFilePath
(
"./"
)
{}
PluginManager
::
PluginManager
(
const
pluginSettings_t
&
pluginSettings
)
:
_pluginSettings
(
pluginSettings
),
_cfgFilePath
(
"./"
)
{}
PluginManager
::~
PluginManager
()
{
for
(
const
auto
&
p
:
_plugins
)
p
.
destroy
(
p
.
configurator
);
_plugins
.
clear
();
for
(
const
auto
&
p
:
_plugins
)
p
.
destroy
(
p
.
configurator
);
_plugins
.
clear
();
}
bool
PluginManager
::
loadPlugin
(
const
string
&
name
,
const
string
&
pluginPath
,
const
string
&
config
)
{
bool
PluginManager
::
loadPlugin
(
const
string
&
name
,
const
string
&
pluginPath
,
const
string
&
config
)
{
LOG
(
info
)
<<
"Loading plugin "
<<
name
<<
"..."
;
string
pluginLib
;
//path to the plugin-lib
string
pluginConfig
;
//path to config file for plugin
LOG
(
info
)
<<
"Loading plugin "
<<
name
<<
"..."
;
string
pluginLib
;
//path to the plugin-lib
string
pluginConfig
;
//path to config file for plugin
// build plugin path
pluginLib
=
"libdcdbplugin_"
+
name
;
//TODO add version information?
// build plugin path
pluginLib
=
"libdcdbplugin_"
+
name
;
//TODO add version information?
#if __APPLE__
pluginLib
+=
".dylib"
;
pluginLib
+=
".dylib"
;
#else
pluginLib
+=
".so"
;
pluginLib
+=
".so"
;