Commit 8537d856 authored by Micha Müller's avatar Micha Müller
Browse files

Caliper-service rework WIP6

-Fixes and improvements to Caliper service
-Start work on Pusher plugin
parent ba9004a1
......@@ -84,6 +84,8 @@ namespace {
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define MSGQ_SIZE 8192
#define STR_PREFIX "/cali_dcdb_"
#define SHM_SIZE (32*1024*1024)
typedef struct {
uintptr_t pc;
......@@ -149,10 +151,12 @@ private:
static constexpr size_t lookup_data_offset = 2*sizeof(size_t)
+ 2*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data);
//TODO max one application per node? multiple threads per node OK, what about multiple (MPI-)processes
//TODO close shm file at end
//TODO dynamic rebuild at runtime
void* shm; // pointer to shared memory object
size_t shm_size; // size of shm in bytes
int shm_file; // fd of the underlying shared memory file
size_t shm_bytes_written; // keep track if we exceed shm_size
int sock; // unix socket fd for initial shm setup communication
const std::string pid_str; // PID at process start
......@@ -272,9 +276,17 @@ private:
// symdat.end_addr,
// sym.st_size);
memcpy(dest_ptr, &symdat, sizeof(addr_data));
++dest_ptr;
++entryCnt;
shm_bytes_written += sizeof(fsym_data);
if (shm_bytes_written <= SHM_SIZE) {
memcpy(dest_ptr, &symdat, sizeof(fsym_data));
++dest_ptr;
++entryCnt;
} else {
Log(1).stream() << chn->name() << ": DcdbPusher: Not enough shared memory!" << std::endl;
elf_end(elf);
close(fd);
return entryCnt;
}
} else {
// printf("Symbol %s out of mem range (%llx-%llx, size %llx)\n", symdat.name,
// symdat.start_addr,
......@@ -302,6 +314,7 @@ private:
char buf[MAX_PATH_SIZE];
addr_data* addr_ptr;
shm_bytes_written = lookup_data_offset;
//some pointer arithmetic for the beginning to get appropriate start pointers
size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + lookup_data_offset));
addr_data* const addr_start = reinterpret_cast<addr_data*>(&addr_cnt + 1);
......@@ -340,9 +353,16 @@ private:
addr.fsym_offset = 0;
//save in shared memory
memcpy(addr_ptr, &addr, sizeof(addr_data));
++addr_ptr;
++addr_cnt;
shm_bytes_written += sizeof(addr_data);
if (shm_bytes_written <= SHM_SIZE) {
memcpy(addr_ptr, &addr, sizeof(addr_data));
++addr_ptr;
++addr_cnt;
} else {
Log(1).stream() << chn->name() << ": DcdbPusher: Running out of shared memory!" << std::endl;
fclose(file);
return false;
}
}
}
fclose(file);
......@@ -445,33 +465,28 @@ private:
return false;
}
bool ret = false;
size_t& w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t)));
if (w_index < r_index) {
if (shm_buf_size <= (r_index - w_index - 1)) {
memcpy(&(msg_queue[w_index+1]), shm_buf, shm_buf_size*sizeof(snap_data));
memcpy(&msg_queue[w_index+1], shm_buf, shm_buf_size*sizeof(snap_data));
w_index += shm_buf_size;
} else {
goto fail;
ret = true;
}
} else {
if (shm_buf_size <= (MSGQ_SIZE - w_index + r_index - 1)) {
size_t sep = MSGQ_SIZE - w_index - 1;
memcpy(&(msg_queue[w_index+1]), shm_buf, sep*sizeof(snap_data));
memcpy(msg_queue, &(shm_buf[sep]), (shm_buf_size-sep)*sizeof(snap_data));
memcpy(&msg_queue[w_index+1], shm_buf, sep*sizeof(snap_data));
memcpy(msg_queue, &shm_buf[sep], (shm_buf_size-sep)*sizeof(snap_data));
w_index += shm_buf_size;
w_index %= MSGQ_SIZE;
} else {
goto fail;
ret = true;
}
}
sem_post(w_sem);
return true;
fail:
sem_post(w_sem);
return false;
return ret;
}
void post_init_cb(Caliper* c, Channel* chn) {
......@@ -492,20 +507,18 @@ private:
return;
}
std::string bah("/cali_dcdb_");
bah += pid_str.c_str();
shm_file = shm_open(bah.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
shm_file = shm_open((STR_PREFIX + pid_str).c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
if (shm_file == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to open shm_file: "
<< strerror(errno) << std::endl;
return;
}
if (ftruncate(shm_file, shm_size)) {
if (ftruncate(shm_file, SHM_SIZE)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to truncate shm_file: "
<< strerror(errno) << std::endl;
return;
}
shm = mmap(NULL, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_file, 0);
shm = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_file, 0);
if (shm == (void*) -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to mmap shm_file: "
<< strerror(errno) << std::endl;
......@@ -542,8 +555,7 @@ private:
return;
}
print_debug_shm();
return;
//print_debug_shm();
//tell pusher plugin our PID so it can access our shared memory
//UNIX socket used for communication
......@@ -576,7 +588,7 @@ private:
sock = -1;
if (res == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send PID: "
<< strerror(errno) << std::endl;
return;
}
......@@ -644,23 +656,17 @@ private:
}
void finish_cb(Caliper* c, Channel* chn) {
//TODO terminate connection on pusher plugin side after timeout
if (shm != NULL) {
munmap(shm, shm_size);
munmap(shm, SHM_SIZE);
shm = NULL;
}
if(shm_file != -1) {
std::string bah("/cali_dcdb_");
bah += pid_str.c_str();
shm_unlink(bah.c_str());
shm_unlink((STR_PREFIX + pid_str).c_str());
close(shm_file);
shm_file = -1;
}
//TODO destroy semaphores
Log(1).stream() << chn->name() << ": DcdbPusher: "
<< snapshots_processed << " snapshots processed of which "
<< snapshots_failed << " failed." << std::endl;
......@@ -678,8 +684,8 @@ private:
DcdbPusher(Caliper* c, Channel* chn) :
shm(NULL),
shm_size(32 * 1024 * 1024),
shm_file(-1),
shm_bytes_written(0),
sock(-1),
pid_str(std::to_string(getpid())),
initialized(false) {
......
......@@ -29,7 +29,11 @@
#include <errno.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include "timestamp.h"
......@@ -38,6 +42,9 @@ CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
SensorGroupTemplate(name),
_socket(-1),
_connection(-1),
_shm(nullptr),
_shmFile(-1),
_shmFailCnt(0),
_globalMqttPrefix("") {
_lock.clear();
}
......@@ -46,6 +53,9 @@ CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
SensorGroupTemplate(other),
_socket(-1),
_connection(-1),
_shm(nullptr),
_shmFile(-1),
_shmFailCnt(0),
_globalMqttPrefix(other._globalMqttPrefix) {
_lock.clear();
......@@ -117,7 +127,8 @@ void CaliperSensorGroup::execOnStop() {
}
void CaliperSensorGroup::read() {
if (_connection == -1) {
if (_shm == nullptr) {
//check if new application wants to send us its PID
_connection = accept(_socket, NULL, NULL);
if (_connection == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
......@@ -131,65 +142,154 @@ void CaliperSensorGroup::read() {
_sensors.clear();
_baseSensors.clear();
releaseSensors();
}
const size_t bufSize = 2048;
char buf[bufSize];
const size_t bufSize = 64;
char buf[bufSize];
//retrieve all messages currently available at the socket
while(true) {
const ssize_t nrec = recv(_connection, (void *) buf, bufSize, MSG_DONTWAIT);
const ssize_t nrec = recv(_connection, (void*) buf, bufSize, MSG_DONTWAIT);
//nrec==0 indicates that the connection was closed. Probably because Caliper terminated
if (nrec == 0) {
close(_connection);
_connection = -1;
LOG(debug) << _groupName << ": Connection closed";
close(_connection);
_connection = -1;
//Clean up sensorIndex for the next connection. Keep actual sensors until
//a new connection is received so any possibly remaining sensor values can get
//pushed in the meantime.
_sensorIndex.clear();
if (nrec <= 0) {
LOG(error) << _groupName << ": Connection accepted but got no message";
return;
//nrec==-1 indicates an error during recv()
//if errno==EAGAIN or errno==EWOULDBLOCK there are currently just no more messages available to receive
} else if (nrec == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
LOG(error) << _groupName << ": Recv failed: " << strerror(errno);
}
}
std::string pidStr(buf);
_shmFile = shm_open((STR_PREFIX + pidStr).c_str(), O_RDWR, 0666);
if (_shmFile == -1) {
LOG(error) << _groupName << ": Failed to open _shmFile";
return;
}
_shm = mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, _shmFile, 0);
if (_shm == (void*) -1) {
LOG(error) << _groupName << ": Failed to mmap _shmFile";
_shm = nullptr;
close(_shmFile);
_shmFile = -1;
return;
}
}
//get snapshot data from message queue in shared memory
size_t r_index;
size_t w_index;
sem_t* r_sem;
sem_t* w_sem;
snap_data* msg_queue;
r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(_shm) + 2*sizeof(size_t));
w_sem = r_sem + 1;
msg_queue = reinterpret_cast<snap_data*>(w_sem + 1);
//TODO atomic load/stores instead of semaphore locking?
if (sem_wait(r_sem)) {
return;
}
r_index = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm)));
sem_post(r_sem);
if (sem_wait(w_sem)) {
return;
}
w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm) + sizeof(size_t)));
sem_post(w_sem);
if (r_index == w_index && ++_shmFailCnt > SHM_MAX_RETRIES) {
_sensorIndex.clear();
sem_destroy(r_sem);
sem_destroy(w_sem);
munmap(_shm, SHM_SIZE);
_shm = nullptr;
close(_shmFile);
_shmFile = -1;
return;
}
_shmFailCnt = 0;
size_t nelems = 0;
snap_data snaps[MSGQ_SIZE];
if (r_index < w_index) {
nelems = w_index - r_index;
memcpy(snaps, &msg_queue[r_index+1], nelems*sizeof(snap_data));
} else {
nelems = MSGQ_SIZE - r_index + w_index;
size_t sep = MSGQ_SIZE - r_index - 1;
memcpy(snaps, &msg_queue[r_index+1], sep*sizeof(snap_data));
memcpy(&snaps[sep], msg_queue, (nelems-sep)*sizeof(snap_data));
}
//update r_index in _shm
if (sem_wait(r_sem)) {
return;
}
*(reinterpret_cast<size_t*>(static_cast<char*>(_shm))) = w_index;
sem_post(r_sem);
//actual message processing
std::string timestamp(buf);
std::string feName(&(buf[timestamp.length()+1])); //function OR event name
//TODO process snapshots
size_t addrCnt = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm) + lookup_data_offset));
addr_data* addrPtr = reinterpret_cast<addr_data*>(&addrCnt + 1);;
for (size_t i = 0; i < nelems; ++i) {
snap_data sd = snaps[i];
reading_t reading;
reading.value = 1;
reading.timestamp = std::stoull(timestamp);
S_Ptr s;
auto it = _sensorIndex.find(feName);
if(it != _sensorIndex.end()) {
//we encountered this function or event name already
s = it->second;
} else {
//unknown function or event name --> create a new sensor
s = std::make_shared<CaliperSensorBase>(feName);
s->setMqtt(_globalMqttPrefix + _mqttPart + feName);
s->setName(s->getMqtt());
s->initSensor(_interval);
acquireSensors();
_sensors.push_back(s);
_baseSensors.push_back(s);
releaseSensors();
_sensorIndex.insert(std::make_pair(feName, s));
}
s->storeReading(reading);
reading.timestamp = sd.ts;
std::string sName("cpu" + std::to_string(sd.cpu) + '/');
uintptr_t pc = snaps[i].pc;
for(size_t j = 0; j < addrCnt; ++j) {
if (pc >= addrPtr->start_addr && pc <= addrPtr->end_addr) {
sName += addrPtr->pathname;
fsym_data* fsymPtr = reinterpret_cast<fsym_data*>(
reinterpret_cast<char*>(addrPtr) + addrPtr->fsym_offset);
for(size_t k = 0; k < addrPtr->fsym_count; ++k) {
if (pc >= fsymPtr->start_addr && pc <= fsymPtr->end_addr) {
sName += ':' + fsymPtr->name;
break;
}
++fsymPtr;
}
//store in sensors
//TODO aggregate values
S_Ptr s;
auto it = _sensorIndex.find(sName);
if(it != _sensorIndex.end()) {
//we encountered this function or event name already
s = it->second;
} else {
//unknown function or event name --> create a new sensor
s = std::make_shared<CaliperSensorBase>(sName);
s->setMqtt(_globalMqttPrefix + _mqttPart + sName);
s->setName(s->getMqtt());
s->initSensor(_interval);
acquireSensors();
_sensors.push_back(s);
_baseSensors.push_back(s);
releaseSensors();
_sensorIndex.insert(std::make_pair(sName, s));
}
s->storeReading(reading);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
#endif
break;
}
++addrPtr;
}
//TODO what if pc was not within any range?
}
}
......
......@@ -33,6 +33,7 @@
#include "CaliperSensorBase.h"
#include <atomic>
#include <semaphore.h>
#include <unordered_map>
/**
......@@ -41,6 +42,62 @@
* @ingroup caliper
*/
class CaliperSensorGroup : public SensorGroupTemplate<CaliperSensorBase> {
/*******************************************************************************
* Keep in sync with DcdbPusher Caliper service
******************************************************************************/
#define SHM_MAX_RETRIES 5
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define MSGQ_SIZE 8192
#define STR_PREFIX "/cali_dcdb_"
#define SHM_SIZE (32*1024*1024)
typedef struct {
uintptr_t pc;
uint64_t ts;
unsigned short cpu;
} snap_data;
/* Entry for an executable symbol in the symbol table */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
char name[MAX_SYMBOL_SIZE];
} fsym_data;
/* Defines a contiguous executable memory block */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
uintptr_t offset; // offset as parsed from /proc//maps
size_t fsym_offset; // Offset in bytes from the address of this struct
// to the beginning of the associated symbol section
size_t fsym_count; // Number of symbols in this address range
char pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where
// this memory range comes from or
// "[Anonymous]" if unknown
} addr_data;
/*
* Layout of shared-memory file used to communicate with Caliper service:
*
* //Communication queue, aka ring buffer:
* size_t r_index //points to the last read element
* size_t w_index //points to the last written element
* sem_t r_sem;
* sem_t w_sem;
* snap_data[MSGQ_SIZE]
*
* //symbol lookup data
* size_t addr_count
* addr_data[addr_count]
* addr_count * (fsym_data[addr_data.fsym_count])
*/
static constexpr size_t lookup_data_offset = 2*sizeof(size_t)
+ 2*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data);
public:
CaliperSensorGroup(const std::string& name);
CaliperSensorGroup(const CaliperSensorGroup& other);
......@@ -68,9 +125,13 @@ private:
int _socket;
int _connection;
void* _shm;
int _shmFile;
size_t _shmFailCnt;
std::string _globalMqttPrefix;
std::atomic_flag _lock; ///< Lock to synchronize access to associated sensors
//TODO aggregate sensor values: store pair of S_Ptr and counter. Only push values at end of iteration
std::unordered_map<std::string, S_Ptr> _sensorIndex; ///< Additional sensor storage for fast lookup
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment