//================================================================================
// Name        : CaliperSensorGroup.cpp
// Author      : Micha Mueller
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Source file for Caliper sensor group class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "CaliperSensorGroup.h"

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>

#include <exception>

#include "timestamp.h"

// Constructs an empty Caliper sensor group.
//
// The name is handed on to SensorGroupTemplate. Both socket descriptors start
// out as -1 (the value the rest of this file treats as "not open"), the global
// MQTT prefix starts out empty and _lock is cleared.
CaliperSensorGroup::CaliperSensorGroup(const std::string& name)
    : SensorGroupTemplate(name),
      _socket(-1),
      _connection(-1),
      _globalMqttPrefix("") {
    _lock.clear();
}

// Copy constructor.
//
// Sensors (via SensorGroupTemplate) and the global MQTT prefix are taken over
// from other. The socket descriptors are deliberately NOT copied: the new
// object starts with both set to -1.
CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other)
    : SensorGroupTemplate(other),
      _socket(-1),
      _connection(-1),
      _globalMqttPrefix(other._globalMqttPrefix) {
    _lock.clear();

    // _sensors has already been copy constructed by SensorGroupTemplate, so
    // only the lookup index has to be rebuilt on top of it.
    // NOTE(review): read() keys _sensorIndex by the raw function/event name,
    // whereas getName() is set to the MQTT topic there - confirm that indexing
    // by getName() is what is intended here.
    for (auto it = _sensors.begin(); it != _sensors.end(); ++it) {
        const auto& sensor = *it;
        _sensorIndex.insert(std::make_pair(sensor->getName(), sensor));
    }
}

// Destructor.
//
// Closes any descriptor that is still open (i.e. not -1) so that destroying a
// group which was never stopped via execOnStop() does not leak file
// descriptors, then empties the function/event name index. Copies never share
// descriptors (copy constructor and assignment reset them to -1), so nothing is
// closed twice.
CaliperSensorGroup::~CaliperSensorGroup() {
    if (_connection != -1) {
        close(_connection);
        _connection = -1;
    }
    if (_socket != -1) {
        close(_socket);
        _socket = -1;
    }
    _sensorIndex.clear();
}

// Copy assignment.
//
// Takes over the sensors (via SensorGroupTemplate) and the global MQTT prefix
// of other. Socket descriptors are never shared between instances: descriptors
// this object still holds are closed and both are reset to -1.
//
// Returns a reference to *this.
CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
    //self-assignment must not tear down our own sockets or sensors
    if (this == &other) {
        return *this;
    }

    SensorGroupTemplate::operator=(other);

    //release descriptors we still own before forgetting about them
    if (_connection != -1) {
        close(_connection);
    }
    if (_socket != -1) {
        close(_socket);
    }
    _socket = -1;
    _connection = -1;
    _globalMqttPrefix = other._globalMqttPrefix;
    _lock.clear();

    //SensorGroupTemplate already copied _sensors. Drop the old index first:
    //insert() never overwrites an existing key, so without the clear() stale
    //entries referring to the previous sensors would survive the assignment.
    _sensorIndex.clear();
    for (const auto& s : _sensors) {
        _sensorIndex.insert(std::make_pair(s->getName(), s));
    }

    return *this;
}

bool CaliperSensorGroup::execOnStart() {
78
79
80
	_socket = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0);

	if(_socket == -1) {
81
	    LOG(error) << _groupName << ": Failed to open socket: " << strerror(errno);
82
	    return false;
83
84
85
86
87
88
89
90
91
	}

	sockaddr_un addr;
	memset(&addr, 0, sizeof(struct sockaddr_un));

	addr.sun_family = AF_UNIX;
	snprintf(&addr.sun_path[1], 91, "DCDBPusherCaliSocket");

	if(bind(_socket, (struct sockaddr*) &addr, sizeof(addr))) {
92
	    LOG(error) << _groupName << ": Failed to bind socket: " << strerror(errno);
93
94
	    close(_socket);
	    _socket = -1;
95
	    return false;
96
97
98
	}

	if(listen(_socket, 1)) {
99
	    LOG(error) << _groupName << ": Can not listen on socket: " << strerror(errno);
100
101
	    close(_socket);
	    _socket = -1;
102
	    return false;
103
104
	}

105
	return true;
106
107
}

108
void CaliperSensorGroup::execOnStop() {
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
    if(_connection != -1) {
        close(_connection);
        _connection = -1;
    }
    if(_socket != -1) {
        close(_socket);
        _socket = -1;
    }
}

void CaliperSensorGroup::read() {
    if (_connection == -1) {
        _connection = accept(_socket, NULL, NULL);
        if (_connection == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
124
                LOG(error) << _groupName << ": Accept failed: " << strerror(errno);
125
126
127
128
129
130
131
132
            }
            return;
        }
    }

    const size_t bufSize = 2048;
    char buf[bufSize];

133
134
135
136
137
138
139
140
141
    //retrieve all messages currently available at the socket
    while(true) {
        const ssize_t nrec = recv(_connection, (void *) buf, bufSize, MSG_DONTWAIT);

        //nrec==0 indicates that the connection was closed. Probably because Caliper terminated
        if (nrec == 0) {
            close(_connection);
            _connection = -1;
            LOG(debug) << _groupName << ": Connection closed";
142
143
144
145
146
            acquireSensors();
            _sensors.clear();
            _baseSensors.clear();
            releaseSensors();
            _sensorIndex.clear();
147
148
            return;
        //nrec==-1 indicates an error during recv()
149
        //if errno==EAGAIN or errno==EWOULDBLOCK there are currently just no more messages available to receive
150
151
152
153
154
        } else if (nrec == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                LOG(error) << _groupName << ": Recv failed: " << strerror(errno);
            }
            return;
155
156
        }

157
158
159
        //actual message processing
        std::string timestamp(buf);
        std::string feName(&(buf[timestamp.length()+1])); //function OR event name
160

161
162
        reading_t reading;
        reading.value     = 1;
163
164
165
166
167
        reading.timestamp = S_TO_NS(std::stoull(timestamp)); //Caliper timestamps are seconds since UNIX epoch

        //a timestamp with a precision of seconds is not unique enough --> append current nanoseconds
        uint64_t now_ns = getTimestamp() % 1000000000;
        reading.timestamp += now_ns;
168

169
170
171
172
173
174
175
176
177
178
179
180
        S_Ptr s;
        auto it = _sensorIndex.find(feName);
        if(it != _sensorIndex.end()) {
            //we encountered this function or event name already
            s = it->second;
        } else {
            //unknown function or event name --> create a new sensor
            s = std::make_shared<CaliperSensorBase>(feName);
            s->setMqtt(_globalMqttPrefix + _mqttPart + feName);
            s->setName(s->getMqtt());
            s->initSensor(_interval);

181
            acquireSensors();
182
183
            _sensors.push_back(s);
            _baseSensors.push_back(s);
184
            releaseSensors();
185
186
187
            _sensorIndex.insert(std::make_pair(feName, s));
        }
        s->storeReading(reading);
188
#ifdef DEBUG
189
        LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
190
#endif
191
    }
192
193
}

194
// Prints the group specific configuration.
//
// Intentionally empty: there is nothing special to print for this group, so
// the log level ll is not used.
void CaliperSensorGroup::printGroupConfig(LOG_LEVEL ll) {
}