Commit f6c273f2 authored by Micha Müller
Browse files

Caliper-service rework WIP1

parent 1f28de13
......@@ -3,7 +3,8 @@
// LLNL-CODE-678900
// All rights reserved.
//
// For details, see https://github.com/scalability-llnl/Caliper.
// For LICENSE and other details of Caliper see:
// https://github.com/scalability-llnl/Caliper
//================================================================================
// Name : DcdbPusher.cpp
......@@ -55,19 +56,23 @@
#include "caliper/common/Log.h"
#include "caliper/common/RuntimeConfig.h"
#include <atomic>
#include <cstdio>
#include <cxxabi.h>
#include <errno.h>
#include <fcntl.h>
#include <features.h>
#include <sched.h>
#include <semaphore.h>
#include <string.h>
#include <unistd.h>
#include <vector>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <vector>
#include <libelf.h>
#include <gelf.h>
......@@ -78,39 +83,73 @@ namespace {
class DcdbPusher {
private:
static const ConfigSet::Entry s_configdata[];
/* General service attributes */
unsigned snapshots_processed = 0;
unsigned snapshots_failed = 0;
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define MSGQ_SIZE 8192
Attribute sampler_pc { Attribute::invalid };
Attribute timestamp { Attribute::invalid };
Attribute thread_id { Attribute::invalid };
int sock;
#define max_symbol_size 512
#define max_path_size 4096
/* One entry of the communication queue shared with the dcdbpusher plugin */
struct snap_data {
    void*          pc;  // presumably the sampled program counter (cf. sampler_pc) -- TODO confirm
    uint64_t       ts;  // presumably the snapshot timestamp -- TODO confirm
    unsigned short cpu; // presumably the CPU the sample was taken on -- TODO confirm
};
/*
 * Entry for an executable symbol in the symbol table.
 *
 * The record is fixed-size: the name is stored inline in a buffer of
 * MAX_SYMBOL_SIZE bytes.
 */
struct fun_symbol {
    void* start_addr;            // first address belonging to the symbol
    void* end_addr;              // end of the symbol's address range
    char  name[MAX_SYMBOL_SIZE]; // symbol name
};
/*
 * Defines a contiguous executable memory block.
 *
 * The record is fixed-size: the pathname is stored inline in a buffer of
 * MAX_PATH_SIZE bytes.
 */
struct addr_range {
    void*  start_addr;              // start of the memory block
    void*  end_addr;                // end of the memory block
    size_t sym_offset;              // Offset pointing to the symbols for this
                                    // memory block in the symbol table
    size_t sym_count;               // Number of symbols in this address range
    char   pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where this
                                    // memory range comes from or "[Anonymous]"
                                    // if unknown
};
private:
static const ConfigSet::Entry s_configdata[];
/* General service attributes */
unsigned snapshots_processed = 0;
unsigned snapshots_failed = 0;
Attribute sampler_pc { Attribute::invalid };
Attribute timestamp { Attribute::invalid };
Attribute thread_id { Attribute::invalid };
/*
* Layout of shared-memory file used to communicate with dcdbpusher plugin:
*
* //Communication queue, aka ring buffer:
* size_t r_index
* size_t w_index
* snap_data[MSGQ_SIZE]
*
* //symbol lookup data
* size_t ar_count
* addr_range[ar_count]
* ar_count * (fun_symbol[addr_range.sym_count])
*/
// Handles for the shared memory region and the setup socket.
void* shm; // pointer to shared memory object
size_t shm_size; // size of shm in bytes
int shm_file; // fd of the underlying shared memory file
int sock; // unix socket fd for initial shm setup communication
std::atomic_flag shm_wlock; // for thread safe writing to shm queue
// Each thread has a local buffer for relevant snapshot data to reduce
// writes to the shm queue, as those require locking.
// NOTE(review): as non-static data members the next three declarations are
// ill-formed: a constexpr data member must be declared static, and
// thread_local may only be applied to a data member together with static.
// TODO: make them static (adding definitions for the thread_local ones) or
// move them to namespace scope.
// Number of snap_data entries that fit into 1024 bytes.
constexpr size_t shm_buf_size = 1024 / sizeof(snap_data);
// Index into shm_buf; presumably the next free slot -- TODO confirm.
thread_local size_t shm_buf_idx = 0;
thread_local snap_data shm_buf[shm_buf_size];
// Set to true once setup has run through; snapshots are counted as failed
// while this is still false.
bool initialized;
/*
* Retrieve function symbols from an ELF file (binary or shared library) and
* store them in a file at a given offset
......@@ -128,7 +167,6 @@ private:
elf_version(EV_CURRENT);
fd = open(filename, O_RDONLY);
if (fd == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Could not open ELF file: "
<< strerror(errno) << std::endl;
......@@ -146,26 +184,46 @@ private:
}
data = elf_getdata(scn, NULL);
if (shdr.sh_entsize == 0) {
Log(1).stream() << chn->name() << ": DcdbPusher: Section size zero" << std::endl;
return 0;
}
count = shdr.sh_size / (shdr.sh_entsize ?: 1);
/* print the symbol names */
for (ii = 0; ii < count; ++ii) {
GElf_Sym sym;
gelf_getsym(data, ii, &sym);
char buf[MAX_SYMBOL_SIZE];
if (sym == NULL) {
Log(1).stream() << chn->name() << ": DcdbPusher: Got no symbol" << std::endl;
continue;
}
//TODO store to out_fd
//TODO update size entry in output file
gelf_getsym(data, ii, &sym);
//if (gelf_getsym(data, ii, &sym) == NULL) {
// Log(1).stream() << chn->name() << ": DcdbPusher: Got no symbol" << std::endl;
// continue;
//}
if (sym.st_info == STT_FUNC) {
//printf("%s\n", elf_strptr(elf, shdr.sh_link, sym.st_name));
char* symstr;
char* dsymstr;
int status = -1;
symstr = elf_strptr(elf, shdr.sh_link, sym.st_name);
/* Demangle if necessary. Require GNU v3 ABI by the "_Z" prefix. */
if (symstr[0] == '_' && symstr[1] == 'Z') {
dsymstr = abi::__cxa_demangle(symstr, NULL, NULL, &status);
}
if (status == 0) {
strncpy(buf, dsymstr, 512);
free((void*) dsymstr);
} else {
strncpy(buf, symstr, 512);
}
buf[511] = '\0';
//TODO store to out_fd
//TODO update size entry in output file
// TODO msync after write to shm file
printf("%s\n", buf);
entryCnt++;
}
}
......@@ -176,19 +234,17 @@ private:
}
/**
* Parse /proc/self/maps and collect all address ranges which are marked as
* executable, together with their pathnames.
*
* TODO extend to parse elf symbols
* TODO store information into shared memory file
* TODO demangle symbol names func_name = abi::__cxa_demangle(dlinfo.dli_sname, NULL, NULL, &status);
* TODO demangle different languages? (C, C++, Fortran, other?)
*/
bool parse_proc_map(const char* const shm_name, Channel* chn) {
FILE* file;
addr_range range;
char exec;
char buf[max_path_size];
char buf[MAX_PATH_SIZE];
if (!(file = fopen("/proc/self/maps", "r"))) {
Log(1).stream() << chn->name() << ": DcdbPusher: Could not open memory map: "
......@@ -196,6 +252,9 @@ private:
return false;
}
constexpr size_t addr_offset = 2*sizeof(size_t) + MSGQ_SIZE*sizeof(snap_data);
size_t addr_counter = 0;
while(fscanf(file, "%llx-%llx %*2c%1c%*s%*s%*s%*s%4096[^\n]",
&(range.start_addr),
&(range.end_addr),
......@@ -203,6 +262,7 @@ private:
buf) == 4) {
if (exec == 'x') {
//get rid of leading whitespaces
sscanf(buf, "%4096s", range.pathname);
if (range.pathname[0] == '\0') {
......@@ -214,6 +274,7 @@ private:
printf("\n");
}
// TODO store in shm file
// TODO msync after write to shm file
}
}
......@@ -244,6 +305,27 @@ private:
return;
}
const pid_t pid = getpid();
const std::string shm_name = "/cali_dcdb_" + std::to_string(pid);
shm_file = shm_open(shm_name.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
if (shm_file == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to open shm_file: "
<< strerror(errno) << std::endl;
return;
}
if (ftruncate(shm_file, shm_size)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to truncate shm_file: "
<< strerror(errno) << std::endl;
return;
}
shm = mmap(NULL, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_file, 0);
if (shm == (void*) -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to mmap shm_file: "
<< strerror(errno) << std::endl;
return;
}
if (!parse_proc_map("", chn)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init proc_map"
<< std::endl;
......@@ -252,7 +334,7 @@ private:
sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if(sock == -1) {
if (sock == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to open socket: "
<< strerror(errno) << std::endl;
return;
......@@ -264,24 +346,26 @@ private:
addr.sun_family = AF_UNIX;
snprintf(&addr.sun_path[1], 91, "DCDBPusherCaliSocket");
if(connect(sock, (struct sockaddr*) &addr, sizeof(addr))) {
if (connect(sock, (struct sockaddr*) &addr, sizeof(addr))) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to connect socket: "
<< strerror(errno) << std::endl;
close(sock);
sock = -1;
return;
}
initialized = true;
}
void process_snapshot_cb(Caliper* c, Channel* chn, const SnapshotRecord*, const SnapshotRecord* sbuf) {
++snapshots_processed;
// sampler invoked snapshots are always signals, so better do nothing signal "un-safe"
// FIXME sampler invoked snapshots are always signals, so better do nothing signal "un-safe"
//if (c->is_signal()) {
// ++snapshots_failed;
// return;
//}
if (sock == -1) {
if (!initialized) {
++snapshots_failed;
return;
}
......@@ -318,6 +402,7 @@ private:
#endif
//TODO use socket only for initial communication. All other communication via shared memory
// TODO msync after write to shm file
size_t bufSize = 1024, bufCnt;
char buf[bufSize];
......@@ -334,14 +419,38 @@ private:
shutdown(sock, SHUT_WR);
close(sock);
}
// TODO close/unmap shm_file?
Log(1).stream() << chn->name() << ": DcdbPusher: "
<< snapshots_processed << " snapshots processed of which "
<< snapshots_failed << " failed." << std::endl;
}
DcdbPusher(Caliper* c, Channel* chn) {
sock = -1;
//TODO flush thread buffer on thread exit! possible with release_thread_cb?
// void create_thread_cb(Caliper* c, Channel* chn) {
// // init trace buffer on new threads
// acquire_tbuf(c, chn, true);
// }
//
// void release_thread_cb(Caliper* c, Channel* chn) {
// TraceBuffer* tbuf = acquire_tbuf(c, chn, false);
//
// if (tbuf) {
// tbuf->retired.store(true);
//
// std::lock_guard<util::spinlock>
// g(tbuf_lock);
//
// ++num_retired;
// }
// }
/**
 * Construct the service in its idle state: no shared memory is mapped, no
 * file descriptor is open and the service is not marked as initialized.
 *
 * The constructor does not use either argument.
 *
 * @param c    Caliper instance (unused here)
 * @param chn  channel passed in by the caller (unused here)
 */
DcdbPusher(Caliper* c, Channel* chn) :
    shm(nullptr),
    shm_size(2 * 1024 * 1024), // 2 MiB
    shm_file(-1),
    sock(-1),
    initialized(false) {
    // A default-constructed std::atomic_flag holds an unspecified value
    // before C++20, so put the write lock into the released state explicitly.
    shm_wlock.clear();
}
public:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment