Commit be1a21cb authored by Michael Ott's avatar Michael Ott

Make _readingQueue size in sensorBase configurable

parent dfa45b57
......@@ -177,6 +177,7 @@ file. The following is instead a list of configuration parameters that are avail
|:----- |:----------- |
| default | Name of the template that must be used to configure this operator.
| interval | Specifies how often (in milliseconds) the operator will be invoked to perform computations, and thus the sampling interval of its output sensors. Only used for operators in _streaming_ mode.
| queueSize | Maximum number of readings to queue. Default is 1024.
| relaxed | If set to _true_, the units of this operator will be instantiated even if some of the respective input sensors do not exist.
| delay | Delay in milliseconds to be applied to the interval of the operator. This parameter can be used to tune how operator pipelines work, ensuring that the next computation stage is started only after the previous one has finished.
| unitCacheLimit | Defines the maximum size of the unit cache that is used in the on-demand and job modes. Default is 1000.
......
......@@ -225,7 +225,7 @@ protected:
// The job unit is generated as a hierarchical unit
jobUnit = unitGen.generateFromTemplate(uTemplate, jobTopic, jobData.nodes, this->_mqttPart, this->_enforceTopics, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_interval);
jobUnit->init(this->_interval, this->_queueSize);
this->addToUnitCache(jobUnit);
}
return jobUnit;
......
......@@ -382,6 +382,8 @@ protected:
{
if (boost::iequals(val.first, "interval")) {
op.setInterval(stoull(val.second.data()));
} else if (boost::iequals(val.first, "queueSize")) {
op.setQueueSize(stoull(val.second.data()));
} else if (boost::iequals(val.first, "minValues")) {
op.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "mqttPart")) {
......
......@@ -265,6 +265,7 @@ public:
bool getStreaming() const { return _streaming; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getQueueSize() const { return _queueSize; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getUnitCacheLimit() const { return _unitCacheLimit; }
unsigned getDelayInterval() const { return _delayInterval; }
......@@ -285,6 +286,7 @@ public:
void setDisabled(bool disabled) { _disabled = disabled; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setQueueSize(unsigned queueSize) { _queueSize = queueSize; }
void setUnitCacheLimit(unsigned uc) { _unitCacheLimit = uc+1; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setDelayInterval(unsigned delayInterval) { _delayInterval = delayInterval; }
......@@ -358,6 +360,8 @@ protected:
unsigned int _minValues;
// Sampling period regulating compute batches
unsigned int _interval;
// readingQueue size
unsigned int _queueSize;
// Size of the cache in time for the output sensors in this operator
unsigned int _cacheInterval;
// Maximum number of units that can be contained in the unit cache
......
......@@ -144,6 +144,7 @@ public:
LOG_VAR(ll) << " MinValues: " << _minValues;
LOG_VAR(ll) << " Interval: " << _interval;
LOG_VAR(ll) << " Interval Delay: " << _delayInterval;
LOG_VAR(ll) << " QueueSize: " << _queueSize;
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
......@@ -234,7 +235,7 @@ public:
OperatorInterface::init(io);
for(const auto u : _units)
u->init(_interval);
u->init(_interval, _queueSize);
this->execOnInit();
}
......@@ -353,7 +354,7 @@ public:
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
tempUnit->init(_interval);
tempUnit->init(_interval, _queueSize);
compute(tempUnit);
retrieveAndFlush(outMap, tempUnit);
} catch(const exception& e) {
......
......@@ -61,7 +61,7 @@ public:
*
* @param interval Sampling interval in milliseconds
*/
virtual void init(unsigned int interval) = 0;
virtual void init(unsigned int interval, unsigned int queueSize) = 0;
/**
* @brief Sets the name of this unit
......
......@@ -179,14 +179,14 @@ public:
*
* @param interval Sampling interval in milliseconds
*/
void init(unsigned int interval) override {
void init(unsigned int interval, unsigned int queueSize) override {
for(const auto s : _outputs)
if (!s->isInit())
s->initSensor(interval);
s->initSensor(interval, queueSize);
for (const auto &su : _subUnits)
for (const auto s : su->getOutputs())
if (!s->isInit())
s->initSensor(interval);
s->initSensor(interval, queueSize);
}
/**
......
......@@ -44,8 +44,6 @@
*/
class SensorBase {
public:
static const size_t QUEUE_MAXLIMIT=1024;
SensorBase(const std::string& name) :
_name(name),
_mqtt(""),
......@@ -154,7 +152,7 @@ public:
void clearReadingQueue() const { reading_t buf; while(_readingQueue->pop(buf)) {} }
void pushReadingQueue(reading_t *reads, std::size_t count) const { _readingQueue->push(reads, count); }
virtual void initSensor(unsigned interval) {
virtual void initSensor(unsigned interval, unsigned queueLen) {
uint64_t cacheSize = _cacheInterval / interval + 1;
if(!_cache) {
//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
......@@ -162,7 +160,7 @@ public:
_cache->updateBatchSize(1, true);
}
if(!_readingQueue) {
_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(queueLen));
}
}
......
......@@ -107,7 +107,7 @@ void MQTTPusher::push() {
computeMsgRate();
//collect sensor-data
reading_t * reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
reading_t * reads = new reading_t[1024];
std::size_t totalCount = 0; //number of messages
while (_keepRunning || totalCount) {
if (_doHalt) {
......@@ -200,7 +200,7 @@ void MQTTPusher::push() {
int MQTTPusher::sendReadings(SensorBase &s, reading_t *reads, std::size_t &totalCount) {
//get all sensor values out of its queue
std::size_t count = s.popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
std::size_t count = s.popReadingQueue(reads, 1024);
//totalCount+= count;
totalCount += 1;
#ifdef DEBUG
......
......@@ -315,7 +315,8 @@ All the different plugins share some same general principles in common regarding
* Other entity attributes could be: mqttPart, protocol-version, host address and port.
2. Groups hold all attributes which multiple sensors belonging to it share in common. Common group attributes:
* __interval__ (Time in [ms] between two consecutive sensor reads. Default is 1000[ms] = 1[s])
* __minValues__ (Minimum number of sensor reads the sensors in a group should gather before they are sent together to the database. Useful to reduce MQTT-overhead. Default is 1 (every sensor value is sent on its own))
* __queueSize__ (Maximum number of sensor readings to queue to bridge connectivity issues with the CollectAgent. Default is 1024.
* __minValues__ (Minimum number of sensor reads the sensors in a group should gather before they are sent together to the database. Useful to reduce MQTT-overhead. Default is 1 (every sensor value is sent on its own))
* __mqttPart__ (Part for the [mqtt-topic](#mqttTopic) all sensors in this group should share in common)
* __default__ (One can define the name of a template group (see below) whose values and sensors should be used as default)
3. Sensors hold only those attributes which are necessary to uniquely identify the target sensor. Common base attributes:
......
......@@ -363,6 +363,8 @@ class ConfiguratorTemplate : public ConfiguratorInterface {
BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "interval")) {
sGroup.setInterval(stoull(val.second.data()));
} else if (boost::iequals(val.first, "queueSize")) {
sGroup.setQueueSize(stoull(val.second.data()));
} else if (boost::iequals(val.first, "minValues")) {
sGroup.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "mqttPart")) {
......
......@@ -61,6 +61,7 @@ class SensorGroupInterface {
_disabled(false),
_minValues(1),
_interval(1000),
_queueSize(1024),
_pendingTasks(0),
_timer(nullptr) {
}
......@@ -73,6 +74,7 @@ class SensorGroupInterface {
_disabled(other._disabled),
_minValues(other._minValues),
_interval(other._interval),
_queueSize(other._queueSize),
_pendingTasks(0),
_timer(nullptr) {
}
......@@ -87,6 +89,7 @@ class SensorGroupInterface {
_disabled = other._disabled;
_minValues = other._minValues;
_interval = other._interval;
_queueSize = other._queueSize;
_pendingTasks.store(0);
_timer = nullptr;
......@@ -101,6 +104,7 @@ class SensorGroupInterface {
virtual bool isDisabled() const { return _disabled; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getQueueSize() const { return _queueSize; }
///@}
///@name Setters
......@@ -120,6 +124,7 @@ class SensorGroupInterface {
void setDisabled(const bool disabled) { _disabled = disabled; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setQueueSize(unsigned queueSize) { _queueSize = queueSize; }
///@}
///@name Publicly accessible interface methods (implemented in SensorGroupTemplate)
......@@ -228,6 +233,7 @@ class SensorGroupInterface {
LOG_VAR(ll) << leading << " Synchronized: " << (_sync ? std::string("true") : std::string("false"));
LOG_VAR(ll) << leading << " minValues: " << _minValues;
LOG_VAR(ll) << leading << " interval: " << _interval;
LOG_VAR(ll) << leading << " queueSize: " << _queueSize;
}
///@}
......@@ -318,6 +324,7 @@ class SensorGroupInterface {
bool _disabled;
unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
unsigned int _interval; ///< Reading interval cycle in milliseconds
unsigned int _queueSize; ///< Maximum number of queued readings
std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
std::vector<SBasePtr> _baseSensors; ///< Vector with sensors associated to this group
......
......@@ -105,7 +105,7 @@ class SensorGroupTemplate : public SensorGroupInterface {
SensorGroupInterface::init(io);
for (auto s : _sensors) {
s->initSensor(_interval);
s->initSensor(_interval, _queueSize);
}
this->execOnInit();
......
......@@ -53,6 +53,7 @@ class SensorGroupTemplateEntity : public SensorGroupTemplate<S> {
using SensorGroupInterface::_disabled;
using SensorGroupInterface::_groupName;
using SensorGroupInterface::_interval;
using SensorGroupInterface::_queueSize;
using SensorGroupInterface::_keepRunning;
using SensorGroupInterface::_pendingTasks;
using SensorGroupInterface::_timer;
......@@ -100,7 +101,7 @@ class SensorGroupTemplateEntity : public SensorGroupTemplate<S> {
SensorGroupInterface::init(io);
for (auto s : _sensors) {
s->initSensor(_interval);
s->initSensor(_interval, _queueSize);
}
_entity->init(io);
......
......@@ -95,13 +95,6 @@ class IPMISensorBase : public SensorBase {
xccBulkEnergy
};
void initSensor(unsigned interval) override {
if (_type == xccDatastorePower) {
_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(3000));
}
SensorBase::initSensor(interval);
}
uint16_t getRecordId() const { return _recordId; }
const std::vector<uint8_t> &getSdrRecord() const { return _sdrRecord; }
double getFactor() const { return _factor; }
......
......@@ -215,6 +215,12 @@ bool IPMISensorGroup::checkConfig() {
LOG(error) << _groupName << "::" << s->getName() << " has an undefined sensor type";
return false;
}
if (s->getType() == IPMISensorBase::sensorType::xccDatastorePower) {
if (_queueSize < 3000) {
LOG(info) << _groupName << "::" << s->getName() << " increasing queueSize to 3000 to store all data store readings (was " << _queueSize << ")";
_queueSize = 3000;
}
}
}
return true;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment