Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Open sidebar
dcdb
dcdb
Commits
e95764be
Commit
e95764be
authored
Jul 29, 2019
by
Alessio Netti
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Analytics: refactoring of Analyzers to Operators
- Make clean might be required
parent
33252803
Changes
59
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
59 changed files
with
1079 additions
and
1048 deletions
+1079
-1048
analytics/Makefile
analytics/Makefile
+14
-14
analytics/OperatorManager.cpp
analytics/OperatorManager.cpp
+137
-137
analytics/OperatorManager.h
analytics/OperatorManager.h
+86
-86
analytics/README.md
analytics/README.md
+107
-107
analytics/config/smucngperf.conf
analytics/config/smucngperf.conf
+0
-0
analytics/config/testeroperator.conf
analytics/config/testeroperator.conf
+3
-3
analytics/includes/JobOperatorConfiguratorTemplate.h
analytics/includes/JobOperatorConfiguratorTemplate.h
+32
-34
analytics/includes/JobOperatorTemplate.h
analytics/includes/JobOperatorTemplate.h
+43
-43
analytics/includes/OperatorConfiguratorInterface.h
analytics/includes/OperatorConfiguratorInterface.h
+23
-23
analytics/includes/OperatorConfiguratorTemplate.h
analytics/includes/OperatorConfiguratorTemplate.h
+139
-139
analytics/includes/OperatorInterface.h
analytics/includes/OperatorInterface.h
+44
-44
analytics/includes/OperatorTemplate.h
analytics/includes/OperatorTemplate.h
+58
-58
analytics/includes/UnitGenerator.h
analytics/includes/UnitGenerator.h
+5
-5
analytics/includes/UnitInterface.h
analytics/includes/UnitInterface.h
+4
-4
analytics/includes/UnitTemplate.h
analytics/includes/UnitTemplate.h
+3
-3
analytics/operators/aggregator/AggregatorConfigurator.cpp
analytics/operators/aggregator/AggregatorConfigurator.cpp
+4
-4
analytics/operators/aggregator/AggregatorConfigurator.h
analytics/operators/aggregator/AggregatorConfigurator.h
+6
-6
analytics/operators/aggregator/AggregatorOperator.cpp
analytics/operators/aggregator/AggregatorOperator.cpp
+17
-17
analytics/operators/aggregator/AggregatorOperator.h
analytics/operators/aggregator/AggregatorOperator.h
+11
-11
analytics/operators/aggregator/AggregatorSensorBase.h
analytics/operators/aggregator/AggregatorSensorBase.h
+3
-3
analytics/operators/aggregator/JobAggregatorConfigurator.cpp
analytics/operators/aggregator/JobAggregatorConfigurator.cpp
+5
-5
analytics/operators/aggregator/JobAggregatorConfigurator.h
analytics/operators/aggregator/JobAggregatorConfigurator.h
+6
-6
analytics/operators/aggregator/JobAggregatorOperator.cpp
analytics/operators/aggregator/JobAggregatorOperator.cpp
+10
-10
analytics/operators/aggregator/JobAggregatorOperator.h
analytics/operators/aggregator/JobAggregatorOperator.h
+11
-11
analytics/operators/filesink/FilesinkConfigurator.cpp
analytics/operators/filesink/FilesinkConfigurator.cpp
+5
-5
analytics/operators/filesink/FilesinkConfigurator.h
analytics/operators/filesink/FilesinkConfigurator.h
+6
-6
analytics/operators/filesink/FilesinkOperator.cpp
analytics/operators/filesink/FilesinkOperator.cpp
+14
-14
analytics/operators/filesink/FilesinkOperator.h
analytics/operators/filesink/FilesinkOperator.h
+10
-10
analytics/operators/filesink/FilesinkSensorBase.h
analytics/operators/filesink/FilesinkSensorBase.h
+3
-3
analytics/operators/regressor/RegressorConfigurator.cpp
analytics/operators/regressor/RegressorConfigurator.cpp
+12
-12
analytics/operators/regressor/RegressorConfigurator.h
analytics/operators/regressor/RegressorConfigurator.h
+6
-6
analytics/operators/regressor/RegressorOperator.cpp
analytics/operators/regressor/RegressorOperator.cpp
+27
-27
analytics/operators/regressor/RegressorOperator.h
analytics/operators/regressor/RegressorOperator.h
+10
-10
analytics/operators/regressor/RegressorSensorBase.h
analytics/operators/regressor/RegressorSensorBase.h
+3
-3
analytics/operators/smucngperf/SMUCNGPerfConfigurator.cpp
analytics/operators/smucngperf/SMUCNGPerfConfigurator.cpp
+4
-4
analytics/operators/smucngperf/SMUCNGPerfConfigurator.h
analytics/operators/smucngperf/SMUCNGPerfConfigurator.h
+33
-0
analytics/operators/smucngperf/SMUCNGPerfOperator.cpp
analytics/operators/smucngperf/SMUCNGPerfOperator.cpp
+7
-7
analytics/operators/smucngperf/SMUCNGPerfOperator.h
analytics/operators/smucngperf/SMUCNGPerfOperator.h
+8
-8
analytics/operators/smucngperf/SMUCSensorBase.h
analytics/operators/smucngperf/SMUCSensorBase.h
+3
-3
analytics/operators/smucngperf/skxderivedmetrics/SKXPMUMetrics.cpp
.../operators/smucngperf/skxderivedmetrics/SKXPMUMetrics.cpp
+1
-1
analytics/operators/smucngperf/skxderivedmetrics/SKXPMUMetrics.h
...cs/operators/smucngperf/skxderivedmetrics/SKXPMUMetrics.h
+0
-0
analytics/operators/testeroperator/TesterOperator.cpp
analytics/operators/testeroperator/TesterOperator.cpp
+9
-9
analytics/operators/testeroperator/TesterOperator.h
analytics/operators/testeroperator/TesterOperator.h
+11
-11
analytics/operators/testeroperator/TesterOperatorConfigurator.cpp
...s/operators/testeroperator/TesterOperatorConfigurator.cpp
+12
-12
analytics/operators/testeroperator/TesterOperatorConfigurator.h
...ics/operators/testeroperator/TesterOperatorConfigurator.h
+15
-15
collectagent/CARestAPI.cpp
collectagent/CARestAPI.cpp
+5
-5
collectagent/CARestAPI.h
collectagent/CARestAPI.h
+3
-3
collectagent/Makefile
collectagent/Makefile
+1
-1
collectagent/analyticscontroller.cpp
collectagent/analyticscontroller.cpp
+14
-14
collectagent/analyticscontroller.h
collectagent/analyticscontroller.h
+12
-12
collectagent/collectagent.cpp
collectagent/collectagent.cpp
+1
-1
dcdbpusher/MQTTPusher.cpp
dcdbpusher/MQTTPusher.cpp
+20
-20
dcdbpusher/MQTTPusher.h
dcdbpusher/MQTTPusher.h
+3
-3
dcdbpusher/Makefile
dcdbpusher/Makefile
+1
-1
dcdbpusher/README.md
dcdbpusher/README.md
+2
-2
dcdbpusher/RestAPI.cpp
dcdbpusher/RestAPI.cpp
+17
-17
dcdbpusher/RestAPI.h
dcdbpusher/RestAPI.h
+8
-8
dcdbpusher/config/dcdbpusher.conf
dcdbpusher/config/dcdbpusher.conf
+1
-1
dcdbpusher/dcdbpusher.cpp
dcdbpusher/dcdbpusher.cpp
+21
-21
No files found.
analytics/Makefile
View file @
e95764be
...
...
@@ -4,7 +4,7 @@ include ../config.mk
CXXFLAGS
+=
-DBOOST_NETWORK_ENABLE_HTTPS
-I
../common/include
-I
$(DCDBDEPLOYPATH)
/include
-I
$(DCDBDEPLOYPATH)
/include/opencv4
LIBS
=
-L
../lib
-L
$(DCDBDEPLOYPATH)
/lib/
-ldl
-lboost_system
-lboost_thread
-lboost_log_setup
-lboost_log
-lboost_regex
-lpthread
-rdynamic
ANALYZE
RS
=
aggregator regressor job_aggregator tester
analyze
r filesink smucngperf
analyzer
OPERATO
RS
=
aggregator regressor job_aggregator tester
operato
r filesink smucngperf
ifeq
($(OS),Darwin)
BACNET_PORT
=
bsd
...
...
@@ -16,47 +16,47 @@ else
LIBFLAGS
=
-shared
-Wl
,-soname,
PLUGINFLAGS
=
-fPIC
endif
ANALYZE
R_LIBS
=
$(
foreach
p,
$(
ANALYZE
RS)
,libdcdb
analyze
r_
$(p)
.
$(LIBEXT)
)
OPERATO
R_LIBS
=
$(
foreach
p,
$(
OPERATO
RS)
,libdcdb
operato
r_
$(p)
.
$(LIBEXT)
)
all
:
$(
ANALYZE
R_LIBS)
all
:
$(
OPERATO
R_LIBS)
debug
:
CXXFLAGS += -DDEBUG
debug
:
all
clean
:
rm
-f
$(
ANALYZE
R_LIBS)
$(
shell
find
.
-name
"*.o"
)
rm
-f
$(
OPERATO
R_LIBS)
$(
shell
find
.
-name
"*.o"
)
rm
-f
../common/src/sensornavigator.o
$(OBJS)
:
%.o : %.cpp
install_
analyzer
:
$(ANALYZE
R_LIBS)
install_
operator
:
$(OPERATO
R_LIBS)
install
$^
$(DCDBDEPLOYPATH)
/lib/
install_conf
:
$(foreach p
,
$(
ANALYZE
RS)
,
config/$(p).conf)
install_conf
:
$(foreach p
,
$(
OPERATO
RS)
,
config/$(p).conf)
install
-m
644
$^
$(DCDBDEPLOYPATH)
/etc/
install
:
install_
analyze
r
install
:
install_
operato
r
@
echo
"Done with installation."
@
echo
"====================================="
@
echo
"To copy the configuration files type:"
@
echo
" > make install_conf"
analyze
rs/%.o
:
CXXFLAGS+= $(PLUGINFLAGS)
operato
rs/%.o
:
CXXFLAGS+= $(PLUGINFLAGS)
../common/src/sensornavigator.o
:
CXXFLAGS+= $(PLUGINFLAGS)
libdcdb
analyze
r_aggregator.$(LIBEXT)
:
analyze
rs/aggregator/Aggregator
Analyzer.o analyze
rs/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
libdcdb
operato
r_aggregator.$(LIBEXT)
:
operato
rs/aggregator/Aggregator
Operator.o operato
rs/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX)
$(LIBFLAGS)$@
-o
$@
$^
-L
$(DCDBDEPLOYPATH)
/lib/
-lboost_log
-lboost_system
-lboost_regex
libdcdb
analyze
r_regressor.$(LIBEXT)
:
analyze
rs/regressor/Regressor
Analyzer.o analyze
rs/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
libdcdb
operato
r_regressor.$(LIBEXT)
:
operato
rs/regressor/Regressor
Operator.o operato
rs/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
$(CXX)
$(LIBFLAGS)$@
-o
$@
$^
-L
$(DCDBDEPLOYPATH)
/lib/
-lboost_log
-lboost_system
-lboost_regex
-lopencv_core
-lopencv_ml
libdcdb
analyze
r_job_aggregator.$(LIBEXT)
:
analyze
rs/aggregator/Aggregator
Analyzer.o analyze
rs/aggregator/JobAggregator
Analyzer.o analyze
rs/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
libdcdb
operato
r_job_aggregator.$(LIBEXT)
:
operato
rs/aggregator/Aggregator
Operator.o operato
rs/aggregator/JobAggregator
Operator.o operato
rs/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX)
$(LIBFLAGS)$@
-o
$@
$^
-L
$(DCDBDEPLOYPATH)
/lib/
-lboost_log
-lboost_system
-lboost_regex
libdcdb
analyze
r_tester
analyze
r.$(LIBEXT)
:
analyze
rs/tester
analyze
r/Tester
Analyzer.o analyze
rs/tester
analyze
r/Tester
Analyze
rConfigurator.o ../common/src/sensornavigator.o
libdcdb
operato
r_tester
operato
r.$(LIBEXT)
:
operato
rs/tester
operato
r/Tester
Operator.o operato
rs/tester
operato
r/Tester
Operato
rConfigurator.o ../common/src/sensornavigator.o
$(CXX)
$(LIBFLAGS)$@
-o
$@
$^
-L
$(DCDBDEPLOYPATH)
/lib/
-lboost_log
-lboost_system
-lboost_regex
libdcdb
analyze
r_filesink.$(LIBEXT)
:
analyze
rs/filesink/Filesink
Analyzer.o analyze
rs/filesink/FilesinkConfigurator.o ../common/src/sensornavigator.o
libdcdb
operato
r_filesink.$(LIBEXT)
:
operato
rs/filesink/Filesink
Operator.o operato
rs/filesink/FilesinkConfigurator.o ../common/src/sensornavigator.o
$(CXX)
$(LIBFLAGS)$@
-o
$@
$^
-L
$(DCDBDEPLOYPATH)
/lib/
-lboost_log
-lboost_system
-lboost_regex
libdcdb
analyze
r_smucngperf
analyzer
.$(LIBEXT)
:
analyze
rs/smucngperf
analyzer
/SMUCNGPerf
Analyzer.o analyze
rs/smucngperf
analyzer
/SMUCNGPerfConfigurator.o ../common/src/sensornavigator.o
libdcdb
operato
r_smucngperf.$(LIBEXT)
:
operato
rs/smucngperf/SMUCNGPerf
Operator.o operato
rs/smucngperf/SMUCNGPerfConfigurator.o ../common/src/sensornavigator.o
$(CXX)
$(LIBFLAGS)$@
-o
$@
$^
-L
$(DCDBDEPLOYPATH)
/lib/
-lboost_log
-lboost_system
-lboost_regex
analytics/
Analytics
Manager.cpp
→
analytics/
Operator
Manager.cpp
View file @
e95764be
This diff is collapsed.
Click to expand it.
analytics/
Analytics
Manager.h
→
analytics/
Operator
Manager.h
View file @
e95764be
This diff is collapsed.
Click to expand it.
analytics/README.md
View file @
e95764be
This diff is collapsed.
Click to expand it.
analytics/config/smucngperf
analyzer
.conf
→
analytics/config/smucngperf.conf
View file @
e95764be
File moved
analytics/config/tester
analyze
r.conf
→
analytics/config/tester
operato
r.conf
View file @
e95764be
...
...
@@ -2,14 +2,14 @@ global {
mqttPrefix
/
test
}
template_
analyze
r
def1
{
template_
operato
r
def1
{
interval
1000
minValues
3
duplicate
false
streaming
true
}
analyze
r
tes1
{
operato
r
tes1
{
default
def1
window
2000
relative
false
...
...
@@ -32,7 +32,7 @@ relative false
}
analyze
r
tes2
{
operato
r
tes2
{
default
def1
interval
1500
relative
true
...
...
analytics/includes/Job
Analyze
rConfiguratorTemplate.h
→
analytics/includes/Job
Operato
rConfiguratorTemplate.h
View file @
e95764be
//================================================================================
// Name : Job
Analyze
rConfiguratorTemplate.h
// Name : Job
Operato
rConfiguratorTemplate.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template that implements a configurator for Job
analyze
r plugins.
// Description : Template that implements a configurator for Job
operato
r plugins.
//================================================================================
//================================================================================
...
...
@@ -25,99 +25,97 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOB
ANALYZE
RCONFIGURATORTEMPLATE_H
#define PROJECT_JOB
ANALYZE
RCONFIGURATORTEMPLATE_H
#ifndef PROJECT_JOB
OPERATO
RCONFIGURATORTEMPLATE_H
#define PROJECT_JOB
OPERATO
RCONFIGURATORTEMPLATE_H
#include "
Analyze
rConfiguratorTemplate.h"
#include "Job
Analyze
rTemplate.h"
#include "
Operato
rConfiguratorTemplate.h"
#include "Job
Operato
rTemplate.h"
/**
* @brief Template that implements a configurator for Job
analyze
r plugins.
* @brief Template that implements a configurator for Job
operato
r plugins.
*
* @details This template expands the standard
Analyze
rConfiguratorTemplate,
* @details This template expands the standard
Operato
rConfiguratorTemplate,
* with very few changes to accomodate the different design of job
*
analyze
rs.
*
operato
rs.
*
* @ingroup
analyze
r
* @ingroup
operato
r
*/
template
<
class
Analyze
r
,
class
SBase
=
SensorBase
>
class
Job
Analyze
rConfiguratorTemplate
:
virtual
public
Analyze
rConfiguratorTemplate
<
Analyze
r
,
SBase
>
{
template
<
class
Operato
r
,
class
SBase
=
SensorBase
>
class
Job
Operato
rConfiguratorTemplate
:
virtual
public
Operato
rConfiguratorTemplate
<
Operato
r
,
SBase
>
{
// Verifying the types of input classes
static_assert
(
std
::
is_base_of
<
SensorBase
,
SBase
>::
value
,
"SBase must derive from SensorBase!"
);
static_assert
(
std
::
is_base_of
<
Analyze
rInterface
,
Analyze
r
>::
value
,
"
Analyze
r must derive from
Analyze
rInterface!"
);
static_assert
(
std
::
is_base_of
<
Operato
rInterface
,
Operato
r
>::
value
,
"
Operato
r must derive from
Operato
rInterface!"
);
protected:
// For readability
using
A
_Ptr
=
std
::
shared_ptr
<
Analyze
r
>
;
using
O
_Ptr
=
std
::
shared_ptr
<
Operato
r
>
;
public:
/**
* @brief Class constructor
*/
Job
Analyze
rConfiguratorTemplate
()
:
Analyze
rConfiguratorTemplate
<
Analyze
r
,
SBase
>
()
{}
Job
Operato
rConfiguratorTemplate
()
:
Operato
rConfiguratorTemplate
<
Operato
r
,
SBase
>
()
{}
/**
* @brief Copy constructor is not available
*/
Job
Analyze
rConfiguratorTemplate
(
const
Job
Analyze
rConfiguratorTemplate
&
)
=
delete
;
Job
Operato
rConfiguratorTemplate
(
const
Job
Operato
rConfiguratorTemplate
&
)
=
delete
;
/**
* @brief Assignment operator is not available
*/
Job
Analyze
rConfiguratorTemplate
&
operator
=
(
const
Job
Analyze
rConfiguratorTemplate
&
)
=
delete
;
Job
Operato
rConfiguratorTemplate
&
operator
=
(
const
Job
Operato
rConfiguratorTemplate
&
)
=
delete
;
/**
* @brief Class destructor
*/
virtual
~
Job
Analyze
rConfiguratorTemplate
()
{}
virtual
~
Job
Operato
rConfiguratorTemplate
()
{}
protected:
/**
* @brief Instantiates all necessary units for a single (job)
analyze
r
* @brief Instantiates all necessary units for a single (job)
operato
r
*
* When using job
analyze
rs, the only unit that is always instantiated is ALWAYS the template
* unit, similarly to ordinary
analyze
rs in on-demand mode. Such unit then is used at runtime,
* When using job
operato
rs, the only unit that is always instantiated is ALWAYS the template
* unit, similarly to ordinary
operato
rs in on-demand mode. Such unit then is used at runtime,
* even in streaming mode, to build dynamically all appropriate units for jobs that are
* currently running in the system.
*
* @param
an
The
analyze
r whose units must be created
* @param
op
The
operato
r whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual
bool
readUnits
(
Analyzer
&
an
,
std
::
vector
<
shared_ptr
<
SBase
>>&
protoInputs
,
std
::
vector
<
shared_ptr
<
SBase
>>&
protoOutputs
,
inputMode_t
inputMode
)
{
// Forcing the job
analyze
r to not be duplicated
an
.
setDuplicate
(
false
);
virtual
bool
readUnits
(
Operator
&
op
,
std
::
vector
<
shared_ptr
<
SBase
>>&
protoInputs
,
std
::
vector
<
shared_ptr
<
SBase
>>&
protoOutputs
,
inputMode_t
inputMode
)
{
// Forcing the job
operato
r to not be duplicated
op
.
setDuplicate
(
false
);
vector
<
shared_ptr
<
UnitTemplate
<
SBase
>>>
*
units
=
NULL
;
try
{
units
=
this
->
_unitGen
.
generateUnits
(
protoInputs
,
protoOutputs
,
inputMode
,
MQTTChecker
::
formatTopic
(
this
->
_mqttPrefix
)
+
MQTTChecker
::
formatTopic
(
an
.
getMqttPart
()),
true
,
an
.
getRelaxed
());
units
=
this
->
_unitGen
.
generateUnits
(
protoInputs
,
protoOutputs
,
inputMode
,
op
.
getMqttPart
(),
true
,
op
.
getRelaxed
());
}
catch
(
const
std
::
exception
&
e
)
{
LOG
(
error
)
<<
this
->
_
analyze
rName
<<
" "
<<
an
.
getName
()
<<
": Error when creating template job unit: "
<<
e
.
what
();
LOG
(
error
)
<<
this
->
_
operato
rName
<<
" "
<<
op
.
getName
()
<<
": Error when creating template job unit: "
<<
e
.
what
();
delete
units
;
return
false
;
}
if
(
units
->
size
()
>
1
)
{
LOG
(
error
)
<<
this
->
_
analyze
rName
<<
" "
<<
an
.
getName
()
<<
": Invalid job template unit, please check your configuration!"
;
LOG
(
error
)
<<
this
->
_
operato
rName
<<
" "
<<
op
.
getName
()
<<
": Invalid job template unit, please check your configuration!"
;
delete
units
;
return
false
;
}
shared_ptr
<
UnitTemplate
<
SBase
>>
jobUnit
=
units
->
at
(
0
);
delete
units
;
an
.
clearUnits
();
op
.
clearUnits
();
//if(!this->constructSensorTopics(*jobUnit,
an
))
//if(!this->constructSensorTopics(*jobUnit,
op
))
// return false;
if
(
this
->
unit
(
*
jobUnit
))
{
an
.
addToUnitCache
(
jobUnit
);
op
.
addToUnitCache
(
jobUnit
);
LOG
(
debug
)
<<
" Template job unit "
+
jobUnit
->
getName
()
+
" generated."
;
}
else
{
LOG
(
error
)
<<
" Template job unit "
<<
jobUnit
->
getName
()
<<
" did not pass the final check!"
;
...
...
@@ -130,4 +128,4 @@ protected:
boost
::
log
::
sources
::
severity_logger
<
boost
::
log
::
trivial
::
severity_level
>
lg
;
};
#endif //PROJECT_JOB
ANALYZE
RCONFIGURATORTEMPLATE_H
#endif //PROJECT_JOB
OPERATO
RCONFIGURATORTEMPLATE_H
analytics/includes/Job
Analyze
rTemplate.h
→
analytics/includes/Job
Operato
rTemplate.h
View file @
e95764be
//================================================================================
// Name : Job
Analyze
rTemplate.h
// Name : Job
Operato
rTemplate.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features needed by
Analyze
rs.
// Description : Template implementing features needed by
Operato
rs.
//================================================================================
//================================================================================
...
...
@@ -25,22 +25,22 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOB
ANALYZE
RTEMPLATE_H
#define PROJECT_JOB
ANALYZE
RTEMPLATE_H
#ifndef PROJECT_JOB
OPERATO
RTEMPLATE_H
#define PROJECT_JOB
OPERATO
RTEMPLATE_H
#include "
Analyze
rTemplate.h"
#include "
Operato
rTemplate.h"
/**
* @brief Template that implements features needed by Job
Analyze
rs and
* complying to
Analyze
rInterface.
* @brief Template that implements features needed by Job
Operato
rs and
* complying to
Operato
rInterface.
*
* @details This template is derived from
Analyze
rTemplate, and is adjusted to
* @details This template is derived from
Operato
rTemplate, and is adjusted to
* simplify job-related computations.
*
* @ingroup
analyze
r
* @ingroup
operato
r
*/
template
<
typename
S
>
class
Job
Analyze
rTemplate
:
virtual
public
Analyze
rTemplate
<
S
>
{
class
Job
Operato
rTemplate
:
virtual
public
Operato
rTemplate
<
S
>
{
// The template shall only be instantiated for classes which derive from SensorBase
static_assert
(
is_base_of
<
SensorBase
,
S
>::
value
,
"S must derive from SensorBase!"
);
...
...
@@ -55,10 +55,10 @@ public:
/**
* @brief Class constructor
*
* @param name Name of the
analyze
r
* @param name Name of the
operato
r
*/
Job
Analyze
rTemplate
(
const
string
name
)
:
Analyze
rTemplate
<
S
>
(
name
),
Job
Operato
rTemplate
(
const
string
name
)
:
Operato
rTemplate
<
S
>
(
name
),
_jobDataVec
(
nullptr
)
{
_unitAccess
.
store
(
false
);
...
...
@@ -69,8 +69,8 @@ public:
* @brief Copy constructor
*
*/
Job
Analyze
rTemplate
(
const
Job
Analyze
rTemplate
&
other
)
:
Analyze
rTemplate
<
S
>
(
other
),
Job
Operato
rTemplate
(
const
Job
Operato
rTemplate
&
other
)
:
Operato
rTemplate
<
S
>
(
other
),
_jobDataVec
(
nullptr
)
{
_unitAccess
.
store
(
false
);
...
...
@@ -81,8 +81,8 @@ public:
* @brief Assignment operator
*
*/
Job
Analyze
rTemplate
&
operator
=
(
const
Job
Analyze
rTemplate
&
other
)
{
Analyze
rTemplate
<
S
>::
operator
=
(
other
);
Job
Operato
rTemplate
&
operator
=
(
const
Job
Operato
rTemplate
&
other
)
{
Operato
rTemplate
<
S
>::
operator
=
(
other
);
_jobDataVec
=
nullptr
;
this
->
_dynamic
=
true
;
return
*
this
;
...
...
@@ -91,20 +91,20 @@ public:
/**
* @brief Class destructor
*/
virtual
~
Job
Analyze
rTemplate
()
{
virtual
~
Job
Operato
rTemplate
()
{
if
(
_jobDataVec
)
delete
_jobDataVec
;
}
/**
* @brief Returns the units of this
analyze
r
* @brief Returns the units of this
operato
r
*
* The units returned by this method are of the UnitInterface type. The actual units, in their
* derived type, are used internally. This type of
analyze
r employs dynamic units that are
* derived type, are used internally. This type of
operato
r employs dynamic units that are
* generated at runtime: as such, an internal unit lock is acquired upon calling this method,
* and must later be released through the releaseUnits() method.
*
* @return The vector of UnitInterface objects of this
analyze
r
* @return The vector of UnitInterface objects of this
operato
r
*/
virtual
vector
<
UnitPtr
>&
getUnits
()
override
{
// Spinlock to regulate access to units - normally innocuous
...
...
@@ -122,17 +122,17 @@ public:
}
/**
* @brief Initializes this
analyze
r
* @brief Initializes this
operato
r
*
* @param io Boost ASIO service to be used
*/
virtual
void
init
(
boost
::
asio
::
io_service
&
io
)
override
{
Analyze
rInterface
::
init
(
io
);
}
virtual
void
init
(
boost
::
asio
::
io_service
&
io
)
override
{
Operato
rInterface
::
init
(
io
);
}
/**
* @brief Performs an on-demand compute task
*
* Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
* perform data analytics queries on the
analyze
r, which must have the _streaming attribute set
* perform data analytics queries on the
operato
r, which must have the _streaming attribute set
* to false. A unit is generated on the fly, corresponding to the input node given as input,
* and results are returned in the form of a map.
*
...
...
@@ -143,7 +143,7 @@ public:
map
<
string
,
reading_t
>
outMap
;
if
(
!
this
->
_streaming
)
{
try
{
// Getting exclusive access to the
analyze
r
// Getting exclusive access to the
operato
r
while
(
this
->
_onDemandLock
.
exchange
(
true
)
)
{}
uint32_t
jobId
=
MQTTChecker
::
topicToJob
(
node
);
if
(
_jobDataVec
)
...
...
@@ -166,7 +166,7 @@ public:
}
}
}
else
throw
std
::
runtime_error
(
"
Analyze
r "
+
this
->
_name
+
": cannot retrieve job data!"
);
throw
std
::
runtime_error
(
"
Operato
r "
+
this
->
_name
+
": cannot retrieve job data!"
);
}
catch
(
const
exception
&
e
)
{
this
->
_onDemandLock
.
store
(
false
);
throw
;
...
...
@@ -192,20 +192,20 @@ public:
if
(
!
found
)
throw
std
::
domain_error
(
"Job "
+
node
+
" does not belong to the domain of "
+
this
->
_name
+
"!"
);
}
else
throw
std
::
runtime_error
(
"
Analyze
r "
+
this
->
_name
+
": not available for on-demand query!"
);
throw
std
::
runtime_error
(
"
Operato
r "
+
this
->
_name
+
": not available for on-demand query!"
);
return
outMap
;
}
protected:
using
Analyze
rTemplate
<
S
>::
compute
;
using
Operato
rTemplate
<
S
>::
compute
;
/**
* @brief Data analytics (job) computation logic
*
* This method contains the actual logic used by the analyzed, and is automatically called by
* the computeAsync method. This variant of the compute() method defined in
Analyze
rTemplate also
* includes a job data structure in its list of arguments, and is specialized for job
analyze
rs.
* the computeAsync method. This variant of the compute() method defined in
Operato
rTemplate also
* includes a job data structure in its list of arguments, and is specialized for job
operato
rs.
*
* @param unit Shared pointer to unit to be processed
* @param jobData Job data structure
...
...
@@ -216,7 +216,7 @@ protected:
* @brief This method encapsulates all logic to generate and manage job units
*
* The algorithm implemented in this method is very similar to that used in computeOnDemand in
*
Analyze
rTemplate, and it is used to manage job units both in on-demand and streaming mode. The
*
Operato
rTemplate, and it is used to manage job units both in on-demand and streaming mode. The
* internal unit cache is used to store recent job units. Moreover, the job data returned by the
* QueryEngine is converted to a format compatible with the UnitGenerator.
*
...
...
@@ -227,18 +227,18 @@ protected:
string
jobTopic
=
MQTTChecker
::
jobToTopic
(
jobData
.
jobId
);
U_Ptr
jobUnit
=
nullptr
;
if
(
!
this
->
_unitCache
)
throw
std
::
runtime_error
(
"Initialization error in
analyze
r "
+
this
->
_name
+
"!"
);
throw
std
::
runtime_error
(
"Initialization error in
operato
r "
+
this
->
_name
+
"!"
);
if
(
this
->
_unitCache
->
count
(
jobTopic
))
{
jobUnit
=
this
->
_unitCache
->
at
(
jobTopic
);
if
(
!
this
->
_streaming
)
LOG
(
debug
)
<<
"
Analyze
r "
<<
this
->
_name
<<
": cache hit for unit "
<<
jobTopic
<<
"."
;
LOG
(
debug
)
<<
"
Operato
r "
<<
this
->
_name
<<
": cache hit for unit "
<<
jobTopic
<<
"."
;
}
else
{
if
(
!
this
->
_unitCache
->
count
(
SensorNavigator
::
templateKey
))
throw
std
::
runtime_error
(
"No template unit in
analyze
r "
+
this
->
_name
+
"!"
);
throw
std
::
runtime_error
(
"No template unit in
operato
r "
+
this
->
_name
+
"!"
);
if
(
!
this
->
_streaming
)
LOG
(
debug
)
<<
"
Analyze
r "
<<
this
->
_name
<<
": cache miss for unit "
<<
jobTopic
<<
"."
;
LOG
(
debug
)
<<
"
Operato
r "
<<
this
->
_name
<<
": cache miss for unit "
<<
jobTopic
<<
"."
;
U_Ptr
uTemplate
=
this
->
_unitCache
->
at
(
SensorNavigator
::
templateKey
);
shared_ptr
<
SensorNavigator
>
navi
=
this
->
_queryEngine
.
getNavigator
();
UnitGenerator
<
S
>
unitGen
(
navi
);
...
...
@@ -274,10 +274,10 @@ protected:
* @brief Performs a compute task
*
* This method is tasked with scheduling the next compute task, and invoking the internal
* compute() method, which encapsulates the real logic of the
analyze
r. The compute method
* is automatically called over units as required by the
Analyze
r's configuration.
* compute() method, which encapsulates the real logic of the
operato
r. The compute method
* is automatically called over units as required by the
operato
r's configuration.
*
* In the case of job
analyze
rs, this method will also automatically retrieve the list of jobs
* In the case of job
operato
rs, this method will also automatically retrieve the list of jobs
* that were running in the last interval. One unit for each of them is instantiated (or
* retrieved from the local unit cache, if available) and then the compute phase starts.
*
...
...
@@ -286,7 +286,7 @@ protected:
if
(
this
->
_delayInterval
>
0
)
{
sleep
(
this
->
_delayInterval
);
this
->
_delayInterval
=
0
;
LOG
(
info
)
<<
"
Analyze
r "
+
this
->
_name
+
": starting computation after delayed start!"
;
LOG
(
info
)
<<
"
Operato
r "
+
this
->
_name
+
": starting computation after delayed start!"
;
}
try
{
...
...
@@ -325,16 +325,16 @@ protected:
_tempUnits
.
clear
();
}
else
LOG
(
error
)
<<
"
Analyze
r "
+
this
->
_name
+
": cannot retrieve job data!"
;
LOG
(
error
)
<<
"
Operato
r "
+
this
->
_name
+
": cannot retrieve job data!"
;
}
catch
(
const
exception
&
e
)
{
LOG
(
error
)
<<
"
Analyze
r "
+
this
->
_name
+
": internal error "
+
e
.
what
()
+
" during computation!"
;
LOG
(
error
)
<<
"
Operato
r "
+
this
->
_name
+
": internal error "
+
e
.
what
()
+
" during computation!"
;
_unitAccess
.
store
(
false
);
}
if
(
this
->
_timer
&&
this
->
_keepRunning
)
{
this
->
_timer
->
expires_at
(
timestamp2ptime
(
this
->
nextReadingTime
()));
this
->
_pendingTasks
++
;
this
->
_timer
->
async_wait
(
bind
(
&
Job
Analyze
rTemplate
::
computeAsync
,
this
));
this
->
_timer
->
async_wait
(
bind
(
&
Job
Operato
rTemplate
::
computeAsync
,
this
));
}
this
->
_pendingTasks
--
;
}
...
...
@@ -351,4 +351,4 @@ protected:
};
#endif //PROJECT_JOB
ANALYZE
RTEMPLATE_H
#endif //PROJECT_JOB
OPERATO
RTEMPLATE_H
analytics/includes/
Analyze
rConfiguratorInterface.h
→
analytics/includes/
Operato
rConfiguratorInterface.h
View file @
e95764be
//================================================================================
// Name :
Analyze
rConfiguratorInterface.h
// Name :
Operato
rConfiguratorInterface.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Interface to configurators for
data analyze
r plugins.
// Description : Interface to configurators for
operato
r plugins.
//================================================================================
//================================================================================
...
...
@@ -25,44 +25,44 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_
ANALYZE
RCONFIGURATORINTERFACE_H
#define PROJECT_
ANALYZE
RCONFIGURATORINTERFACE_H
#ifndef PROJECT_
OPERATO
RCONFIGURATORINTERFACE_H
#define PROJECT_
OPERATO
RCONFIGURATORINTERFACE_H
#include <string>
#include <vector>
#include "QueryEngine.h"
#include "
Analyze
rInterface.h"
#include "
Operato
rInterface.h"
#include "globalconfiguration.h"
/**
* @brief Interface to configurators for
data analyze
r plugins
* @brief Interface to configurators for
operato
r plugins
*
* @details This interface is the one exposed outside of the .dll library
* containing a plugin. Classes implementing this interface must
* perform configuration and instantiation of
Analyzers
objects, which
* perform configuration and instantiation of
Operator
objects, which
* are made accessible externally.
*
* @ingroup
analyze
r
* @ingroup
operato
r
*/
class
Analyze
rConfiguratorInterface
{
class
Operato
rConfiguratorInterface
{
public:
/**
* @brief Class constructor
*/
Analyze
rConfiguratorInterface
()
{}
Operato
rConfiguratorInterface
()
{}
/**
* @brief Class destructor
*/
virtual
~
Analyze
rConfiguratorInterface
()
{}
virtual
~
Operato
rConfiguratorInterface
()
{}
/**
* @brief Reads a config file and instantiates
analyze
rs
* @brief Reads a config file and instantiates
operato
rs
*
* This method will read the config file to which the input path points, and instantiate
analyze
r
* This method will read the config file to which the input path points, and instantiate
operato
r
* objects accordingly. This method must be implemented in derived classes.
*
* @param cfgPath Path of the input config file
...
...
@@ -73,7 +73,7 @@ public:
/**
* @brief Repeats the configuration of the plugin
*
* This method will stop and clear all
analyze
rs that were created, and repeats the configuration,
* This method will stop and clear all
operato
rs that were created, and repeats the configuration,
* by reading the file once again. This method must be implemented in derived classes.
*
* @return True if successful, false otherwise
...
...
@@ -83,14 +83,14 @@ public:
/**
* @brief Clears the plugin configuration
*
* This method will stop and clear all
analyze
rs that were created, returning the plugin to its
* This method will stop and clear all
operato
rs that were created, returning the plugin to its
* uninitialized state.
*
*/
virtual
void
clearConfig
()
=
0
;
/**
* @brief Sets a structure containing global settings to be used during
analyze
r creation This
* @brief Sets a structure containing global settings to be used during
operato
r creation This
* method must be implemented in derived classes.
*
*
...
...
@@ -99,14 +99,14 @@ public:
virtual
void
setGlobalSettings
(
const
pluginSettings_t
&
pluginSettings
)
=
0
;
/**
* @brief Returns a vector of
analyze
rs