Commit 3a310364 authored by Alessio Netti's avatar Alessio Netti

Analytics: re-worked unit access method in job analyzers

- New method is lock-based and compatible with all features (e.g. REST API)
in the framework
- Performance impact is expected to be negligible as race conditions will
not be frequent
parent 7cb4c7fb
......@@ -262,12 +262,14 @@ void AnalyticsManager::removeTopics(an_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
for(const auto& a : p.configurator->getAnalyzers()) {
mqttCheck.removeGroup(a->getName());
if (a->getStreaming())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs()) {
mqttCheck.removeTopic(o->getMqtt());
mqttCheck.removeName(o->getName());
}
a->releaseUnits();
}
}
}
......@@ -277,11 +279,13 @@ bool AnalyticsManager::checkTopics(an_dl_t p) {
for(const auto& a : p.configurator->getAnalyzers()) {
if (!mqttCheck.checkGroup(a->getName()))
validTopics = false;
if (a->getStreaming())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
if (!mqttCheck.checkTopic(o->getMqtt()) || !mqttCheck.checkName(o->getName()))
validTopics = false;
a->releaseUnits();
}
}
return validTopics;
}
......@@ -407,6 +411,7 @@ void AnalyticsManager::GET_analytics_sensors(endpointArgs) {
group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getMqtt())));
}
}
a->releaseUnits();
sensors.add_child(a->getName(), group);
}
}
......@@ -421,6 +426,7 @@ void AnalyticsManager::GET_analytics_sensors(endpointArgs) {
data << a->getName() << "::" << s->getMqtt() << "\n";
}
}
a->releaseUnits();
}
}
}
......@@ -463,6 +469,7 @@ void AnalyticsManager::GET_analytics_units(endpointArgs) {
for (const auto& u : a->getUnits()) {
group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(u->getName())));
}
a->releaseUnits();
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
......@@ -474,6 +481,7 @@ void AnalyticsManager::GET_analytics_units(endpointArgs) {
for (const auto& u : a->getUnits()) {
data << a->getName() << "::" << u->getName() << "\n";
}
a->releaseUnits();
}
}
}
......
......@@ -198,6 +198,7 @@ public:
// If the analyzer must be duplicated for each compute unit, we copy-construct identical
// instances that have different unit IDs
unsigned numUnits = an->getUnits().size();
an->releaseUnits();
if(an->getDuplicate() && numUnits>1) {
for(unsigned int i=0; i < numUnits; i++) {
A_Ptr anCopy = std::make_shared<Analyzer>(*an);
......
......@@ -258,7 +258,8 @@ public:
void setUnitCacheLimit(unsigned uc) { _unitCacheLimit = uc+1; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setDelayInterval(unsigned delayInterval) { _delayInterval = delayInterval; }
virtual vector<UnitPtr>& getUnits(bool pop=false) = 0;
virtual vector<UnitPtr>& getUnits() = 0;
virtual void releaseUnits() = 0;
protected:
......
......@@ -167,12 +167,22 @@ public:
* @brief Returns the units of this analyzer
*
* The units returned by this method are of the UnitInterface type. The actual units, in their
* derived type, are used internally.
* derived type, are used internally. If the analyzer uses a lock to regulate unit access,
* this will be acquired and must be released through the releaseUnits() method.
*
* @param pop This parameter does not affect the output of this method
* @return The vector of UnitInterface objects of this analyzer
*/
virtual vector<UnitPtr>& getUnits(bool pop=false) override { return _baseUnits; }
virtual vector<UnitPtr>& getUnits() override { return _baseUnits; }
/**
* @brief Releases the internal lock for unit access
*
* This method is meant to regulate concurrent access to units which are generated dynamically.
* In this specific implementation, the method does not perform anything as units in standard
* analyzers are static and never modified.
*
*/
virtual void releaseUnits() override {}
/**
* @brief Clears all the units contained in this analyzer
......
//================================================================================
// Name : JobAnalyzerConfiguratorTemplate.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description : Template that implements a configurator for Job analyzer plugins.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// Created by Netti, Alessio on 07.06.19.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
#define PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
......@@ -65,6 +87,8 @@ protected:
* @return True if successful, false otherwise
*/
virtual bool readUnits(Analyzer& an, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
// Forcing the job analyzer to not be duplicated
an.setDuplicate(false);
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = this->_unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
......
......@@ -28,10 +28,6 @@
#define PROJECT_JOBANALYZERTEMPLATE_H
#include "AnalyzerTemplate.h"
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
/**
* Template that implements features needed by Job Analyzers and complying to AnalyzerInterface.
......@@ -59,10 +55,9 @@ public:
*/
JobAnalyzerTemplate(const string name) :
AnalyzerTemplate<S>(name),
_jobUnitsQueue(nullptr),
_jobDataVec(nullptr) {
_mapAccess.store(false);
_unitAccess.store(false);
}
/**
......@@ -71,10 +66,9 @@ public:
*/
JobAnalyzerTemplate(const JobAnalyzerTemplate& other) :
AnalyzerTemplate<S>(other),
_jobUnitsQueue(nullptr),
_jobDataVec(nullptr) {
_mapAccess.store(false);
_unitAccess.store(false);
}
/**
......@@ -83,7 +77,6 @@ public:
*/
JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
AnalyzerTemplate<S>::operator=(other);
_jobUnitsQueue.reset(nullptr);
_jobDataVec = nullptr;
}
......@@ -95,78 +88,29 @@ public:
delete _jobDataVec;
}
/**
* @brief Perform a REST-triggered PUT action
*
* This implementation supplies a "jobs" action that can be used to retrieve the list of jobs
* that have been recently processed by this analyzer.
*
* @param action Name of the action to be performed
* @param queries Vector of queries (key-value pairs)
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override {
std::ostringstream data;
if(action=="jobs") {
uint32_t maxJobs = queries.count("max")>0 ? stoull(queries.at("max")) : 100;
bool json = queries.count("json")>0 ? queries.at("json")=="true" : false;
uint32_t jobCtr = 0;
if(!this->_unitCache)
throw std::runtime_error("Initialization error in analyzer " + this->_name + "!");
while( _mapAccess.exchange(true) ) {}
if(json) {
boost::property_tree::ptree root, units, sensors;
for (auto it = this->_insertionLUT->rbegin(); it != this->_insertionLUT->rend() && jobCtr < maxJobs; ++it) {
for (const auto &s : *it->getBaseOutputs())
sensors.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getName())));
units.add_child(*it->getName(), sensors);
jobCtr++;
}
root.add_child(this->_name, units);
boost::property_tree::write_json(data, root, true);
} else {
for (auto it = this->_insertionLUT->rbegin(); it != this->_insertionLUT->rend() && jobCtr < maxJobs; ++it) {
for (const auto &s : *it->getBaseOutputs())
data << *it->getName() << "::" << s->getMqtt() << "\n";
jobCtr++;
}
}
_mapAccess.store(false);
} else
throw invalid_argument("Unknown plugin action " + action + " requested!");
restResponse_t resp;
resp.data = data.str();
return resp;
}
/**
* @brief Returns the units of this analyzer
*
* The units returned by this method are of the UnitInterface type. The actual units, in their
* derived type, are used internally. This type of analyzer employs dynamic units that are
* generated at runtime: as such, lock-free access to the units that have been modified
* recently has to be guaranteed. If the pop parameter is true, recently-modified units
* will be popped from an internal spsc queue, and returned. This mode is to be used when
* retrieving new data to be sent. If pop=false, this method will return an empty vector, as
* job analyzers do not have permanent units.
* generated at runtime: as such, an internal unit lock is acquired upon calling this method,
* and must later be released through the releaseUnits() method.
*
* @param pop If the analyzer stores units dynamically as they are modified, these will be returned too
* @return The vector of UnitInterface objects of this analyzer
*/
virtual vector<UnitPtr>& getUnits(bool pop=false) override {
if(pop) {
this->_baseUnits.clear();
this->_units.clear();
//unordered_set<string> unitSet;
U_Ptr u;
while(_jobUnitsQueue.pop(u))
//if(unitSet.insert(u->getName()))
addUnit(u);
return this->_baseUnits;
} else
return _dummyBaseUnits;
virtual vector<UnitPtr>& getUnits() override {
// Spinlock to regulate access to units - normally innocuous
while(_unitAccess.exchange(true)) {}
return this->_baseUnits;
}
/**
* @brief Releases the access lock to units
*
* This method must be called anytime operations on units are performed through getUnits().
*/
virtual void releaseUnits() {
_unitAccess.store(false);
}
/**
......@@ -174,10 +118,7 @@ public:
*
* @param io Boost ASIO service to be used
*/
virtual void init(boost::asio::io_service& io) override {
AnalyzerInterface::init(io);
_jobUnitsQueue.reset(new boost::lockfree::spsc_queue<U_Ptr>(this->_unitCacheLimit));
}
virtual void init(boost::asio::io_service& io) override { AnalyzerInterface::init(io); }
/**
* @brief Performs an on-demand compute task
......@@ -216,14 +157,13 @@ public:
this->_onDemandLock.store(false);
} else if( this->_keepRunning ) {
bool found = false;
while( _mapAccess.exchange(true) ) {}
for(const auto& kv : this->_unitCache)
if(kv.first == node) {
for(const auto& u : getUnits())
if(u->getName() == node) {
found = true;
for(const auto& o : kv.second->getBaseOutputs())
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
_mapAccess.store(false);
releaseUnits();
if(!found)
throw std::domain_error("Job " + node + " does not belong to the domain of " + this->_name + "!");
......@@ -271,11 +211,8 @@ protected:
for (const auto s : jobUnit->getOutputs())
if (!s->isInit())
s->initSensor(this->_cacheSize);
// Spinlock to regulate access to the internal unit map - normally innocuous
while( _mapAccess.exchange(true) ) {}
addToUnitCache(jobUnit);
_mapAccess.store(false);
}
return jobUnit;
}
......@@ -316,16 +253,30 @@ protected:
vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
if(buf) {
_jobDataVec = buf;
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(const auto& job : *_jobDataVec) {
U_Ptr jobUnit = jobDataToUnit(job);
compute(jobUnit);
_jobUnitsQueue->push(jobUnit);
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) { continue; }
}
// Performing actual computation on each unit
for(const auto& ju : _tempUnits)
compute(ju);
// Acquiring the spinlock to refresh the exposed units
while(_unitAccess.exchange(true)) {}
this->clearUnits();
for(const auto& ju : _tempUnits)
addUnit(ju);
_unitAccess.store(false);
_tempUnits.clear();
}
else
LOG(error) << "Analyzer " + this->_name + ": cannot retrieve job data!";
} catch(const exception& e) {
LOG(error) << "Analyzer " + this->_name + ": internal error " + e.what() + " during computation!";
_unitAccess.store(false);
}
if (this->_timer && this->_keepRunning) {
......@@ -336,13 +287,10 @@ protected:
this->_pendingTasks--;
}
// Queue of recently-modified units that is periodically emptied when messages are pushed
std::unique_ptr<boost::lockfree::spsc_queue<U_Ptr>> _jobUnitsQueue;
// Vector of recently-modified units
vector<U_Ptr> _tempUnits;
// Spinlock used to regulate access to the internal units map, for "visualization" purposes
atomic<bool> _mapAccess;
// Dummy vector used to hide dynamic units in getUnits, while avoiding heap allocations
// It is quite ugly, but it is also the most convenient way to achieve this
vector<UnitPtr> _dummyBaseUnits;
atomic<bool> _unitAccess;
// Vector of job data structures used to retrieve job data at runtime
vector<qeJobData>* _jobDataVec;
// Logger object
......
......@@ -134,19 +134,21 @@ void AnalyticsController::run() {
for (auto &p : _analyticsPlugins) {
if (_doHalt) break;
for (const auto &a : p.configurator->getAnalyzers())
if(a->getStreaming())
for (const auto &u : a->getUnits(true))
if(a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs())
if (s->getSizeOfReadingQueue() >= a->getMinValues() && sid.mqttTopicConvert(s->getMqtt())) {
readings.clear();
sensorQueue = s->getReadingQueue();
while(sensorQueue->pop(readingBuf)) {
while (sensorQueue->pop(readingBuf)) {
readings.push_back(DCDB::SensorDataStoreReading(sid, readingBuf.timestamp, readingBuf.value));
_sensorCache->storeSensor(sid, readingBuf.timestamp, readingBuf.value);
}
_dcdbStore->insertBatch(readings);
_readingCtr+=readings.size();
_readingCtr += readings.size();
}
a->releaseUnits();
}
}
sleep(1);
}
......@@ -162,7 +164,7 @@ bool AnalyticsController::publishSensors() {
uint64_t publishCtr = 0;
for (auto &p : _analyticsPlugins)
for (const auto &a : p.configurator->getAnalyzers())
if(a->getStreaming())
if(a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &s : u->getBaseOutputs()) {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
......@@ -186,6 +188,8 @@ bool AnalyticsController::publishSensors() {
break;
}
}
a->releaseUnits();
}
if(failedPublish)
LOG(error) << "Issues during sensor name auto-publish! Only " << publishCtr << " sensors were published.";
......
......@@ -153,7 +153,7 @@ void MQTTPusher::push() {
}
for (const auto &a : p.configurator->getAnalyzers()) {
if(a->getStreaming()) {
for (const auto &u : a->getUnits(true)) {
for (const auto &u : a->getUnits()) {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned) _maxNumberOfMessages) {
......@@ -166,6 +166,7 @@ void MQTTPusher::push() {
}
}
}
a->releaseUnits();
}
}
}
......@@ -242,21 +243,22 @@ bool MQTTPusher::sendMappings() {
// Performing auto-publish for analytics output sensors
for(auto& p: _analyticsPlugins)
for(auto& a: p.configurator->getAnalyzers())
if(a->getStreaming())
for(auto& u: a->getUnits())
for(auto& s: u->getBaseOutputs()) {
if(a->getStreaming()) {
for (auto &u: a->getUnits())
for (auto &s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
name = s->getName();
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
}
else
} else
publishCtr++;
}
a->releaseUnits();
}
LOGM(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
return true;
}
......@@ -286,8 +288,11 @@ void MQTTPusher::computeMsgRate() {
msgRate += (float)g->getSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
for(auto& p : _analyticsPlugins)
for(const auto& a : p.configurator->getAnalyzers())
for(const auto& u : a->getUnits())
msgRate += (float)u->getBaseOutputs().size() * ( 1000.0f / (float)a->getInterval() ) / (float)a->getMinValues();
if(a->getStreaming()) {
for (const auto &u : a->getUnits())
msgRate += (float) u->getBaseOutputs().size() * (1000.0f / (float) a->getInterval()) / (float) a->getMinValues();
a->releaseUnits();
}
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
......
......@@ -200,6 +200,7 @@ void RestAPI::GET_average(endpointArgs) {
}
}
}
a->releaseUnits();
}
}
}
......
......@@ -88,10 +88,12 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
// Adding data analytics sensors to the map
for(auto& p : _analyticsManager->getPlugins()) {
for(const auto& a : p.configurator->getAnalyzers())
if (a->getStreaming())
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
_sensorMap.insert(std::make_pair(o->getName(), o));
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
_sensorMap.insert(std::make_pair(o->getName(), o));
a->releaseUnits();
}
}
_queryEngine.updated.store(false);
_queryEngine.updating.store(false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment