Commit 9965c8a0 authored by Micha Müller's avatar Micha Müller
Browse files

Pusher WIP: Adapter all SensorGroups to refactored template

parent 5da840d6
......@@ -29,63 +29,28 @@
#include <functional>
BACnetSensorGroup::BACnetSensorGroup(const std::string& name) : SensorGroupTemplate(name), _deviceInstance(0) {
_bacClient = nullptr;
}
BACnetSensorGroup::BACnetSensorGroup(const BACnetSensorGroup& other) : SensorGroupTemplate(other),
_bacClient(other._bacClient),
_deviceInstance(other._deviceInstance) {}
BACnetSensorGroup::~BACnetSensorGroup() {}
BACnetSensorGroup& BACnetSensorGroup::operator=(const BACnetSensorGroup& other) {
SensorGroupTemplate::operator =(other);
_bacClient = other._bacClient;
_deviceInstance = other._deviceInstance;
return *this;
}
void BACnetSensorGroup::init(boost::asio::io_service& io) {
SensorGroupTemplate::init(io);
if(_bacClient) {
_bacClient->initEntity(io);
} else {
LOG(error) << "No BACnetClient set for sensor " << _groupName << "! Cannot initialize sensor.";
}
}
void BACnetSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
if (_bacClient) {
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(_bacClient->getStrand()->wrap(std::bind(&BACnetSensorGroup::readAsync, this)));
LOG(info) << "Sensorgroup " << _groupName << " started.";
} else {
LOG(error) << "No BACnetClient set for sensorgroup " << _groupName << "! Cannot start polling.";
}
}
void BACnetSensorGroup::stop() {
_keepRunning = 0;
//cancel any outstanding readAsync()
_timer->cancel();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
void BACnetSensorGroup::read() {
reading_t reading;
reading.timestamp = getTimestamp();
for(const auto& s : _sensors) {
try {
reading.value = _bacClient->readProperty(getDeviceInstance(), s->getObjectInstance(), s->getObjectType(), s->getPropertyId());
reading.value = _entity->readProperty(getDeviceInstance(), s->getObjectInstance(), s->getObjectType(), s->getPropertyId());
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
#endif
......@@ -98,21 +63,6 @@ void BACnetSensorGroup::read() {
}
void BACnetSensorGroup::readAsync() {
read();
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(_bacClient->getStrand()->wrap(std::bind(&BACnetSensorGroup::readAsync, this)));
}
_pendingTasks--;
}
void BACnetSensorGroup::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " deviceInstance: " << _deviceInstance;
if (_bacClient) {
LOG_VAR(ll) << " BACClient set";
} else {
LOG_VAR(ll) << " No BACClient set!";
}
}
......@@ -30,14 +30,12 @@
#include "BACnetSensorBase.h"
#include "../../includes/SensorGroupTemplate.h"
using BACnetClientPtr = std::shared_ptr<BACnetClient>;
/**
* @brief SensorGroupTemplate specialization for this plugin.
*
* @ingroup bacnet
*/
class BACnetSensorGroup: public SensorGroupTemplate<BACnetSensorBase> {
class BACnetSensorGroup: public SensorGroupTemplate<BACnetSensorBase, BACnetClient> {
public:
BACnetSensorGroup(const std::string& name);
......@@ -45,22 +43,14 @@ public:
virtual ~BACnetSensorGroup();
BACnetSensorGroup& operator=(const BACnetSensorGroup& other);
void init(boost::asio::io_service& io) override;
void start() override;
void stop() override;
void setBACnetClient(BACnetClientPtr bacClient) { _bacClient = bacClient; }
BACnetClientPtr const getBACnetClient() const { return _bacClient; }
void setDeviceInstance(const std::string& deviceInstance) { _deviceInstance = stoul(deviceInstance); }
uint32_t getDeviceInstance() const { return _deviceInstance; }
void printConfig(LOG_LEVEL ll) override;
void printConfig(LOG_LEVEL ll) final override;
private:
void read() override;
void readAsync() override;
void read() final override;
BACnetClientPtr _bacClient;
uint32_t _deviceInstance;
};
......
......@@ -45,11 +45,7 @@ CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
_connection(-1) {
}
CaliperSensorGroup::~CaliperSensorGroup() {
if(_keepRunning) {
stop();
}
}
CaliperSensorGroup::~CaliperSensorGroup() {}
CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
SensorGroupTemplate::operator=(other);
......@@ -59,18 +55,12 @@ CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& othe
return *this;
}
void CaliperSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
bool CaliperSensorGroup::execOnStart() {
_socket = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0);
if(_socket == -1) {
LOG(error) << _groupName << ": Failed to open socket: " << strerror(errno);
return;
return false;
}
sockaddr_un addr;
......@@ -83,28 +73,20 @@ void CaliperSensorGroup::start() {
LOG(error) << _groupName << ": Failed to bind socket: " << strerror(errno);
close(_socket);
_socket = -1;
return;
return false;
}
if(listen(_socket, 1)) {
LOG(error) << _groupName << ": Can not listen on socket: " << strerror(errno);
close(_socket);
_socket = -1;
return;
return false;
}
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&CaliperSensorGroup::readAsync, this));
LOG(info) << "Sensorgroup " << _groupName << " started.";
return true;
}
void CaliperSensorGroup::stop() {
_keepRunning = 0;
//wait before closing _file
wait();
void CaliperSensorGroup::execOnStop() {
if(_connection != -1) {
close(_connection);
_connection = -1;
......@@ -113,8 +95,6 @@ void CaliperSensorGroup::stop() {
close(_socket);
_socket = -1;
}
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
void CaliperSensorGroup::read() {
......@@ -170,17 +150,6 @@ void CaliperSensorGroup::read() {
}*/
}
void CaliperSensorGroup::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(std::bind(&CaliperSensorGroup::readAsync, this));
}
_pendingTasks--;
}
void CaliperSensorGroup::printConfig(LOG_LEVEL ll) {
}
......@@ -43,14 +43,13 @@ public:
virtual ~CaliperSensorGroup();
CaliperSensorGroup& operator=(const CaliperSensorGroup& other);
void start() override;
void stop() override;
bool execOnStart() final override;
void execOnStop() final override;
void printConfig(LOG_LEVEL ll) override;
void printConfig(LOG_LEVEL ll) final override;
private:
void read() override;
void readAsync() override;
int _socket;
int _connection;
......
......@@ -38,13 +38,13 @@ GpfsmonSensorGroup::GpfsmonSensorGroup(const std::string& name) :
}
GpfsmonSensorGroup& GpfsmonSensorGroup::operator=(const GpfsmonSensorGroup& other){
SensorGroupTemplate<GpfsmonSensorBase>::operator=(other);
SensorGroupTemplate::operator=(other);
//no need to copy _data
return *this;
}
GpfsmonSensorGroup::GpfsmonSensorGroup(const GpfsmonSensorGroup& other):
SensorGroupTemplate<GpfsmonSensorBase>(other){
SensorGroupTemplate(other){
_data.resize(GPFS_METRIC::SIZE);
if(!fileExists(TMP_GPFSMON)) createTempFile();
}
......@@ -53,24 +53,6 @@ GpfsmonSensorGroup::~GpfsmonSensorGroup() {
_data.clear();
}
void GpfsmonSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&GpfsmonSensorGroup::readAsync, this));
LOG(info) << "Sensorgroup " << _groupName << " started.";
}
void GpfsmonSensorGroup::stop() {
_keepRunning = 0;
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
void GpfsmonSensorGroup::read() {
ureading_t reading;
reading.timestamp = getTimestamp();
......@@ -102,17 +84,6 @@ void GpfsmonSensorGroup::read() {
}
}
void GpfsmonSensorGroup::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(std::bind(&GpfsmonSensorGroup::readAsync, this));
}
_pendingTasks--;
}
void GpfsmonSensorGroup::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " No other specific attributes";
}
......@@ -128,8 +99,7 @@ void GpfsmonSensorGroup::createTempFile(){
}
}
void GpfsmonSensorGroup::init(boost::asio::io_service& io) {
SensorGroupTemplate::init(io);
void GpfsmonSensorGroup::execOnInit() {
if(!fileExists(TMP_GPFSMON)) createTempFile();
_data.resize(GPFS_METRIC::SIZE);
}
......
......@@ -43,18 +43,15 @@ public:
GpfsmonSensorGroup(const std::string& name);
virtual ~GpfsmonSensorGroup();
void init(boost::asio::io_service& io) override;
void start() override;
void stop() override;
void execOnInit() final override;
GpfsmonSensorGroup& operator=(const GpfsmonSensorGroup& other);
GpfsmonSensorGroup(const GpfsmonSensorGroup& other);
void printConfig(LOG_LEVEL ll) override;
void printConfig(LOG_LEVEL ll) final override;
private:
void read() override;
void readAsync() override;
void read() final override;
bool fileExists(const char* filename);
void createTempFile();
......
......@@ -299,6 +299,7 @@ double IPMIHost::readSensorRecord(std::vector<uint8_t>& record) {
double ret = .0;
if (success && reading) {
//TODO should _delayNextReadUntil be reset here?
_errorCount = 0;
ret = *reading;
free(reading);
......
......@@ -39,71 +39,44 @@
IPMISensorGroup::IPMISensorGroup(const std::string& name) :
SensorGroupTemplate(name) {
_host = nullptr;
}
IPMISensorGroup::IPMISensorGroup(const IPMISensorGroup& other) :
SensorGroupTemplate(other),
_host(other._host) {}
SensorGroupTemplate(other) {
}
IPMISensorGroup::~IPMISensorGroup() {}
IPMISensorGroup& IPMISensorGroup::operator=(const IPMISensorGroup& other) {
SensorGroupTemplate::operator=(other);
_host = other._host;
return *this;
}
void IPMISensorGroup::init(boost::asio::io_service& io) {
SensorGroupTemplate::init(io);
if (_host) {
_host->initEntity(io);
} else {
LOG(error)<< "No host set for sensorgroup " << _groupName << "! Cannot initialize sensor.";
}
}
void IPMISensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info)<< "Sensorgroup " << _groupName << " already running.";
return;
}
if (_host) {
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(_host->getStrand()->wrap(boost::bind(&IPMISensorGroup::readAsync, this)));
LOG(info) << "Sensorgroup " << _groupName << " started.";
} else {
LOG(error) << "No host set for sensorgroup " << _groupName << "! Cannot start polling.";
return;
}
}
void IPMISensorGroup::stop() {
_keepRunning = 0;
//cancel any outstanding readAsync()
_timer->cancel();
LOG(info)<< "Sensorgroup " << _groupName << " stopped.";
}
void IPMISensorGroup::read() {
reading_t reading;
reading.timestamp = getTimestamp();
//TODO with SensorGroup refactor we lost the ability to sleep until
// delayNextRead. Instead we are now checking every time if we can read
// again. Overhead acceptable? General delayNextReadUntil implementation in
// SensorGroupTemplate required?
if (reading.timestamp < _entity->getDelayNextReadUntil()) {
return;
}
for(const auto& s : _sensors) {
try {
if (s->getRecordId() != 0) { /* recordId was set */
std::vector<uint8_t> sdrRecord = s->getSdrRecord();
if (sdrRecord.size() == 0) {
_host->getSdrRecord(s->getRecordId(), sdrRecord);
_entity->getSdrRecord(s->getRecordId(), sdrRecord);
s->setSdrRecord(sdrRecord);
}
reading.value = _host->readSensorRecord(sdrRecord);
reading.value = _entity->readSensorRecord(sdrRecord);
} else { /* use raw command */
reading.value = _host->sendRawCmd(s->getRawCmd(), s->getLsb(), s->getMsb());
reading.value = _entity->sendRawCmd(s->getRawCmd(), s->getLsb(), s->getMsb());
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
......@@ -116,27 +89,6 @@ void IPMISensorGroup::read() {
}
}
void IPMISensorGroup::readAsync() {
uint64_t now = getTimestamp();
if (now >= _host->getDelayNextReadUntil()) {
read();
}
if (_timer && _keepRunning) {
uint64_t next = nextReadingTime();
while (next < _host->getDelayNextReadUntil()) {
next += MS_TO_NS(_interval);
}
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(_host->getStrand()->wrap(boost::bind(&IPMISensorGroup::readAsync, this)));
}
_pendingTasks--;
}
void IPMISensorGroup::printConfig(LOG_LEVEL ll) {
if (_host) {
LOG_VAR(ll) << " Host: " << _host->getHostName();
} else {
LOG_VAR(ll) << " No Host set!";
}
/* nothing to see here */
}
......@@ -35,7 +35,7 @@
*
* @ingroup ipmi
*/
class IPMISensorGroup: public SensorGroupTemplate<IPMISensorBase> {
class IPMISensorGroup: public SensorGroupTemplate<IPMISensorBase, IPMIHost> {
public:
IPMISensorGroup(const std::string& name);
......@@ -43,20 +43,10 @@ public:
virtual ~IPMISensorGroup();
IPMISensorGroup& operator=(const IPMISensorGroup& other);
void init(boost::asio::io_service& io) override;
void start() override;
void stop() override;
void setHost(IPMIHost* host) { _host = host; }
const IPMIHost* const getHost() const { return _host; }
void printConfig(LOG_LEVEL ll) override;
void printConfig(LOG_LEVEL ll) final override;
private:
void read() override;
void readAsync() override;
IPMIHost* _host;
void read() final override;
};
#endif /* IPMISENSORGROUP_H_ */
......@@ -40,7 +40,6 @@
#include <utility>
#include "logging.h"
#include "../../includes/SensorBase.h"
#include "timestamp.h"
#include "Types.h"
#include <sstream>
......@@ -54,13 +53,7 @@ MSRSensorGroup::MSRSensorGroup(const std::string& name) :
MSRSensorGroup::~MSRSensorGroup() {
}
void MSRSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
bool MSRSensorGroup::execOnStart() {
for (auto &kv : cpuToFd) {
int cpu = kv.first;
char * path = new char[200];
......@@ -81,21 +74,15 @@ void MSRSensorGroup::start() {
program_fixed();
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&MSRSensorGroup::readAsync, this));
LOG(info) << "Sensorgroup " << _groupName << " started.";
return true;
}
void MSRSensorGroup::stop() {
_keepRunning = 0;
void MSRSensorGroup::execOnStop() {
//close file descriptors and leave counters running freely
for (auto &kv: cpuToFd) {
close(kv.second);
kv.second = -1;
}
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
void MSRSensorGroup::read() {
......@@ -117,17 +104,6 @@ void MSRSensorGroup::read() {
}
}
void MSRSensorGroup::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(std::bind(&MSRSensorGroup::readAsync, this));
}
_pendingTasks--;
}
int32_t MSRSensorGroup::msr_read(uint64_t msr_number, uint64_t * value, unsigned int cpu){
return pread(cpuToFd[cpu], (void *) value, sizeof(uint64_t), msr_number);
}
......
......@@ -48,16 +48,15 @@ public:
virtual ~MSRSensorGroup();
MSRSensorGroup& operator=(const MSRSensorGroup& other)=default;
void start() override;
void stop() override;
bool execOnStart() final override;
void execOnStop() final override;
void addCpu(unsigned int cpu);
std::vector<unsigned> getCpus();
void printConfig(LOG_LEVEL ll) override;
void printConfig(LOG_LEVEL ll) final override;
private:
void read() override;
void readAsync() override;
void read() final override;
void program_fixed();
std::map<unsigned int,int> cpuToFd;
......
......@@ -61,41 +61,28 @@ OpaSensorGroup& OpaSensorGroup::operator=(const OpaSensorGroup& other) {
return *this;
}
void OpaSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
bool OpaSensorGroup::execOnStart() {
if (omgt_open_port_by_num(&_port, _hfiNum, _portNum, NULL) != OMGT_STATUS_SUCCESS) {
LOG(error) << "Sensorgroup " << _groupName << " failed to open port or initialize PA connection";
_port = nullptr;
return;
return false;
}
if (omgt_pa_get_image_info(_port, _imageId, &_imageInfo)) {
LOG(error) << "Sensorgroup " << _groupName << " failed to get PA image";
omgt_close_port(_port);
_port = nullptr;
return;
return false;
}
_keepRunning