//================================================================================
// Name        : CaliperSensorGroup.cpp
// Author      : Micha Mueller
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Source file for Caliper sensor group class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "CaliperSensorGroup.h"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include "timestamp.h"

CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
42
43
44
    SensorGroupTemplate(name),
    _socket(-1),
    _connection(-1),
Micha Müller's avatar
Micha Müller committed
45
46
47
    _shm(nullptr),
    _shmFile(-1),
    _shmFailCnt(0),
48
49
    _globalMqttPrefix("") {
    _lock.clear();
50
51
52
}

CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
53
54
55
    SensorGroupTemplate(other),
    _socket(-1),
    _connection(-1),
Micha Müller's avatar
Micha Müller committed
56
57
58
    _shm(nullptr),
    _shmFile(-1),
    _shmFailCnt(0),
59
60
61
62
63
64
65
    _globalMqttPrefix(other._globalMqttPrefix) {
    _lock.clear();

    //SensorGroupTemplate already copy constructed _sensor
    for (auto& s : _sensors) {
        _sensorIndex.insert(std::make_pair(s->getName(), s));
    }
66
67
}

68
69
70
/**
 * Destroy the sensor group and release every OS resource it still holds.
 *
 * execOnStop() only closes the sockets. An attached application's shared
 * memory stays mapped until read() detects a timeout. Both the mapping and
 * its file descriptor (and any socket still open if the group was never
 * stopped) are therefore released here instead of being leaked.
 *
 * The semaphores inside the shared memory are deliberately not destroyed,
 * because the application on the other side may still be using them.
 */
CaliperSensorGroup::~CaliperSensorGroup() {
    if (_connection != -1) {
        close(_connection);
        _connection = -1;
    }
    if (_socket != -1) {
        close(_socket);
        _socket = -1;
    }
    if (_shm != nullptr) {
        munmap(_shm, SHM_SIZE);
        _shm = nullptr;
    }
    if (_shmFile != -1) {
        close(_shmFile);
        _shmFile = -1;
    }
    _sensorIndex.clear();
}

/**
 * Copy-assign configuration and sensors from @p other.
 *
 * Afterwards this group is in the same state as a freshly copy-constructed one.
 * - Any socket, connection or shared-memory mapping this group owned is
 *   released, rather than having its descriptor overwritten with -1 and leaked.
 * - The name -> sensor index is rebuilt from scratch. insert() never overwrites
 *   existing keys, so otherwise stale entries for sensors no longer in
 *   _sensors would survive.
 *
 * Self-assignment is a no-op.
 *
 * @param other  Sensor group to copy from.
 * @return *this
 */
CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
    if (this == &other) {
        return *this;
    }

    SensorGroupTemplate::operator=(other);

    // Release OS resources owned by this group instead of leaking them.
    if (_connection != -1) {
        close(_connection);
    }
    if (_socket != -1) {
        close(_socket);
    }
    if (_shm != nullptr) {
        munmap(_shm, SHM_SIZE);
    }
    if (_shmFile != -1) {
        close(_shmFile);
    }
    _connection = -1;
    _socket = -1;
    _shm = nullptr;
    _shmFile = -1;
    _shmFailCnt = 0;

    _globalMqttPrefix = other._globalMqttPrefix;
    _lock.clear();

    //SensorGroupTemplate already copied _sensors; rebuild the name index from them.
    _sensorIndex.clear();
    for (auto& s : _sensors) {
        _sensorIndex.insert(std::make_pair(s->getName(), s));
    }

    return *this;
}

bool CaliperSensorGroup::execOnStart() {
88
89
90
	_socket = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0);

	if(_socket == -1) {
91
	    LOG(error) << _groupName << ": Failed to open socket: " << strerror(errno);
92
	    return false;
93
94
95
96
97
98
	}

	sockaddr_un addr;
	memset(&addr, 0, sizeof(struct sockaddr_un));

	addr.sun_family = AF_UNIX;
Micha Müller's avatar
Micha Müller committed
99
	snprintf(&addr.sun_path[1], 91, SOCK_NAME);
100
101

	if(bind(_socket, (struct sockaddr*) &addr, sizeof(addr))) {
102
	    LOG(error) << _groupName << ": Failed to bind socket: " << strerror(errno);
103
104
	    close(_socket);
	    _socket = -1;
105
	    return false;
106
107
108
	}

	if(listen(_socket, 1)) {
109
	    LOG(error) << _groupName << ": Can not listen on socket: " << strerror(errno);
110
111
	    close(_socket);
	    _socket = -1;
112
	    return false;
113
114
	}

115
	return true;
116
117
}

118
void CaliperSensorGroup::execOnStop() {
119
120
121
122
123
124
125
126
127
128
129
    if(_connection != -1) {
        close(_connection);
        _connection = -1;
    }
    if(_socket != -1) {
        close(_socket);
        _socket = -1;
    }
}

void CaliperSensorGroup::read() {
Micha Müller's avatar
Micha Müller committed
130
131
    if (_shm == nullptr) {
        //check if new application wants to send us its PID
132
133
134
        _connection = accept(_socket, NULL, NULL);
        if (_connection == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
135
                LOG(error) << _groupName << ": Accept failed: " << strerror(errno);
136
137
138
            }
            return;
        }
139
140
141
142
143
144
        //Clear sensors from last connection. If their values have not been pushed
        //by now they are lost.
        acquireSensors();
        _sensors.clear();
        _baseSensors.clear();
        releaseSensors();
145

Micha Müller's avatar
Micha Müller committed
146
147
        const size_t bufSize = 64;
        char buf[bufSize];
148

Micha Müller's avatar
Micha Müller committed
149
        const ssize_t nrec = recv(_connection, (void*) buf, bufSize, MSG_DONTWAIT);
150

Micha Müller's avatar
Micha Müller committed
151
152
        close(_connection);
        _connection = -1;
153

Micha Müller's avatar
Micha Müller committed
154
155
        if (nrec <= 0) {
            LOG(error) << _groupName << ": Connection accepted but got no message";
156
            return;
Micha Müller's avatar
Micha Müller committed
157
158
159
160
161
162
163
        }

        std::string pidStr(buf);

        _shmFile = shm_open((STR_PREFIX + pidStr).c_str(), O_RDWR, 0666);
        if (_shmFile == -1) {
            LOG(error) << _groupName << ": Failed to open _shmFile";
164
            return;
165
        }
Micha Müller's avatar
Micha Müller committed
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
        _shm = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, _shmFile, 0);
        if (_shm == (void*) -1) {
            LOG(error) << _groupName << ": Failed to mmap _shmFile";
            _shm = nullptr;
            close(_shmFile);
            _shmFile = -1;
            return;
        }
    }

    //get snapshot data from message queue in shared memory
    size_t r_index;
    size_t w_index;

    sem_t* r_sem;
    sem_t* w_sem;

    snap_data* msg_queue;

    r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(_shm) + 2*sizeof(size_t));
    w_sem = r_sem + 1;
    msg_queue = reinterpret_cast<snap_data*>(w_sem + 1);

    //TODO atomic load/stores instead of semaphore locking?
    if (sem_wait(r_sem)) {
        return;
    }
    r_index = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm)));
    sem_post(r_sem);

    if (sem_wait(w_sem)) {
        return;
    }
    w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm) + sizeof(size_t)));
    sem_post(w_sem);

Micha Müller's avatar
Micha Müller committed
202
203
204
205
206
207
    //are new elements there at all?
    if (r_index == w_index) {
        ++_shmFailCnt;
        if (_shmFailCnt > SHM_MAX_RETRIES) {
            //"Timeout". We assume that the application terminated
            _sensorIndex.clear();
Micha Müller's avatar
Micha Müller committed
208

Micha Müller's avatar
Micha Müller committed
209
210
            sem_destroy(r_sem);
            sem_destroy(w_sem);
Micha Müller's avatar
Micha Müller committed
211

Micha Müller's avatar
Micha Müller committed
212
213
214
215
216
217
            munmap(_shm, SHM_SIZE);
            _shm = nullptr;
            close(_shmFile);
            _shmFile = -1;
        }
        LOG(debug) << "No data available (failCnt=" << _shmFailCnt << ")";
Micha Müller's avatar
Micha Müller committed
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
        return;
    }

    _shmFailCnt = 0;
    size_t nelems = 0;
    snap_data snaps[MSGQ_SIZE];

    if (r_index < w_index) {
        nelems = w_index - r_index;
        memcpy(snaps, &msg_queue[r_index+1], nelems*sizeof(snap_data));
    } else {
        nelems = MSGQ_SIZE - r_index + w_index;
        size_t sep = MSGQ_SIZE - r_index - 1;
        memcpy(snaps, &msg_queue[r_index+1], sep*sizeof(snap_data));
        memcpy(&snaps[sep], msg_queue, (nelems-sep)*sizeof(snap_data));
    }

    //update r_index in _shm
    if (sem_wait(r_sem)) {
        return;
    }
    *(reinterpret_cast<size_t*>(static_cast<char*>(_shm))) = w_index;
    sem_post(r_sem);
241

Micha Müller's avatar
Micha Müller committed
242
    LOG(debug) << "Processing " << nelems << " snapshots";
Micha Müller's avatar
Micha Müller committed
243
244

    for (size_t i = 0; i < nelems; ++i) {
245
246
        reading_t reading;
        reading.value     = 1;
Micha Müller's avatar
Micha Müller committed
247
        reading.timestamp = snaps[i].ts;
Micha Müller's avatar
Micha Müller committed
248

Micha Müller's avatar
Micha Müller committed
249
        std::string sName("cpu" + std::to_string(snaps[i].cpu) + '/');
Micha Müller's avatar
Micha Müller committed
250
251
        uintptr_t pc = snaps[i].pc;

Micha Müller's avatar
Micha Müller committed
252
253
254
255
256
        const size_t addrCnt = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm)
                                                            + lookup_data_offset));
        const addr_data* const addrs = reinterpret_cast<addr_data*>(static_cast<char*>(_shm)
                                                                    + lookup_data_offset + sizeof(size_t));

Micha Müller's avatar
Micha Müller committed
257
        for(size_t j = 0; j < addrCnt; ++j) {
Micha Müller's avatar
Micha Müller committed
258
259
260
261
262
263
264
            if (pc >= addrs[j].start_addr && pc <= addrs[j].end_addr) {
                sName += addrs[j].pathname;

                const fsym_data* const fsyms = reinterpret_cast<const fsym_data* const>(
                                               reinterpret_cast<const char* const>(addrs) + addrs[j].fsym_offset);
                for(size_t k = 0; k < addrs[j].fsym_count; ++k) {
                    if (pc >= fsyms[k].start_addr && pc <= fsyms[k].end_addr) {
Micha Müller's avatar
Micha Müller committed
265
266
                        sName += "::";
                        sName += fsyms[k].name;
Micha Müller's avatar
Micha Müller committed
267
268
                        break;
                    }
Micha Müller's avatar
Micha Müller committed
269
                } //It's OK if we found no symbol. There are possibly none
Micha Müller's avatar
Micha Müller committed
270
271

                //store in sensors
Micha Müller's avatar
Micha Müller committed
272
                //TODO aggregate values (see header file)
Micha Müller's avatar
Micha Müller committed
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
                S_Ptr s;
                auto it = _sensorIndex.find(sName);
                if(it != _sensorIndex.end()) {
                    //we encountered this function or event name already
                    s = it->second;
                } else {
                    //unknown function or event name --> create a new sensor
                    s = std::make_shared<CaliperSensorBase>(sName);
                    s->setMqtt(_globalMqttPrefix + _mqttPart + sName);
                    s->setName(s->getMqtt());
                    s->initSensor(_interval);

                    acquireSensors();
                    _sensors.push_back(s);
                    _baseSensors.push_back(s);
                    releaseSensors();
                    _sensorIndex.insert(std::make_pair(sName, s));
                }
                s->storeReading(reading);
292
#ifdef DEBUG
Micha Müller's avatar
Micha Müller committed
293
                LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
294
#endif
Micha Müller's avatar
Micha Müller committed
295
296
297
298
                break;
            }
        }
        //TODO what if pc was not within any range?
299
    }
300
301
}

302
/**
 * Print Caliper-specific group configuration at log level @p ll.
 *
 * This group currently has nothing group-specific to print, so the call is a no-op.
 *
 * @param ll  Log level to print at (unused).
 */
void CaliperSensorGroup::printGroupConfig(LOG_LEVEL ll) {
    (void) ll; // nothing special to print
}