CaliperSensorGroup.cpp 5.49 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//================================================================================
// Name        : CaliperSensorGroup.cpp
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Source file for Caliper sensor group class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "CaliperSensorGroup.h"

#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
32
#include <sys/un.h>
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

#include "timestamp.h"

CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
	SensorGroupTemplate(name),
	_socket(-1),
	_connection(-1) {
}

CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
  SensorGroupTemplate(other),
  _socket(-1),
  _connection(-1) {
}

48
CaliperSensorGroup::~CaliperSensorGroup() {}
49
50
51
52
53
54
55
56
57

CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
  SensorGroupTemplate::operator=(other); 
  _socket = -1;
  _connection = -1;
  
  return *this;
}

58
bool CaliperSensorGroup::execOnStart() {
59
60
61
	_socket = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0);

	if(_socket == -1) {
62
	    LOG(error) << _groupName << ": Failed to open socket: " << strerror(errno);
63
	    return false;
64
65
66
67
68
69
70
71
72
	}

	sockaddr_un addr;
	memset(&addr, 0, sizeof(struct sockaddr_un));

	addr.sun_family = AF_UNIX;
	snprintf(&addr.sun_path[1], 91, "DCDBPusherCaliSocket");

	if(bind(_socket, (struct sockaddr*) &addr, sizeof(addr))) {
73
	    LOG(error) << _groupName << ": Failed to bind socket: " << strerror(errno);
74
75
	    close(_socket);
	    _socket = -1;
76
	    return false;
77
78
79
	}

	if(listen(_socket, 1)) {
80
	    LOG(error) << _groupName << ": Can not listen on socket: " << strerror(errno);
81
82
	    close(_socket);
	    _socket = -1;
83
	    return false;
84
85
	}

86
	return true;
87
88
}

89
void CaliperSensorGroup::execOnStop() {
90
91
92
93
94
95
96
97
98
99
    if(_connection != -1) {
        close(_connection);
        _connection = -1;
    }
    if(_socket != -1) {
        close(_socket);
        _socket = -1;
    }
}

100
//TODO y not terminating?
101
102
103
104
105
void CaliperSensorGroup::read() {
    if (_connection == -1) {
        _connection = accept(_socket, NULL, NULL);
        if (_connection == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
106
                LOG(error) << _groupName << ": Accept failed: " << strerror(errno);
107
108
109
110
111
112
113
114
            }
            return;
        }
    }

    const size_t bufSize = 2048;
    char buf[bufSize];

115
116
117
118
119
120
121
122
123
    //retrieve all messages currently available at the socket
    while(true) {
        const ssize_t nrec = recv(_connection, (void *) buf, bufSize, MSG_DONTWAIT);
#ifdef DEBUG
        LOG(debug) << _groupName << ": Receiving message...";
#endif

        //nrec==0 indicates that the connection was closed. Probably because Caliper terminated
        if (nrec == 0) {
124
            //TODO clean up sensors
125
126
127
128
129
130
131
132
133
134
135
            close(_connection);
            _connection = -1;
            LOG(debug) << _groupName << ": Connection closed";
            return;
        //nrec==-1 indicates an error during recv()
        //if errno==EAGAIN or errno==EWOULDBLOCK there are currently no more messages available to receive
        } else if (nrec == -1) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                LOG(error) << _groupName << ": Recv failed: " << strerror(errno);
            }
            return;
136
137
        }

138
139
140
        //actual message processing
        std::string timestamp(buf);
        std::string feName(&(buf[timestamp.length()+1])); //function OR event name
141

142
143
144
145
146
147
        reading_t reading;
        reading.value     = 1;
        reading.timestamp = std::stoull(timestamp);
#ifdef DEBUG
        LOG(debug) << _groupName << ": Received value " << feName << " (" << timestamp << ")";
#endif
148

149
150
151
152
153
154
155
156
157
158
159
160
        S_Ptr s;
        auto it = _sensorIndex.find(feName);
        if(it != _sensorIndex.end()) {
            //we encountered this function or event name already
            s = it->second;
        } else {
            //unknown function or event name --> create a new sensor
            s = std::make_shared<CaliperSensorBase>(feName);
            s->setMqtt(_globalMqttPrefix + _mqttPart + feName);
            s->setName(s->getMqtt());
            s->initSensor(_interval);

161
            //TODO lock access!
162
163
164
165
166
            _sensors.push_back(s);
            _baseSensors.push_back(s);
            _sensorIndex.insert(std::make_pair(feName, s));
        }
        s->storeReading(reading);
167
#ifdef DEBUG
168
        LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
169
#endif
170
    }
171
172
}

173
void CaliperSensorGroup::printGroupConfig(LOG_LEVEL ll) {
174
    //nothing special to print
175
}