SensorGroupInterface.h 5.15 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
29
30
31
32
33
34
/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
 * @brief Collection of plugin interfaces, plugin templates summarizing common logic
 * and the plugins itself.
 */

35
36
37
#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

38
39
40
41
#include <atomic>
#include <memory>
#include <boost/asio.hpp>

42
43
#include "logging.h"
#include "sensorbase.h"
44

45
46
47
48
49
/**
 * @brief Abstract interface defining sensor group functionality.
 *
 * @ingroup pusherplugins
 */
50
class SensorGroupInterface {
51
public:
52

53
	SensorGroupInterface(const std::string& groupName) :
54
		_groupName(groupName),
55
		_mqttPart(""),
56
		_sync(true),
57
58
59
60
61
62
63
64
		_keepRunning(0),
		_minValues(1),
		_interval(1000),
		_pendingTasks(0),
		_timer(nullptr) {}

	SensorGroupInterface(const SensorGroupInterface& other) :
		_groupName(other._groupName),
65
		_mqttPart(other._mqttPart),
66
		_sync(other._sync),
67
68
69
70
71
72
		_keepRunning(other._keepRunning),
		_minValues(other._minValues),
		_interval(other._interval),
		_timer(nullptr) {
		_pendingTasks.store(other._pendingTasks.load());
	}
73
74
75

	virtual ~SensorGroupInterface() {}

76
77
	SensorGroupInterface& operator=(const SensorGroupInterface& other) {
		_groupName = other._groupName;
78
		_mqttPart = other._mqttPart;
79
		_sync = other._sync;
80
81
82
83
84
85
86
87
88
		_keepRunning = other._keepRunning;
		_minValues = other._minValues;
		_interval = other._interval;
		_pendingTasks.store(other._pendingTasks.load());
		_timer = nullptr;

		return *this;
	}

89
	const std::string& 	getGroupName()	const	{ return _groupName; }
90
	const std::string& 	getMqttPart()	const	{ return _mqttPart; }
91
	bool				getSync()		const	{ return _sync; }
92
93
94
95
	unsigned			getMinValues()	const	{ return _minValues; }
	unsigned			getInterval()	const	{ return _interval; }

	void setGroupName(const std::string& groupName)	{ _groupName = groupName; }
96
	void setMqttPart(const std::string& mqttPart)	{ _mqttPart = mqttPart; }
97
	void setSync(bool sync)							{ _sync = sync; }
98
99
100
101
102
	void setMinValues(unsigned minValues)			{ _minValues = minValues; }
	void setInterval(unsigned interval)				{ _interval = interval; }

	/**
	 * Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0)
103
104
	 * If the wait takes longer than a reasonable amount of time we return anyway,
	 * to not block termination of dcdbpusher.
105
106
	 */
	void wait() {
107
108
109
110
111
112
113
114
115
116
117
118
	    unsigned short retries = 3;

	    for (unsigned short i = 1; i <= retries; i++) {
	        if (_pendingTasks) {
	            LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
	            sleep((_interval/1000) + 1);
	        } else {
	            return;
	        }
	    }

	    LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
119
120
121
122
123
124
125
	}

	//can be overwritten
	virtual void init(boost::asio::io_service& io) {
		_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
	}

126
	virtual void printConfig(LOG_LEVEL ll) {
127
	  LOG_VAR(ll) << "        Sensor Group: " << _groupName;
128
	  if (_mqttPart != "") {
129
	    LOG_VAR(ll) << "            MQTT part: " << _mqttPart;
130
131
132
	  }

	  if (_sync) {
133
	    LOG_VAR(ll) << "            Synchronized readings enabled";
134
	  } else {
135
	    LOG_VAR(ll) << "            Synchronized readings disabled";
136
137
	  }

138
139
	  LOG_VAR(ll) << "            minValues: " << _minValues;
	  LOG_VAR(ll) << "            interval:  " << _interval;
140
141
	}

142
143
144
	//have to be overwritten
	virtual void start() = 0;
	virtual void stop() = 0;
145

146
147
	virtual void pushBackSensor(SBasePtr s)		= 0;
	virtual std::vector<SBasePtr>& getSensors()	= 0;
148
149

protected:
150
151
152
	virtual void read() = 0;
	virtual void readAsync() = 0;

153
	std::string _groupName;
154
	std::string	_mqttPart;
155
	bool _sync;
156
157
158
159
160
	int _keepRunning;
	unsigned int _minValues;
	unsigned int _interval;
	std::atomic_uint _pendingTasks;
	std::unique_ptr<boost::asio::deadline_timer> _timer;
161
	LOGGER lg;
162
163
};

164
165
166
//for better readability
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

167
#endif /* SENSORGROUPINTERFACE_H_ */