Commit 1a6374d2 authored by Carla Guillen's avatar Carla Guillen
Browse files

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

Local work will be merged to branch
parents 0380961c d7eb5d20
......@@ -34,7 +34,8 @@ LIBS = -L../lib \
-lboost_date_time \
-lboost_log_setup \
-lboost_log \
-lboost_regex
-lboost_regex \
-rdynamic
TARGET = collectagent
......
......@@ -493,13 +493,13 @@ int main(int argc, char* const argv[]) {
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
analyticsController->setCache(&mySensorCache);
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
queryEngine.setFilter(analyticsSettings.filter);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback);
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
......
......@@ -56,7 +56,6 @@
#include "caliper/common/Log.h"
#include "caliper/common/RuntimeConfig.h"
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cxxabi.h>
......@@ -82,37 +81,43 @@ using namespace cali;
namespace {
class DcdbPusher {
#define MAX_SYMBOL_SIZE 512
#define MAX_PATH_SIZE 4096
#define MSGQ_SIZE 8192
typedef struct {
uintptr_t pc;
uint64_t ts;
unsigned short cpu;
} snap_data;
/* Entry for an executable symbol in the symbol table */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
char name[MAX_SYMBOL_SIZE];
} fsym_data;
/* Defines a contiguous executable memory block */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
uintptr_t offset; // offset as parsed from /proc//maps
size_t fsym_offset; // Offset in bytes from the address of this struct
// to the beginning of the associated symbol section
size_t fsym_count; // Number of symbols in this address range
char pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where
// this memory range comes from or
// "[Anonymous]" if unknown
} addr_data;
typedef struct {
uintptr_t pc;
uint64_t ts;
unsigned short cpu;
} snap_data;
/* Entry for an executable symbol in the symbol table */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
char name[MAX_SYMBOL_SIZE];
} fsym_data;
/* Defines a contiguous executable memory block */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
uintptr_t offset; // offset as parsed from /proc//maps
size_t fsym_offset; // Offset in bytes from the address of this struct
// to the beginning of the associated symbol section
size_t fsym_count; // Number of symbols in this address range
char pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where
// this memory range comes from or
// "[Anonymous]" if unknown
} addr_data;
// each thread has a local buffer for relevant snapshot data to reduce
// writes to shm queue as this requires locking
static constexpr size_t shm_buf_size = 1024 / sizeof(snap_data);
static thread_local size_t shm_buf_idx;
static thread_local snap_data shm_buf[shm_buf_size];
class DcdbPusher {
private:
......@@ -130,8 +135,10 @@ private:
* Layout of shared-memory file used to communicate with dcdbpusher plugin:
*
* //Communication queue, aka ring buffer:
* size_t r_index
* size_t w_index
* size_t r_index //points to the last read element
* size_t w_index //points to the last written element
* sem_t r_sem;
* sem_t w_sem;
* snap_data[MSGQ_SIZE]
*
* //symbol lookup data
......@@ -139,19 +146,16 @@ private:
* addr_data[addr_count]
* addr_count * (fsym_data[addr_data.fsym_count])
*/
static constexpr size_t lookup_data_offset = 2*sizeof(size_t)
+ 2*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data);
//TODO close shm file at end
void* shm; // pointer to shared memory object
size_t shm_size; // size of shm in bytes
int shm_file; // fd of the underlying shared memory file
int sock; // unix socket fd for initial shm setup communication
std::atomic_flag shm_wlock; // for thread safe writing to shm queue
// each thread has a local buffer for relevant snapshot data to reduce
// writes to shm queue as this requires locking
static constexpr size_t shm_buf_size = 1024 / sizeof(snap_data);
static thread_local size_t shm_buf_idx;
static thread_local snap_data shm_buf[shm_buf_size];
const std::string pid_str; // PID at process start
bool initialized;
/*
......@@ -231,7 +235,6 @@ private:
int status = -1;
fsym_data symdat;
//TODO sym.st_name == STN_UNDEF?
symstr = elf_strptr(elf, shdr.sh_link, sym.st_name);
/* Demangle if necessary. Require GNU v3 ABI by the "_Z" prefix. */
......@@ -242,8 +245,10 @@ private:
if (status == 0) {
strncpy(symdat.name, dsymstr, 512);
free((void*) dsymstr);
} else {
} else if (symstr != NULL) {
strncpy(symdat.name, symstr, 512);
} else {
symdat.name[0] = '\0';
}
symdat.name[511] = '\0';
......@@ -298,8 +303,7 @@ private:
addr_data* addr_ptr;
//some pointer arithmetic for the beginning to get appropriate start pointers
size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm)
+ 2*sizeof(size_t) + MSGQ_SIZE*sizeof(snap_data)));
size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + lookup_data_offset));
addr_data* const addr_start = reinterpret_cast<addr_data*>(&addr_cnt + 1);
addr_ptr = addr_start;
......@@ -409,6 +413,67 @@ private:
printf("Shm: %p, fsym_ptr:%p\n", shm, (void*) fsym_ptr);
}
/**
* Writes the thread's snap_data buffer into shm
*/
bool flush_buf() {
if (shm_buf_idx == 0) {
return true;
}
size_t r_index;
sem_t* r_sem;
sem_t* w_sem;
snap_data* msg_queue;
r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(shm) + 2*sizeof(size_t));
w_sem = r_sem + 1;
msg_queue = reinterpret_cast<snap_data*>(w_sem + 1);
//TODO atomic load/stores instead of semaphore locking?
if (sem_wait(r_sem)) {
return false;
}
r_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm)));
sem_post(r_sem);
if (sem_trywait(w_sem)) {
return false;
}
size_t& w_index = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t)));
if (w_index < r_index) {
if (shm_buf_size <= (r_index - w_index - 1)) {
memcpy(&(msg_queue[w_index+1]), shm_buf, shm_buf_size*sizeof(snap_data));
w_index += shm_buf_size;
} else {
goto fail;
}
} else {
if (shm_buf_size <= (MSGQ_SIZE - w_index + r_index - 1)) {
size_t sep = MSGQ_SIZE - w_index - 1;
memcpy(&(msg_queue[w_index+1]), shm_buf, sep*sizeof(snap_data));
memcpy(msg_queue, &(shm_buf[sep]), (shm_buf_size-sep)*sizeof(snap_data));
w_index += shm_buf_size;
w_index %= MSGQ_SIZE;
} else {
goto fail;
}
}
sem_post(w_sem);
return true;
fail:
sem_post(w_sem);
return false;
}
void post_init_cb(Caliper* c, Channel* chn) {
// Check if required services sampler, timestamp and
// pthread are active by searching for identifying attributes.
......@@ -427,10 +492,9 @@ private:
return;
}
const pid_t pid = getpid();
const std::string shm_name = "/cali_dcdb_" + std::to_string(pid);
shm_file = shm_open(shm_name.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
std::string bah("/cali_dcdb_");
bah += pid_str.c_str();
shm_file = shm_open(bah.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
if (shm_file == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to open shm_file: "
<< strerror(errno) << std::endl;
......@@ -445,6 +509,7 @@ private:
if (shm == (void*) -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to mmap shm_file: "
<< strerror(errno) << std::endl;
shm = NULL;
return;
}
......@@ -454,9 +519,34 @@ private:
return;
}
//init r/w_index
*(reinterpret_cast<size_t*>(static_cast<char*>(shm))) = MSGQ_SIZE - 1;
*(reinterpret_cast<size_t*>(static_cast<char*>(shm) + sizeof(size_t))) = 0;
//TODO fit shm size to used memory?
sem_t* r_sem;
sem_t* w_sem;
r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(shm) + 2*sizeof(size_t));
w_sem = r_sem + 1;
if (sem_init(r_sem, 1, 1)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init r_sem: "
<< strerror(errno) << std::endl;
return;
}
if (sem_init(r_sem, 1, 1)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init w_sem: "
<< strerror(errno) << std::endl;
return;
}
print_debug_shm();
return;
//tell pusher plugin our PID so it can access our shared memory
//UNIX socket used for communication
sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (sock == -1) {
......@@ -473,12 +563,24 @@ private:
if (connect(sock, (struct sockaddr*) &addr, sizeof(addr))) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to connect socket: "
<< strerror(errno) << std::endl;
<< strerror(errno) << std::endl;
close(sock);
sock = -1;
return;
}
ssize_t res = send(sock, pid_str.c_str(), pid_str.length(), 0);
shutdown(sock, SHUT_WR);
close(sock);
sock = -1;
if (res == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
<< strerror(errno) << std::endl;
return;
}
initialized = true;
}
......@@ -509,82 +611,96 @@ private:
// *timestamp (from snapshot)
// *program counter (from snapshot)
// *cpu we are running on
unsigned long long time = timestamp_entry.value().to_uint();
unsigned long long pc = sampler_pc_entry.value().to_uint();
unsigned cpu = 0;
snap_data sdat;
sdat.ts = timestamp_entry.value().to_uint();
sdat.pc = sampler_pc_entry.value().to_uint();
sdat.cpu = 0;
#if __GLIBC_PREREQ(2, 29)
if (getcpu(&cpu, NULL)) {
unsigned cpuUInt;
if (getcpu(&cpuUInt, NULL)) {
Log(1).stream() << chn->name() << ": DcdbPusher: getcpu() failed" << std::endl;
} else {
sdat.cpu = cpuUInt;
}
#else
int cpuInt = sched_getcpu();
if (cpuInt != -1) {
cpu = cpuInt;
sdat.cpu = cpuInt;
} else {
Log(1).stream() << chn->name() << ": DcdbPusher: sched_getcpu() failed" << std::endl;
}
#endif
//TODO use socket only for initial communication. All other communication via shared memory
// TODO msync after write to shm file
size_t bufSize = 1024, bufCnt;
char buf[bufSize];
if (send(sock, buf, bufCnt, 0) == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
<< strerror(errno) << std::endl;
++snapshots_failed;
shm_buf[shm_buf_idx] = sdat;
if (++shm_buf_idx == shm_buf_size) {
if(!flush_buf()) {
//try again on next snapshot. Will lose this snapshot though
--shm_buf_idx;
++snapshots_failed;
}
shm_buf_idx = 0;
}
}
void finish_cb(Caliper* c, Channel* chn) {
if(sock != -1) {
shutdown(sock, SHUT_WR);
close(sock);
//TODO terminate connection on pusher plugin side after timeout
if (shm != NULL) {
munmap(shm, shm_size);
shm = NULL;
}
// TODO close/unmap shm_file?
if(shm_file != -1) {
std::string bah("/cali_dcdb_");
bah += pid_str.c_str();
shm_unlink(bah.c_str());
close(shm_file);
shm_file = -1;
}
//TODO destroy semaphores
Log(1).stream() << chn->name() << ": DcdbPusher: "
<< snapshots_processed << " snapshots processed of which "
<< snapshots_failed << " failed." << std::endl;
}
//TODO flush thread buffer on thread exit! possible with release_thread_cb?
// void create_thread_cb(Caliper* c, Channel* chn) {
// // init trace buffer on new threads
// acquire_tbuf(c, chn, true);
// }
//
// void release_thread_cb(Caliper* c, Channel* chn) {
// TraceBuffer* tbuf = acquire_tbuf(c, chn, false);
//
// if (tbuf) {
// tbuf->retired.store(true);
//
// std::lock_guard<util::spinlock>
// g(tbuf_lock);
//
// ++num_retired;
// }
// }
void create_thread_cb(Caliper* c, Channel* chn) {
shm_buf_idx = 0;
}
void release_thread_cb(Caliper* c, Channel* chn) {
if (!flush_buf()) {
snapshots_failed += shm_buf_idx;
}
}
DcdbPusher(Caliper* c, Channel* chn) :
shm(NULL),
shm_size(64 * 1024 * 1024),
shm_size(32 * 1024 * 1024),
shm_file(-1),
sock(-1),
pid_str(std::to_string(getpid())),
initialized(false) {
shm_buf_idx = 0;
}
public:
~DcdbPusher() { }
~DcdbPusher() {}
static void dcdbpusher_register(Caliper* c, Channel* chn) {
DcdbPusher* instance = new DcdbPusher(c, chn);
chn->events().create_thread_evt.connect(
[instance](Caliper* c, Channel* chn){
instance->create_thread_cb(c, chn);
});
chn->events().release_thread_evt.connect(
[instance](Caliper* c, Channel* chn){
instance->release_thread_cb(c, chn);
});
chn->events().post_init_evt.connect(
[instance](Caliper* c, Channel* chn){
instance->post_init_cb(c, chn);
......
......@@ -145,7 +145,7 @@ void PhysicalSensorCache::populate(Connection* connection, SensorConfig& sc, uin
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ts DESC LIMIT " PSC_READ_BEHIND;
const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT " PSC_READ_BEHIND;
const char* queryAfter = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts > ? LIMIT " PSC_READ_AHEAD;
/* Query before... */
......
......@@ -235,7 +235,7 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
functName = match[1].str();
str = match[2].str();
}
boost::regex sensorRegex("([^\\%]+)\\%?([^\\%]*)", boost::regex::extended);
boost::regex sensorRegex("([^\\@]+)\\@?([^\\@]*)", boost::regex::extended);
if(boost::regex_search(str, match, sensorRegex)) {
sensorName = match[1].str();
modifierStr = match[2].str();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment