2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 1fa777cf authored by Micha Müller
Browse files

Caliper-service WIP7

- Fixes on both sides
- Finish first implementation of the pusher plugin
- Still requires quality-of-life improvements
parent 02abd6cd
......@@ -81,11 +81,12 @@ using namespace cali;
namespace {
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define MAX_SYMBOL_SIZE 512 //symbol names are not limited by any means but our memory is
#define MAX_PATH_SIZE 4096 //linux paths are not allowed to be longer than 4096 chars
#define MSGQ_SIZE 8192
#define STR_PREFIX "/cali_dcdb_"
#define SHM_SIZE (32*1024*1024)
#define SOCK_NAME "DCDBPusherCaliSocket"
typedef struct {
uintptr_t pc;
......@@ -172,6 +173,8 @@ private:
* @param dest_ptr Pointer to memory where to store fsym_data. Will be modified
* to point behind the last written element on return.
* @return The number of symbol entries written.
* //TODO include debug symbols
* //TODO check if symbols exceeding MAX_SYMBOL_SIZE overwrite succeeding syms
*/
size_t write_function_symbols(const char* const filename,
const uintptr_t start_addr,
......@@ -405,12 +408,11 @@ private:
}
void print_debug_shm() {
const size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm)
+ 2*sizeof(size_t) + MSGQ_SIZE*sizeof(snap_data)));
const size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + lookup_data_offset));
const addr_data* addr_ptr = reinterpret_cast<const addr_data*>(&addr_cnt + 1);
const fsym_data* fsym_ptr = reinterpret_cast<const fsym_data*>(addr_ptr + addr_cnt);
size_t sym_cnt = 0;
for (size_t i = 0; i < addr_cnt; ++i) {
printf("Mem range %s: %llx-%llx contains %d symbols:\n", addr_ptr->pathname,
addr_ptr->start_addr,
......@@ -426,11 +428,12 @@ private:
++fsym_ptr;
}
printf("\n");
sym_cnt += addr_ptr->fsym_count;
++addr_ptr;
}
//debug
printf("Shm: %p, fsym_ptr:%p\n", shm, (void*) fsym_ptr);
printf("%d ranges with overall %d symbols\n", addr_cnt, sym_cnt);
}
/**
......@@ -456,33 +459,31 @@ private:
if (sem_wait(r_sem)) {
return false;
}
r_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm)));
sem_post(r_sem);
if (sem_trywait(w_sem)) {
return false;
}
size_t& w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t)));
bool ret = false;
size_t& w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t)));
const size_t elem_avail = w_index < r_index ? (r_index - w_index - 1) :
(MSGQ_SIZE - w_index + r_index - 1);
if (w_index < r_index) {
if (shm_buf_size <= (r_index - w_index - 1)) {
memcpy(&msg_queue[w_index+1], shm_buf, shm_buf_size*sizeof(snap_data));
w_index += shm_buf_size;
ret = true;
}
} else {
if (shm_buf_size <= (MSGQ_SIZE - w_index + r_index - 1)) {
if (elem_avail >= shm_buf_size) {
if ((w_index + shm_buf_size) >= MSGQ_SIZE) {
//wrap around end of queue
size_t sep = MSGQ_SIZE - w_index - 1;
memcpy(&msg_queue[w_index+1], shm_buf, sep*sizeof(snap_data));
memcpy(msg_queue, &shm_buf[sep], (shm_buf_size-sep)*sizeof(snap_data));
w_index += shm_buf_size;
w_index %= MSGQ_SIZE;
ret = true;
} else {
memcpy(&msg_queue[w_index+1], shm_buf, shm_buf_size*sizeof(snap_data));
w_index += shm_buf_size;
}
ret = true;
}
sem_post(w_sem);
......@@ -549,13 +550,13 @@ private:
return;
}
if (sem_init(r_sem, 1, 1)) {
if (sem_init(w_sem, 1, 1)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init w_sem: "
<< strerror(errno) << std::endl;
return;
}
//print_debug_shm();
print_debug_shm();
//tell pusher plugin our PID so it can access our shared memory
//UNIX socket used for communication
......@@ -571,7 +572,7 @@ private:
memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
snprintf(&addr.sun_path[1], 91, "DCDBPusherCaliSocket");
snprintf(&addr.sun_path[1], 91, SOCK_NAME);
if (connect(sock, (struct sockaddr*) &addr, sizeof(addr))) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to connect socket: "
......@@ -581,7 +582,7 @@ private:
return;
}
ssize_t res = send(sock, pid_str.c_str(), pid_str.length(), 0);
ssize_t res = send(sock, pid_str.c_str(), pid_str.length() + 1, 0);
shutdown(sock, SHUT_WR);
close(sock);
......
......@@ -96,7 +96,7 @@ bool CaliperSensorGroup::execOnStart() {
memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
snprintf(&addr.sun_path[1], 91, "DCDBPusherCaliSocket");
snprintf(&addr.sun_path[1], 91, SOCK_NAME);
if(bind(_socket, (struct sockaddr*) &addr, sizeof(addr))) {
LOG(error) << _groupName << ": Failed to bind socket: " << strerror(errno);
......@@ -199,16 +199,22 @@ void CaliperSensorGroup::read() {
w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm) + sizeof(size_t)));
sem_post(w_sem);
if (r_index == w_index && ++_shmFailCnt > SHM_MAX_RETRIES) {
_sensorIndex.clear();
//are new elements there at all?
if (r_index == w_index) {
++_shmFailCnt;
if (_shmFailCnt > SHM_MAX_RETRIES) {
//"Timeout". We assume that the application terminated
_sensorIndex.clear();
sem_destroy(r_sem);
sem_destroy(w_sem);
sem_destroy(r_sem);
sem_destroy(w_sem);
munmap(_shm, SHM_SIZE);
_shm = nullptr;
close(_shmFile);
_shmFile = -1;
munmap(_shm, SHM_SIZE);
_shm = nullptr;
close(_shmFile);
_shmFile = -1;
}
LOG(debug) << "No data available (failCnt=" << _shmFailCnt << ")";
return;
}
......@@ -234,32 +240,33 @@ void CaliperSensorGroup::read() {
sem_post(r_sem);
//TODO process snapshots
size_t addrCnt = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm) + lookup_data_offset));
addr_data* addrPtr = reinterpret_cast<addr_data*>(&addrCnt + 1);;
LOG(debug) << "Processing " << nelems << " snapshots";
for (size_t i = 0; i < nelems; ++i) {
snap_data sd = snaps[i];
reading_t reading;
reading.value = 1;
reading.timestamp = sd.ts;
reading.timestamp = snaps[i].ts;
std::string sName("cpu" + std::to_string(sd.cpu) + '/');
std::string sName("cpu" + std::to_string(snaps[i].cpu) + '/');
uintptr_t pc = snaps[i].pc;
const size_t addrCnt = *(reinterpret_cast<size_t*>(static_cast<char*>(_shm)
+ lookup_data_offset));
const addr_data* const addrs = reinterpret_cast<addr_data*>(static_cast<char*>(_shm)
+ lookup_data_offset + sizeof(size_t));
for(size_t j = 0; j < addrCnt; ++j) {
if (pc >= addrPtr->start_addr && pc <= addrPtr->end_addr) {
sName += addrPtr->pathname;
fsym_data* fsymPtr = reinterpret_cast<fsym_data*>(
reinterpret_cast<char*>(addrPtr) + addrPtr->fsym_offset);
for(size_t k = 0; k < addrPtr->fsym_count; ++k) {
if (pc >= fsymPtr->start_addr && pc <= fsymPtr->end_addr) {
sName += ':' + fsymPtr->name;
if (pc >= addrs[j].start_addr && pc <= addrs[j].end_addr) {
sName += addrs[j].pathname;
const fsym_data* const fsyms = reinterpret_cast<const fsym_data* const>(
reinterpret_cast<const char* const>(addrs) + addrs[j].fsym_offset);
for(size_t k = 0; k < addrs[j].fsym_count; ++k) {
if (pc >= fsyms[k].start_addr && pc <= fsyms[k].end_addr) {
sName += ':' + fsyms[k].name;
break;
}
++fsymPtr;
}
} //It's OK if we found no symbol. There are possibly none
//store in sensors
//TODO aggregate values
......@@ -287,7 +294,6 @@ void CaliperSensorGroup::read() {
#endif
break;
}
++addrPtr;
}
//TODO what if pc was not within any range?
}
......
......@@ -44,14 +44,15 @@
class CaliperSensorGroup : public SensorGroupTemplate<CaliperSensorBase> {
/*******************************************************************************
* Keep in sync with DcdbPusher Caliper service
* Common defines. Keep in sync with DcdbPusher Caliper service
******************************************************************************/
#define SHM_MAX_RETRIES 5
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define SHM_MAX_RETRIES 10 //TODO make configurable
#define MAX_SYMBOL_SIZE 512 //symbol names are not limited by any means but our memory is
#define MAX_PATH_SIZE 4096 //linux paths are not allowed to be longer than 4096 chars
#define MSGQ_SIZE 8192
#define STR_PREFIX "/cali_dcdb_"
#define SHM_SIZE (32*1024*1024)
#define SOCK_NAME "DCDBPusherCaliSocket"
typedef struct {
uintptr_t pc;
......@@ -97,6 +98,10 @@ typedef struct {
static constexpr size_t lookup_data_offset = 2*sizeof(size_t)
+ 2*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data);
/*******************************************************************************
* End of common defines
******************************************************************************/
public:
CaliperSensorGroup(const std::string& name);
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment