Commit 441b212e authored by Daniele Tafani's avatar Daniele Tafani
Browse files

Merge remote-tracking branch 'origin/development' into grafana

Still works on OpenSSL 1.0.2
Fixed config and Makefile
parents 3d755845 926c2cb8
......@@ -8,7 +8,7 @@ SOURCEFORGE_MROR = vorboss
CASSANDRA_VERSION = 3.0.18
MOSQUITTO_VERSION = 1.5.5
BOOST_VERSION = 1.70.0
OPENSSL_VERSION = 1.1.1c
OPENSSL_VERSION = 1.0.2l
CPPDRV_VERSION = 2.10.0
LIBUV_VERSION = 1.24.0
BACNET-STACK_VERSION = 0.8.6
......@@ -36,11 +36,11 @@ DISTFILES = apache-cassandra-$(CASSANDRA_VERSION).tar.gz;http://archive.apache.o
DISTFILES_HASHES = apache-cassandra-3.0.18.tar.gz|94dbdaa58b366166c53f881b8e266bc8;\
mosquitto-1.5.5.tar.gz|a17dffc6f63b2a4ab2eb5c51139e60e9;\
boost_1_70_0.tar.gz|fea771fe8176828fabf9c09242ee8c26;\
openssl-1.1.1c.tar.gz|15e21da6efe8aa0e0768ffd8cd37a5f6;\
openssl-1.0.2l.tar.gz|f85123cd390e864dfbe517e7616e6566;\
cpp-driver-2.10.0.tar.gz|6d15dd2cccd2efd1fdc86089d26971d0;\
libuv-v1.24.0.tar.gz|90320330757253b07404d2a97f59c66b;\
cpprestsdk-2.10.6.tar.gz|0a9b2424578fbeb1ac8465173ce8fc71; \
cpp-netlib-0.12.0-final.tar.gz|29b87c0e8c1a9e7fbdea5afcec947d53i; \
cpp-netlib-0.12.0-final.tar.gz|29b87c0e8c1a9e7fbdea5afcec947d53; \
bacnet-stack-0.8.6.tgz|544ebd42ed959deb2213209b66bbc460;\
freeipmi-1.6.3.tar.gz|b2d97e20db9b81b460ce1b9dad5bf54e;\
net-snmp-5.8.tar.gz|63bfc65fbb86cdb616598df1aff6458a; \
......@@ -278,7 +278,8 @@ $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.built: $(DCDBDEPSPATH)/bac
$(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.installed: $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.built | $(DCDBDEPLOYPATH)
@echo "Installing BACNet-Stack..."
install $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/include/* -D -t /$(DCDBDEPLOYPATH)/include/bacnet && \
mkdir $(DCDBDEPLOYPATH)/include/bacnet && \
install $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/include/* /$(DCDBDEPLOYPATH)/include/bacnet && \
install $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/lib/libbacnet.a /$(DCDBDEPLOYPATH)/lib/ && touch $(@)
$(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.built: $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.patched
......
......@@ -52,7 +52,11 @@ $ ./dcdbpusher config/
* Alessio Netti, Micha Mueller, Axel Auweter, Carla Guillen, Michael Ott, Daniele Tafani and Martin Schulz. _From Facility to Application Sensor Data: Modular, Continuous and Holistic Monitoring with DCDB_. Proceedings of the International Conference for High Performance Computing, Networking, Storage, and Analysis (SC) 2019. [arXiv pre-print available here.](https://arxiv.org/abs/1906.07509)
## License and Copyright
## Contact, Copyright and License
DCDB was created at Leibniz Supercomputing Centre (LRZ).
For questions and/or suggestions please contact info@dcdb.it
Copyright (C) 2011-2019 Leibniz Supercomputing Centre
......
......@@ -4,7 +4,7 @@ include ../config.mk
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS -I../common/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPLOYPATH)/include/opencv4
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = aggregator regressor job_aggregator
OPERATORS = aggregator regressor job_aggregator testeroperator filesink smucngperf
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -16,38 +16,47 @@ else
LIBFLAGS = -shared -Wl,-soname,
PLUGINFLAGS = -fPIC
endif
ANALYZER_LIBS = $(foreach p,$(ANALYZERS),libdcdbanalyzer_$(p).$(LIBEXT))
OPERATOR_LIBS = $(foreach p,$(OPERATORS),libdcdboperator_$(p).$(LIBEXT))
all: $(ANALYZER_LIBS)
all: $(OPERATOR_LIBS)
debug: CXXFLAGS += -DDEBUG
debug: all
clean:
rm -f $(ANALYZER_LIBS) $(shell find . -name "*.o")
rm -f $(OPERATOR_LIBS) $(shell find . -name "*.o")
rm -f ../common/src/sensornavigator.o
$(OBJS) : %.o : %.cpp
install_analyzer: $(ANALYZER_LIBS)
install_operator: $(OPERATOR_LIBS)
install $^ $(DCDBDEPLOYPATH)/lib/
install_conf: $(foreach p,global $(ANALYZERS),config/$(p).conf)
install_conf: $(foreach p,$(OPERATORS),config/$(p).conf)
install -m 644 $^ $(DCDBDEPLOYPATH)/etc/
install: install_analyzer
install: install_operator
@echo "Done with installation."
@echo "====================================="
@echo "To copy the configuration files type:"
@echo " > make install_conf"
analyzers/%.o: CXXFLAGS+= $(PLUGINFLAGS)
operators/%.o: CXXFLAGS+= $(PLUGINFLAGS)
../common/src/sensornavigator.o: CXXFLAGS+= $(PLUGINFLAGS)
libdcdbanalyzer_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
libdcdboperator_aggregator.$(LIBEXT): operators/aggregator/AggregatorOperator.o operators/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbanalyzer_regressor.$(LIBEXT): analyzers/regressor/RegressorAnalyzer.o analyzers/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
libdcdboperator_regressor.$(LIBEXT): operators/regressor/RegressorOperator.o operators/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdbanalyzer_job_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/JobAggregatorAnalyzer.o analyzers/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
libdcdboperator_job_aggregator.$(LIBEXT): operators/aggregator/AggregatorOperator.o operators/aggregator/JobAggregatorOperator.o operators/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdboperator_testeroperator.$(LIBEXT): operators/testeroperator/TesterOperator.o operators/testeroperator/TesterOperatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdboperator_filesink.$(LIBEXT): operators/filesink/FilesinkOperator.o operators/filesink/FilesinkConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdboperator_smucngperf.$(LIBEXT): operators/smucngperf/SMUCNGPerfOperator.o operators/smucngperf/SMUCNGPerfConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
This diff is collapsed.
/*
* DerivedMetrics.h
*
* Created on: Jul 11, 2019
* Author: carla
*/
#ifndef ANALYTICS_COMMONDERIVEDMETRICS_DERIVEDMETRICS_H_
#define ANALYTICS_COMMONDERIVEDMETRICS_DERIVEDMETRICS_H_
#include "cacheentry.h"
/**
* For CPI, LoadsToStore, Branch rate, miss branch ratio, etc..
*/
bool calculateMetricRatio(reading_t & dividend, reading_t & divisor,
unsigned int scaling_factor, reading_t & result) {
if(divisor.value > 0){
result.value = (dividend.value / static_cast<float>(divisor.value))*scaling_factor;
result.timestamp = dividend.timestamp;
return true;
}
return false; //Division by zero
}
/** Any generic metric per second. For instance: instructions per second, l2 misses per second **/
bool calculateMetricPerSec(reading_t & metric, uint64_t interval, unsigned int scaling_factor, reading_t & result) {
if(interval > 0) {
result.value = (metric.value / static_cast<float>(interval))*scaling_factor;
result.timestamp = metric.timestamp;
return true;
}
return false; //Division by zero
}
bool calculateFrequency(reading_t & unhaltedRef, reading_t & unhaltedClocks,
unsigned int min_freq, unsigned int max_freq, reading_t & result) {
if(unhaltedRef.value > 0){
result.value = (unhaltedClocks.value / static_cast<float>(unhaltedRef.value)) * max_freq;
if(result.value > (max_freq * 1.1) || result.value < (min_freq*0.9)) { //There is something wrong here...
return false;
}
return true;
}
return false; //Division by zero
}
#endif /* ANALYTICS_COMMONDERIVEDMETRICS_DERIVEDMETRICS_H_ */
template_sink def1 {
interval 1000
streaming true
}
; In this first example we explicitly pick the input sensors and the paths they should be written to
sink s1 {
default def1
autoName false
input {
sensor "/test/cpu0/col_user" {
path /home/col_user_cpu0.log
}
sensor "/test/MemFree" {
path /home/memfree.log
}
}
}
; In this case we enable automatic naming, and use the unit system to automatically pick the col_idle sensors
; associated to all CPUs; these will be written into the specified path, each with a filename corresponding to the
; MQTT topic
sink s2 {
default def1
autoName true
input {
sensor "<bottomup, filter cpu>col_idle" {
path /home/cpudata/
}
}
}
global {
mqttPrefix /test
}
template_supermucngperf def1 {
interval 10000
minValues 3
duplicate false
streaming true
}
supermucngperf cpi {
default def1
input {
sensor "<bottomup>clocks"{
position 0
}
sensor "<bottomup>ref_clocks"{
position 1
}
sensor "<bottomup>instructions" {
position 2
}
}
output {
sensor "<bottomup, filter /login08/cpu>cpi" {
mqttsuffix /cpi
scaling_factor 100
}
sensor "<bottomup, filter /login08/cpu>frequency" {
mqttsuffix /frequency
scaling_factor 1
}
}
}
global {
mqttPrefix /test
}
template_operator def1 {
interval 1000
minValues 3
duplicate false
streaming true
}
operator tes1 {
default def1
window 2000
relative false
input {
sensor "<bottomup>col_user"
sensor "<topdown>MemFree"
}
output {
sensor "<bottomup>queries" {
mqttsuffix /queries
}
}
}
operator tes2 {
default def1
interval 1500
relative true
input {
sensor "<topdown 1>col_user"
sensor "<bottomup 1>MemFree"
}
output {
sensor "<bottomup 1>queries" {
mqttsuffix /queries
}
}
}
//================================================================================
// Name : JobAnalyzerConfiguratorTemplate.h
// Name : JobOperatorConfiguratorTemplate.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template that implements a configurator for Job analyzer plugins.
// Description : Template that implements a configurator for Job operator plugins.
//================================================================================
//================================================================================
......@@ -24,99 +25,97 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
#define PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
#ifndef PROJECT_JOBOPERATORCONFIGURATORTEMPLATE_H
#define PROJECT_JOBOPERATORCONFIGURATORTEMPLATE_H
#include "AnalyzerConfiguratorTemplate.h"
#include "JobAnalyzerTemplate.h"
#include "OperatorConfiguratorTemplate.h"
#include "JobOperatorTemplate.h"
/**
* @brief Template that implements a configurator for Job analyzer plugins.
* @brief Template that implements a configurator for Job operator plugins.
*
* @details This template expands the standard AnalyzerConfiguratorTemplate,
* @details This template expands the standard OperatorConfiguratorTemplate,
* with very few changes to accomodate the different design of job
* analyzers.
* operators.
*
* @ingroup analyzer
* @ingroup operator
*/
template <class Analyzer, class SBase = SensorBase>
class JobAnalyzerConfiguratorTemplate : virtual public AnalyzerConfiguratorTemplate<Analyzer, SBase> {
template <class Operator, class SBase = SensorBase>
class JobOperatorConfiguratorTemplate : virtual public OperatorConfiguratorTemplate<Operator, SBase> {
// Verifying the types of input classes
static_assert(std::is_base_of<SensorBase, SBase>::value, "SBase must derive from SensorBase!");
static_assert(std::is_base_of<AnalyzerInterface, Analyzer>::value, "Analyzer must derive from AnalyzerInterface!");
static_assert(std::is_base_of<OperatorInterface, Operator>::value, "Operator must derive from OperatorInterface!");
protected:
// For readability
using A_Ptr = std::shared_ptr<Analyzer>;
using O_Ptr = std::shared_ptr<Operator>;
public:
/**
* @brief Class constructor
*/
JobAnalyzerConfiguratorTemplate() : AnalyzerConfiguratorTemplate<Analyzer, SBase>() {}
JobOperatorConfiguratorTemplate() : OperatorConfiguratorTemplate<Operator, SBase>() {}
/**
* @brief Copy constructor is not available
*/
JobAnalyzerConfiguratorTemplate(const JobAnalyzerConfiguratorTemplate&) = delete;
JobOperatorConfiguratorTemplate(const JobOperatorConfiguratorTemplate&) = delete;
/**
* @brief Assignment operator is not available
*/
JobAnalyzerConfiguratorTemplate& operator=(const JobAnalyzerConfiguratorTemplate&) = delete;
JobOperatorConfiguratorTemplate& operator=(const JobOperatorConfiguratorTemplate&) = delete;
/**
* @brief Class destructor
*/
virtual ~JobAnalyzerConfiguratorTemplate() {}
virtual ~JobOperatorConfiguratorTemplate() {}
protected:
/**
* @brief Instantiates all necessary units for a single (job) analyzer
* @brief Instantiates all necessary units for a single (job) operator
*
* When using job analyzers, the only unit that is always instantiated is ALWAYS the template
* unit, similarly to ordinary analyzers in on-demand mode. Such unit then is used at runtime,
* When using job operators, the only unit that is always instantiated is ALWAYS the template
* unit, similarly to ordinary operators in on-demand mode. Such unit then is used at runtime,
* even in streaming mode, to build dynamically all appropriate units for jobs that are
* currently running in the system.
*
* @param an The analyzer whose units must be created
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Analyzer& an, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
// Forcing the job analyzer to not be duplicated
an.setDuplicate(false);
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
// Forcing the job operator to not be duplicated
op.setDuplicate(false);
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = this->_unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(this->_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
true, an.getRelaxed());
units = this->_unitGen.generateUnits(protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << this->_analyzerName << " " << an.getName() << ": Error when creating template job unit: " << e.what();
LOG(error) << this->_operatorName << " " << op.getName() << ": Error when creating template job unit: " << e.what();
delete units;
return false;
}
if(units->size() > 1) {
LOG(error) << this->_analyzerName << " " << an.getName() << ": Invalid job template unit, please check your configuration!";
LOG(error) << this->_operatorName << " " << op.getName() << ": Invalid job template unit, please check your configuration!";
delete units;
return false;
}
shared_ptr<UnitTemplate<SBase>> jobUnit = units->at(0);
delete units;
an.clearUnits();
op.clearUnits();
//if(!this->constructSensorTopics(*jobUnit, an))
//if(!this->constructSensorTopics(*jobUnit, op))
// return false;
if (this->unit(*jobUnit)) {
an.addToUnitCache(jobUnit);
op.addToUnitCache(jobUnit);
LOG(debug) << " Template job unit " + jobUnit->getName() + " generated.";
} else {
LOG(error) << " Template job unit " << jobUnit->getName() << " did not pass the final check!";
......@@ -129,4 +128,4 @@ protected:
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif //PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
#endif //PROJECT_JOBOPERATORCONFIGURATORTEMPLATE_H
//================================================================================
// Name : JobAnalyzerTemplate.h
// Name : JobOperatorTemplate.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features needed by Analyzers.
// Description : Template implementing features needed by Operators.
//================================================================================
//================================================================================
......@@ -24,22 +25,22 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOBANALYZERTEMPLATE_H
#define PROJECT_JOBANALYZERTEMPLATE_H
#ifndef PROJECT_JOBOPERATORTEMPLATE_H
#define PROJECT_JOBOPERATORTEMPLATE_H
#include "AnalyzerTemplate.h"
#include "OperatorTemplate.h"
/**
* @brief Template that implements features needed by Job Analyzers and
* complying to AnalyzerInterface.
* @brief Template that implements features needed by Job Operators and
* complying to OperatorInterface.
*
* @details This template is derived from AnalyzerTemplate, and is adjusted to
* @details This template is derived from OperatorTemplate, and is adjusted to
* simplify job-related computations.
*
* @ingroup analyzer
* @ingroup operator
*/
template <typename S>
class JobAnalyzerTemplate : virtual public AnalyzerTemplate<S> {
class JobOperatorTemplate : virtual public OperatorTemplate<S> {
// The template shall only be instantiated for classes which derive from SensorBase
static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
......@@ -54,10 +55,10 @@ public:
/**
* @brief Class constructor
*
* @param name Name of the analyzer
* @param name Name of the operator
*/
JobAnalyzerTemplate(const string name) :
AnalyzerTemplate<S>(name),
JobOperatorTemplate(const string name) :
OperatorTemplate<S>(name),
_jobDataVec(nullptr) {
_unitAccess.store(false);
......@@ -68,8 +69,8 @@ public:
* @brief Copy constructor
*
*/
JobAnalyzerTemplate(const JobAnalyzerTemplate& other) :
AnalyzerTemplate<S>(other),
JobOperatorTemplate(const JobOperatorTemplate& other) :
OperatorTemplate<S>(other),
_jobDataVec(nullptr) {
_unitAccess.store(false);
......@@ -80,8 +81,8 @@ public:
* @brief Assignment operator
*
*/
JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
AnalyzerTemplate<S>::operator=(other);
JobOperatorTemplate& operator=(const JobOperatorTemplate& other) {
OperatorTemplate<S>::operator=(other);
_jobDataVec = nullptr;
this->_dynamic = true;
return *this;
......@@ -90,20 +91,20 @@ public:
/**
* @brief Class destructor
*/
virtual ~JobAnalyzerTemplate() {
virtual ~JobOperatorTemplate() {
if(_jobDataVec)
delete _jobDataVec;
}
/**
* @brief Returns the units of this analyzer
* @brief Returns the units of this operator
*
* The units returned by this method are of the UnitInterface type. The actual units, in their
* derived type, are used internally. This type of analyzer employs dynamic units that are
* derived type, are used internally. This type of operator employs dynamic units that are
* generated at runtime: as such, an internal unit lock is acquired upon calling this method,
* and must later be released through the releaseUnits() method.
*
* @return The vector of UnitInterface objects of this analyzer
* @return The vector of UnitInterface objects of this operator
*/
virtual vector<UnitPtr>& getUnits() override {
// Spinlock to regulate access to units - normally innocuous
......@@ -121,17 +122,17 @@ public:
}
/**
* @brief Initializes this analyzer
* @brief Initializes this operator
*
* @param io Boost ASIO service to be used
*/
virtual void init(boost::asio::io_service& io) override { AnalyzerInterface::init(io); }
virtual void init(boost::asio::io_service& io) override { OperatorInterface::init(io); }
/**
* @brief Performs an on-demand compute task
*
* Unlike the protected computeAsync and compute methods, computeOnDemand allows to interactively
* perform data analytics queries on the analyzer, which must have the _streaming attribute set
* perform data analytics queries on the operator, which must have the _streaming attribute set
* to false. A unit is generated on the fly, corresponding to the input node given as input,
* and results are returned in the form of a map.
*
......@@ -142,7 +143,7 @@ public:
map<string, reading_t> outMap;
if( !this->_streaming ) {
try {
// Getting exclusive access to the analyzer
// Getting exclusive access to the operator
while( this->_onDemandLock.exchange(true) ) {}
uint32_t jobId = MQTTChecker::topicToJob(node);
if(_jobDataVec)
......@@ -157,8 +158,15 @@ public:
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
if(this->_flatten) {
for (const auto& su : jobUnit->getSubUnits())
for (const auto &o : su->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
}
} else
throw std::runtime_error("Analyzer " + this->_name + ": cannot retrieve job data!");
throw std::runtime_error("Operator " + this->_name + ": cannot retrieve job data!");
} catch(const exception& e) {
this->_onDemandLock.store(false);
throw;
......@@ -166,31 +174,38 @@ public:
this->_onDemandLock.store(false);
} else if( this->_keepRunning ) {
bool found = false;
for(const auto& u : getUnits())
//Spinning explicitly as we need to iterate on the derived Unit objects
while(_unitAccess.exchange(true)) {}
for(const auto& u : this->_units)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));