Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 82e687b6 authored by Micha Müller's avatar Micha Müller
Browse files

Caliper-service rework WIP5

-service stores snapshot data now in shared memory
-Caliper service finished for the moment
-Pusher plugin adaption outstanding...
parent 1ffc6ba1
......@@ -56,7 +56,6 @@
#include "caliper/common/Log.h"
#include "caliper/common/RuntimeConfig.h"
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cxxabi.h>
......@@ -82,37 +81,43 @@ using namespace cali;
namespace {
class DcdbPusher {
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define MSGQ_SIZE 8192
typedef struct {
uintptr_t pc;
uint64_t ts;
unsigned short cpu;
} snap_data;
/* Entry for an executable symbol in the symbol table */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
char name[MAX_SYMBOL_SIZE];
} fsym_data;
/* Defines a contiguous executable memory block */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
uintptr_t offset; // offset as parsed from /proc//maps
size_t fsym_offset; // Offset in bytes from the address of this struct
// to the beginning of the associated symbol section
size_t fsym_count; // Number of symbols in this address range
char pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where
// this memory range comes from or
// "[Anonymous]" if unknown
} addr_data;
typedef struct {
uintptr_t pc;
uint64_t ts;
unsigned short cpu;
} snap_data;
/* Entry for an executable symbol in the symbol table */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
char name[MAX_SYMBOL_SIZE];
} fsym_data;
/* Defines a contiguous executable memory block */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
uintptr_t offset; // offset as parsed from /proc//maps
size_t fsym_offset; // Offset in bytes from the address of this struct
// to the beginning of the associated symbol section
size_t fsym_count; // Number of symbols in this address range
char pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where
// this memory range comes from or
// "[Anonymous]" if unknown
} addr_data;
// each thread has a local buffer for relevant snapshot data to reduce
// writes to shm queue as this requires locking
static constexpr size_t shm_buf_size = 1024 / sizeof(snap_data);
static thread_local size_t shm_buf_idx;
static thread_local snap_data shm_buf[shm_buf_size];
class DcdbPusher {
private:
......@@ -130,8 +135,10 @@ private:
* Layout of shared-memory file used to communicate with dcdbpusher plugin:
*
* //Communication queue, aka ring buffer:
* size_t r_index
* size_t w_index
* size_t r_index //points to the last read element
* size_t w_index //points to the last written element
* sem_t r_sem;
* sem_t w_sem;
* snap_data[MSGQ_SIZE]
*
* //symbol lookup data
......@@ -139,19 +146,16 @@ private:
* addr_data[addr_count]
* addr_count * (fsym_data[addr_data.fsym_count])
*/
static constexpr size_t lookup_data_offset = 2*sizeof(size_t)
+ 2*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data);
//TODO close shm file at end
void* shm; // pointer to shared memory object
size_t shm_size; // size of shm in bytes
int shm_file; // fd of the underlying shared memory file
int sock; // unix socket fd for initial shm setup communication
std::atomic_flag shm_wlock; // for thread safe writing to shm queue
// each thread has a local buffer for relevant snapshot data to reduce
// writes to shm queue as this requires locking
static constexpr size_t shm_buf_size = 1024 / sizeof(snap_data);
static thread_local size_t shm_buf_idx;
static thread_local snap_data shm_buf[shm_buf_size];
const std::string pid_str; // PID at process start
bool initialized;
/*
......@@ -231,7 +235,6 @@ private:
int status = -1;
fsym_data symdat;
//TODO sym.st_name == STN_UNDEF?
symstr = elf_strptr(elf, shdr.sh_link, sym.st_name);
/* Demangle if necessary. Require GNU v3 ABI by the "_Z" prefix. */
......@@ -242,8 +245,10 @@ private:
if (status == 0) {
strncpy(symdat.name, dsymstr, 512);
free((void*) dsymstr);
} else {
} else if (symstr != NULL) {
strncpy(symdat.name, symstr, 512);
} else {
symdat.name[0] = '\0';
}
symdat.name[511] = '\0';
......@@ -298,8 +303,7 @@ private:
addr_data* addr_ptr;
//some pointer arithmetic for the beginning to get appropriate start pointers
size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm)
+ 2*sizeof(size_t) + MSGQ_SIZE*sizeof(snap_data)));
size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + lookup_data_offset));
addr_data* const addr_start = reinterpret_cast<addr_data*>(&addr_cnt + 1);
addr_ptr = addr_start;
......@@ -409,6 +413,67 @@ private:
printf("Shm: %p, fsym_ptr:%p\n", shm, (void*) fsym_ptr);
}
/**
* Writes the thread's snap_data buffer into shm
*/
bool flush_buf() {
if (shm_buf_idx == 0) {
return true;
}
size_t r_index;
sem_t* r_sem;
sem_t* w_sem;
snap_data* msg_queue;
r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(shm) + 2*sizeof(size_t));
w_sem = r_sem + 1;
msg_queue = reinterpret_cast<snap_data*>(w_sem + 1);
//TODO atomic load/stores instead of semaphore locking?
if (sem_wait(r_sem)) {
return false;
}
r_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm)));
sem_post(r_sem);
if (sem_trywait(w_sem)) {
return false;
}
size_t& w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t)));
if (w_index < r_index) {
if (shm_buf_size <= (r_index - w_index - 1)) {
memcpy(&(msg_queue[w_index+1]), shm_buf, shm_buf_size*sizeof(snap_data));
w_index += shm_buf_size;
} else {
goto fail;
}
} else {
if (shm_buf_size <= (MSGQ_SIZE - w_index + r_index - 1)) {
size_t sep = MSGQ_SIZE - w_index - 1;
memcpy(&(msg_queue[w_index+1]), shm_buf, sep*sizeof(snap_data));
memcpy(msg_queue, &(shm_buf[sep]), (shm_buf_size-sep)*sizeof(snap_data));
w_index += shm_buf_size;
w_index %= MSGQ_SIZE;
} else {
goto fail;
}
}
sem_post(w_sem);
return true;
fail:
sem_post(w_sem);
return false;
}
void post_init_cb(Caliper* c, Channel* chn) {
// Check if required services sampler, timestamp and
// pthread are active by searching for identifying attributes.
......@@ -427,10 +492,9 @@ private:
return;
}
const pid_t pid = getpid();
const std::string shm_name = "/cali_dcdb_" + std::to_string(pid);
shm_file = shm_open(shm_name.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
std::string bah("/cali_dcdb_");
bah += pid_str.c_str();
shm_file = shm_open(bah.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
if (shm_file == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to open shm_file: "
<< strerror(errno) << std::endl;
......@@ -445,6 +509,7 @@ private:
if (shm == (void*) -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to mmap shm_file: "
<< strerror(errno) << std::endl;
shm = NULL;
return;
}
......@@ -454,9 +519,34 @@ private:
return;
}
//init r/w_index
*(reinterpret_cast<size_t*>(static_cast<char*>(shm))) = MSGQ_SIZE - 1;
*(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t))) = 0;
//TODO fit shm size to used memory?
sem_t* r_sem;
sem_t* w_sem;
r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(shm) + 2*sizeof(size_t));
w_sem = r_sem + 1;
if (sem_init(r_sem, 1, 1)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init r_sem: "
<< strerror(errno) << std::endl;
return;
}
if (sem_init(r_sem, 1, 1)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init w_sem: "
<< strerror(errno) << std::endl;
return;
}
print_debug_shm();
return;
//tell pusher plugin our PID so it can access our shared memory
//UNIX socket used for communication
sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (sock == -1) {
......@@ -473,12 +563,24 @@ private:
if (connect(sock, (struct sockaddr*) &addr, sizeof(addr))) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to connect socket: "
<< strerror(errno) << std::endl;
<< strerror(errno) << std::endl;
close(sock);
sock = -1;
return;
}
ssize_t res = send(sock, pid_str.c_str(), pid_str.length(), 0);
shutdown(sock, SHUT_WR);
close(sock);
sock = -1;
if (res == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
<< strerror(errno) << std::endl;
return;
}
initialized = true;
}
......@@ -509,82 +611,96 @@ private:
// *timestamp (from snapshot)
// *program counter (from snapshot)
// *cpu we are running on
unsigned long long time = timestamp_entry.value().to_uint();
unsigned long long pc = sampler_pc_entry.value().to_uint();
unsigned cpu = 0;
snap_data sdat;
sdat.ts = timestamp_entry.value().to_uint();
sdat.pc = sampler_pc_entry.value().to_uint();
sdat.cpu = 0;
#if __GLIBC_PREREQ(2, 29)
if (getcpu(&cpu, NULL)) {
unsigned cpuUInt;
if (getcpu(&cpuUInt, NULL)) {
Log(1).stream() << chn->name() << ": DcdbPusher: getcpu() failed" << std::endl;
} else {
sdat.cpu = cpuUInt;
}
#else
int cpuInt = sched_getcpu();
if (cpuInt != -1) {
cpu = cpuInt;
sdat.cpu = cpuInt;
} else {
Log(1).stream() << chn->name() << ": DcdbPusher: sched_getcpu() failed" << std::endl;
}
#endif
//TODO use socket only for initial communication. All other communication via shared memory
// TODO msync after write to shm file
size_t bufSize = 1024, bufCnt;
char buf[bufSize];
if (send(sock, buf, bufCnt, 0) == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
<< strerror(errno) << std::endl;
++snapshots_failed;
shm_buf[shm_buf_idx] = sdat;
if (++shm_buf_idx == shm_buf_size) {
if(!flush_buf()) {
//try again on next snapshot. Will lose this snapshot though
--shm_buf_idx;
++snapshots_failed;
}
shm_buf_idx = 0;
}
}
void finish_cb(Caliper* c, Channel* chn) {
if(sock != -1) {
shutdown(sock, SHUT_WR);
close(sock);
//TODO terminate connection on pusher plugin side after timeout
if (shm != NULL) {
munmap(shm, shm_size);
shm = NULL;
}
// TODO close/unmap shm_file?
if(shm_file != -1) {
std::string bah("/cali_dcdb_");
bah += pid_str.c_str();
shm_unlink(bah.c_str());
close(shm_file);
shm_file = -1;
}
//TODO destroy semaphores
Log(1).stream() << chn->name() << ": DcdbPusher: "
<< snapshots_processed << " snapshots processed of which "
<< snapshots_failed << " failed." << std::endl;
}
//TODO flush thread buffer on thread exit! possible with release_thread_cb?
// void create_thread_cb(Caliper* c, Channel* chn) {
// // init trace buffer on new threads
// acquire_tbuf(c, chn, true);
// }
//
// void release_thread_cb(Caliper* c, Channel* chn) {
// TraceBuffer* tbuf = acquire_tbuf(c, chn, false);
//
// if (tbuf) {
// tbuf->retired.store(true);
//
// std::lock_guard<util::spinlock>
// g(tbuf_lock);
//
// ++num_retired;
// }
// }
void create_thread_cb(Caliper* c, Channel* chn) {
shm_buf_idx = 0;
}
void release_thread_cb(Caliper* c, Channel* chn) {
if (!flush_buf()) {
snapshots_failed += shm_buf_idx;
}
}
DcdbPusher(Caliper* c, Channel* chn) :
shm(NULL),
shm_size(64 * 1024 * 1024),
shm_size(32 * 1024 * 1024),
shm_file(-1),
sock(-1),
pid_str(std::to_string(getpid())),
initialized(false) {
shm_buf_idx = 0;
}
public:
~DcdbPusher() { }
~DcdbPusher() {}
static void dcdbpusher_register(Caliper* c, Channel* chn) {
DcdbPusher* instance = new DcdbPusher(c, chn);
chn->events().create_thread_evt.connect(
[instance](Caliper* c, Channel* chn){
instance->create_thread_cb(c, chn);
});
chn->events().release_thread_evt.connect(
[instance](Caliper* c, Channel* chn){
instance->release_thread_cb(c, chn);
});
chn->events().post_init_evt.connect(
[instance](Caliper* c, Channel* chn){
instance->post_init_cb(c, chn);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment