Commit ae109036 authored by Carla Guillen's avatar Carla Guillen
Browse files

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

parents be0defe1 f45d3c62
......@@ -161,13 +161,14 @@ private:
+ 3*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data)
+ sizeof(int);
//TODO test dynamic rebuild at runtime
//TODO investigate consequences of fork() on Caliper - DCDBPusher construct
void* shm; // pointer to shared memory object
int shm_file; // fd of the underlying shared memory file
size_t shm_bytes_written; // keep track if we exceed shm_size
int sock; // unix socket fd for initial shm setup communication
size_t sus_cycle = 15;
std::atomic<bool> run_sus;
std::thread sus; //symbol update service
......@@ -175,14 +176,16 @@ private:
bool initialized;
/*
* Retrieve function symbols from an ELF file and
* store them in a file at a given offset
* Retrieve function symbols from an ELF file and store them at a given
* pointer location.
*
* @param filename ELF file (binary or shared library) to retrieve symbol info from
* @param start_addr Only store symbols whose address is in between start_addr and end_addr
* @param end_addr See start_addr
* @param offset Offset of start_addr from the beginning of the file
* @param dest_ptr Pointer to memory where to store fsym_data. Will be modified
* to point behind the last written element on return.
* @param chn Caliper Channel for logging output.
* @return The number of symbol entries written.
*
* --NOTE--
......@@ -226,6 +229,7 @@ private:
elf = elf_begin(fd, ELF_C_READ, NULL);
gelf_getehdr(elf, &ehdr);
//check if ELF file type is supported
if (ehdr.e_type != ET_DYN && ehdr.e_type != ET_EXEC) {
// we should only encounter executables and shared libraries during runtime
// we could not process other types anyway
......@@ -234,23 +238,26 @@ private:
}
sym_offset = (ehdr.e_type == ET_DYN ? (start_addr - offset) : 0);
//search for symtab (= symbol table) section
while ((scn = elf_nextscn(elf, scn)) != NULL) {
gelf_getshdr(scn, &shdr);
if (shdr.sh_type == SHT_SYMTAB) {
/* found symbol table */
// found symbol table
data = elf_getdata(scn, NULL);
break;
}
}
//check if symtab section found and if it is usable
if (scn == NULL || shdr.sh_entsize == 0) {
Log(1).stream() << chn->name() << ": DcdbPusher: \"" << filename
<< "\": No symbol table present. Falling back to dynamic symtab." << std::endl;
scn = NULL;
//Fall back to dynamic symbol table. This one should always be present
while ((scn = elf_nextscn(elf, scn)) != NULL) {
gelf_getshdr(scn, &shdr);
if (shdr.sh_type == SHT_DYNSYM) {
/* found dynamic symbol table */
// found dynamic symbol table
data = elf_getdata(scn, NULL);
break;
}
......@@ -271,11 +278,10 @@ private:
for (int ii = 0; ii < count; ++ii) {
GElf_Sym sym;
gelf_getsym(data, ii, &sym);
//if (gelf_getsym(data, ii, &sym) == NULL) {
// Log(1).stream() << chn->name() << ": DcdbPusher: Got no symbol" << std::endl;
// continue;
//}
if (gelf_getsym(data, ii, &sym) == NULL) {
Log(1).stream() << chn->name() << ": DcdbPusher: Got no symbol" << std::endl;
continue;
}
if (GELF_ST_TYPE(sym.st_info) != STT_FUNC || //only interested in symbols related to executable code
sym.st_shndx == SHN_UNDEF || //external symbol
......@@ -286,7 +292,6 @@ private:
//resolve symbol name
char* symstr;
char* dsymstr;
int status = -1;
fsym_data symdat;
char symdat_name[MAX_SYMBOL_SIZE];
......@@ -294,7 +299,8 @@ private:
symstr = elf_strptr(elf, shdr.sh_link, sym.st_name);
if (symstr != NULL) {
/* Demangle if necessary. Require GNU v3 ABI by the "_Z" prefix. */
// Demangle if necessary. Require GNU v3 ABI by the "_Z" prefix.
int status = -1;
if (symstr[0] == '_' && symstr[1] == 'Z') {
dsymstr = abi::__cxa_demangle(symstr, NULL, NULL, &status);
}
......@@ -328,7 +334,7 @@ private:
// sym.st_size);
shm_bytes_written += sizeof(fsym_data);
shm_bytes_written += strlen(symdat_name) + 1;
shm_bytes_written += symdat.str_size;
if (shm_bytes_written <= SHM_SIZE) {
memcpy(dest_ptr, &symdat, sizeof(fsym_data));
++dest_ptr;
......@@ -339,11 +345,9 @@ private:
} else {
//unlikely (at least it should be)
shm_bytes_written -= sizeof(fsym_data);
shm_bytes_written -= strlen(symdat_name) + 1;
shm_bytes_written -= symdat.str_size;
Log(1).stream() << chn->name() << ": DcdbPusher: Not enough shared memory!" << std::endl;
elf_end(elf);
close(fd);
return entryCnt;
break;
}
} else {
// printf("Symbol %s out of mem range (%llx-%llx, size %llx)\n", symdat.name,
......@@ -383,10 +387,11 @@ private:
char buf[MAX_PATH_SIZE];
addr_data* addr_ptr;
shm_bytes_written = lookup_data_offset;
shm_bytes_written = lookup_data_offset + sizeof(size_t);
//some pointer arithmetic for the beginning to get appropriate start pointers
size_t& addr_cnt = *(reinterpret_cast<size_t*>(static_cast<char*>(shm) + lookup_data_offset));
addr_data* const addr_start = reinterpret_cast<addr_data*>(&addr_cnt + 1);
addr_cnt = 0;
addr_ptr = addr_start;
//read mapped address ranges from /proc/self/maps
......@@ -449,6 +454,8 @@ private:
if (addr_ptr->pathname[0] == '/') {
//debug
//printf("Parsing symbols for %s (%llx-%llx; %llx)\n", addr_ptr->pathname, addr_ptr->start_addr, addr_ptr->end_addr, addr_ptr->offset);
//write_function_symbols() updates shm_bytes_written as side effect
size_t tmp = shm_bytes_written;
addr_ptr->fsym_count = write_function_symbols(addr_ptr->pathname,
addr_ptr->start_addr,
......@@ -477,25 +484,31 @@ private:
}
/**
* Symbol update service background thread. Whenever the DCDB Pusher plugin
* could not resolve a snapshot it notifies us and this thread comes into
* play. Updates the symbol data in the shared memory file.
* Stop symbol update service background thread.
*/
void startSUS(Channel* chn) {
if (run_sus.load(std::memory_order_acquire) ) {
void stopSUS() {
    //nothing to do unless the service has been flagged as running
    if (!run_sus.load(std::memory_order_acquire)) {
        return;
    }

    //clear the run flag first; the service thread polls it in its loop condition
    run_sus.store(false, std::memory_order_release);

    //only join a thread that was actually started and not joined yet
    if (sus.joinable()) {
        sus.join();
    }
}
/**
* Symbol update service background thread. Whenever the DCDB Pusher plugin
* could not resolve a snapshot it notifies us and this thread comes into
* play. Updates the symbol data in the shared memory file.
*/
void startSUS(Channel* chn) {
stopSUS();
run_sus.store(true, std::memory_order_release);
sus = std::thread([=]() {
//to increase responsiveness on termination we sleep only 1 second at most
//however, to reduce update overhead we only check the update flag every 15 seconds
//To increase responsiveness on termination we sleep only 1 second at most.
//However, to reduce update overhead we only check the update flag every 15 seconds
unsigned short cnt = 0;
constexpr unsigned short maxCnt = 15;
while (run_sus.load(std::memory_order_acquire)) {
if (cnt == maxCnt) {
if (cnt == sus_cycle) {
sem_t* u_sem = reinterpret_cast<sem_t*>(static_cast<char*>(shm)
+ 2*sizeof(size_t)
+ 2*sizeof(sem_t));
......@@ -506,6 +519,7 @@ private:
int& updateSymbolData = *(reinterpret_cast<int*>(static_cast<char*>(shm) + lookup_data_offset - sizeof(int)));
if (updateSymbolData != 0) {
Log(1).stream() << chn->name() << ": DcdbPusher: Updating symbol index" << std::endl;
if (!setup_shm(chn)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to setup shm"
<< std::endl;
......@@ -666,7 +680,9 @@ private:
w_sem = r_sem + 1;
u_sem = w_sem + 1;
if (sem_init(r_sem, 1, 1) || sem_init(w_sem, 1, 1) || sem_init(u_sem, 1, 1)) {
if (sem_init(r_sem, 1, 1) ||
sem_init(w_sem, 1, 1) ||
sem_init(u_sem, 1, 1)) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to init semaphore: "
<< strerror(errno) << std::endl;
return;
......@@ -719,7 +735,7 @@ private:
void process_snapshot_cb(Caliper* c, Channel* chn, const SnapshotRecord*, const SnapshotRecord* sbuf) {
++snapshots_processed;
// FIXME sampler invoked snapshots are always signals, so better do nothing signal "un-safe"
// NOTE sampler invoked snapshots are always signals, so better do nothing signal "un-safe"
//if (c->is_signal()) {
// ++snapshots_failed;
// return;
......@@ -771,18 +787,14 @@ private:
//try again on next snapshot. Will lose this snapshot though
--shm_buf_idx;
++snapshots_failed;
} else {
shm_buf_idx = 0;
}
shm_buf_idx = 0;
}
}
void finish_cb(Caliper* c, Channel* chn) {
if (run_sus.load(std::memory_order_acquire)) {
run_sus.store(false, std::memory_order_release);
if (sus.joinable()) {
sus.join();
}
}
stopSUS();
if (shm != NULL) {
munmap(shm, SHM_SIZE);
......@@ -819,6 +831,10 @@ private:
pid_str(std::to_string(getpid())),
initialized(false) {
shm_buf_idx = 0;
ConfigSet cfg = chn->config().init("dcdbpusher", s_configdata);
sus_cycle = cfg.get("sus_cycle").to_uint();
}
public:
......@@ -854,6 +870,14 @@ public:
}
}; // class DcdbPusher
// Configuration entries of the dcdbpusher service.
//   sus_cycle: unsigned integer, default "15". Symbol update service cycle in
//              seconds, i.e. the time between checks whether a symbol data
//              update is required.
const ConfigSet::Entry DcdbPusher::s_configdata[] = {
    { "sus_cycle", CALI_TYPE_UINT, "15",
      "Symbol update service cycle in seconds (time between checks if update required)",
      "Symbol update service cycle in seconds (time between checks if update required)" },
    ConfigSet::Terminator
};
} // namespace
namespace cali {
......
......@@ -20,7 +20,8 @@
7. [OPA](#opa)
1. [counterData](#opaCounterData)
8. [ProcFS](#procfs)
9. [Writing own plugins](#writingOwnPlugins)
9. [Caliper](#caliper)
10. [Writing own plugins](#writingOwnPlugins)
## Introduction <a name="introduction"></a>
DCDB (DataCenter DataBase) is a database to collect various (sensor-)values of a datacenter for further analysis.
......@@ -418,6 +419,7 @@ Explanation of the values specific for the IPMI plugin:
#### Footnotes <a name="ipmiFootnotes"></a>
<a name="ipmifn1">**1**</a>: &ensp; Use lsb > msb values if response is Little-endian (LSB first), use lsb < msb values if response is Big-Endian (MSB first). Maximum length is 8 bytes.
## Perf-event <a name="perf"></a>
The Perfevent functionality is tasked with collecting data from the CPUs various performance counters (PMUs).
......@@ -501,6 +503,7 @@ The existence of the perf_event_paranoid file is the official method for determi
<a name="fn2">**2**</a>: &ensp; If type is *PERF_TYPE_RAW*, then a custom "raw" config value is needed. Most CPUs support events that are not covered by the "generalized" events. These are implementation defined; see your CPU manual (for example the Intel Volume 3B documentation or the AMD BIOS and Kernel Developer Guide). The libpfm4 library can be used to translate from the name in the architectural manual to the raw hex value perf_event_open() expects in this field.
<a name="fn3">**3**</a>: &ensp; Custom type and Config values can be specified to use the PMU of a specific device. The necessary configuration parameters can be obtained from the type and config files in the respective /sys/devices/<device> tree.
## snmp <a name="snmp"></a>
The SNMP plugin enables dcdbpusher to talk with devices which have an SNMP agent running and query requests from them. A SNMP sensor corresponds to a single value as identified by the unique OID. Sensors are aggregated by connections. See the exemplary snmp.conf file in the `config/` directory.
......@@ -664,6 +667,27 @@ The "type" field can be inferred for each sensor by simply checking the underlyi
Additional CPU-related metrics (that may be introduced in future versions of the Linux kernel) are not supported by the DCDB ProcFS plugin.
Note that for /proc/meminfo instances, an additional synthetic sensor of type "MemUsed" can be defined. This sensor will automatically extract the amount of used memory from the MemTotal and MemFree values present in meminfo files.
## Caliper <a name="caliper"></a>
The Caliper plugin collects application sample data and therefore allows for application performance analysis in retrospect. The plugin receives program counter (PC) values at periodic time intervals from the [Caliper](https://github.com/LLNL/Caliper) framework and tries to resolve the PC to a symbol name (aka function name) during runtime. Currently, this plugin is intended to get insight into usage of provided system libraries used by applications.
This plugin is special as it does not work on its own but also requires a corresponding Caliper framework service running on the application side. Please see Caliper's [official documentation](https://software.llnl.gov/Caliper/) for an exhaustive introduction.
### Caliper framework side
Caliper is an application introspection system. Its functionality stems from so-called services. To work with the Pusher plugin the custom Dcdbpusher service for Caliper is required as well as the stock pthread and sampler services. Furthermore, a patched version of the stock timestamp Caliper service is required for nanosecond precision.
Caliper has to be integrated into the application. This can be done either manually by the application developer or in a more automated way by the system administrator by "hijacking" applications, e.g. overwriting main methods before execution. For the dcdbpusher service it is sufficient to use the Caliper framework just once, i.e. initialize it somewhere. However, one can still use the full functionality of Caliper services in parallel as desired.
The dcdbpusher service retrieves all symbol (function name) data from the application and associated libraries and stores it in a file shared with the Pusher plugin. The service processes snapshots from the sampler on a per-thread basis. It retrieves all required data (program counter, cpu and timestamp) and makes the data accessible for the Pusher plugin via a queue realized in the shared memory file.
### Pusher plugin side
The pusher plugin serves as data sink for the snapshot data from the Caliper service. It can handle multiple different applications at once. However, it is mainly intended for only one application with multiple threads/(MPI-)processes.
The plugin consumes the PC (snapshot) data from shared memory and resolves it to function names via the provided shared symbol data. If a PC value could not be resolved it requests a rebuild of the symbol data index. Every read cycle the plugin consumes all snapshots available in the queue of a process.
From every snapshot the plugin builds a name of the form CPU/BinaryFile::functionName. CPU is the cpu number where the snapshot was captured, BinaryFile the full path of the executable or library the PC resolves to and functionName the symbol within the binary (optional, as functionName cannot always be resolved). For every unique name a new sensor is created. The number of encounters of a sensor name during one read cycle gets stored in the sensor from where it will be pushed to the CollectAgent. Therefore the read cycle interval also determines the granularity of the sampling data. A lower interval results in more fine-grained sampling data resolution but also requires more memory in the storage backend. After an application terminates and "disconnects" the corresponding sensors may get cleared.
Explanation of the values specific for this plugin:
| Value | Explanation |
|:----- |:----------- |
| timeout | Number of read cycles after which a Caliper application is assumed to be terminated if no new values have been received. The connection (shared memory) is torn down on timeout. |
## Writing own plugins <a name="writingOwnPlugins"></a>
First make sure you read the [plugins](#plugins) section.
......
......@@ -5,4 +5,5 @@ global {
group cali {
interval 100
mqttprefix 01
timeout 15
}
......@@ -42,4 +42,7 @@ void CaliperConfigurator::sensorBase(CaliperSensorBase& s, CFG_VAL config) {
/**
 * Apply group-level configuration to a Caliper sensor group.
 *
 * Passes the configurator's _mqttPrefix on to the group and handles the
 * "timeout" attribute through the group's setTimeout.
 *
 * @param s      Sensor group to configure.
 * @param config Configuration value; not referenced directly in this body
 *               (presumably consumed by the ADD/ATTRIBUTE macros - TODO confirm).
 */
void CaliperConfigurator::sensorGroup(CaliperSensorGroup& s, CFG_VAL config) {
s.setGlobalMqttPrefix(_mqttPrefix);
ADD {
ATTRIBUTE("timeout", setTimeout);
}
}
......@@ -41,6 +41,7 @@
CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
SensorGroupTemplate(name),
_timeout(15),
_socket(-1),
_connection(-1),
_globalMqttPrefix("") {
......@@ -49,6 +50,7 @@ CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
SensorGroupTemplate(other),
_timeout(other._timeout),
_socket(-1),
_connection(-1),
_globalMqttPrefix(other._globalMqttPrefix) {
......@@ -71,6 +73,7 @@ CaliperSensorGroup::~CaliperSensorGroup() {
CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
SensorGroupTemplate::operator=(other);
_timeout = other._timeout;
_socket = -1;
_connection = -1;
_globalMqttPrefix = other._globalMqttPrefix;
......@@ -224,7 +227,8 @@ void CaliperSensorGroup::read() {
//are new elements there at all?
if (r_index == w_index) {
++it.shmFailCnt;
if (it.shmFailCnt > SHM_MAX_RETRIES) {
if (it.shmFailCnt > _timeout) {
LOG(debug) << _groupName << ": Removing process (Timeout)";
//"Timeout". We assume that the application terminated
sem_destroy(r_sem);
sem_destroy(w_sem);
......@@ -236,7 +240,6 @@ void CaliperSensorGroup::read() {
it.shmFile = -1;
doCleanUp = true;
}
LOG(debug) << "No data available (failCnt=" << it.shmFailCnt << ")";
continue;
}
......@@ -261,7 +264,7 @@ void CaliperSensorGroup::read() {
*(reinterpret_cast<size_t*>(static_cast<char*>(it.shm))) = w_index;
sem_post(r_sem);
LOG(debug) << "Processing " << nelems << " snapshots";
//LOG(debug) << _groupName << "Processing " << nelems << " snapshots";
//protect access to symbol data
if (sem_wait(u_sem)) {
......@@ -282,8 +285,10 @@ void CaliperSensorGroup::read() {
const addr_data* const addrs = reinterpret_cast<addr_data*>(static_cast<char*>(it.shm)
+ lookup_data_offset + sizeof(size_t));
bool foundRange = false;
for(size_t j = 0; j < addrCnt; ++j) {
if (pc >= addrs[j].start_addr && pc <= addrs[j].end_addr) {
foundRange = true;
sName += addrs[j].pathname;
const fsym_data* fsyms = reinterpret_cast<const fsym_data*>(
......@@ -299,7 +304,7 @@ void CaliperSensorGroup::read() {
++fsyms;
fsyms = reinterpret_cast<const fsym_data*>(
reinterpret_cast<const char*>(fsyms) + str_size);
} //It's OK if we found no symbol. There are possibly none
} //It's OK if we found no symbol. There are possibly none associated to this range
//store in sensors
S_Ptr s;
......@@ -327,16 +332,21 @@ void CaliperSensorGroup::read() {
cache[sName] = std::make_pair(s, reading);
} else {
//update cache entry. Use timestamp of last aggregated value
//FIXME timestamp is not guaranteed to be the temporal latest as snapshots are buffered thread-wise in Caliper
cache[sName].second.value += reading.value;
cache[sName].second.timestamp = reading.timestamp;
if (reading.timestamp > cache[sName].second.timestamp) {
cache[sName].second.timestamp = reading.timestamp;
}
}
break;
}
}
//PC was not within any range --> tell the update service to do his job
//for the moment we just let this snapshot pass unprocessed
*(reinterpret_cast<int*>(static_cast<char*>(it.shm) + lookup_data_offset - sizeof(int))) = 1;
if (!foundRange) {
//PC was not within any range --> tell the update service to do his job
//We let the current snapshot pass unprocessed
LOG(debug) << _groupName << ": symbol index miss. Requesting rebuild";
*(reinterpret_cast<int*>(static_cast<char*>(it.shm) + lookup_data_offset - sizeof(int))) = 1;
}
}
sem_post(u_sem);
......@@ -363,5 +373,6 @@ void CaliperSensorGroup::read() {
}
/**
 * Print the configuration values specific to this sensor group.
 *
 * @param ll Log level handed to LOG_VAR for the output.
 */
void CaliperSensorGroup::printGroupConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Timeout: " << _timeout;
//the timeout is the only group-specific value printed here
}
......@@ -46,7 +46,6 @@ class CaliperSensorGroup : public SensorGroupTemplate<CaliperSensorBase> {
/*******************************************************************************
* Common defines. Keep in sync with DcdbPusher Caliper service
******************************************************************************/
#define SHM_MAX_RETRIES 15 //TODO make configurable
#define MAX_SYMBOL_SIZE 4096 //symbol names are not limited by any means but our memory is
#define MAX_PATH_SIZE 4096 //linux paths are not allowed to be longer than 4096 chars
#define MSGQ_SIZE 8192
......@@ -131,15 +130,14 @@ public:
void printGroupConfig(LOG_LEVEL ll) final override;
/**
 * Set the timeout from its textual configuration value.
 *
 * The text is parsed with stoul() and stored in an unsigned short. Parsed
 * values beyond the range of unsigned short are clamped to its maximum
 * instead of silently wrapping around during the narrowing conversion.
 *
 * @param timeout Textual timeout value. Parse errors are reported by stoul()
 *                (presumably std::stoul, which throws - TODO confirm).
 */
void setTimeout(const std::string& timeout) {
    const unsigned long parsed = stoul(timeout);
    //largest value representable by the unsigned short member
    const unsigned long limit = static_cast<unsigned short>(~0u);
    _timeout = static_cast<unsigned short>(parsed < limit ? parsed : limit);
}
/** Store the given prefix in _globalMqttPrefix. */
void setGlobalMqttPrefix(const std::string& prefix) { _globalMqttPrefix = prefix; }
/** @return The currently stored timeout value (_timeout). */
unsigned short getTimeout() const { return _timeout; }
private:
void read() final override;
///@name Helper
///{
///}
struct caliInstance { /**< Bundle all variables required to read values from
one running Caliper instance */
void* shm;
......@@ -147,6 +145,8 @@ private:
size_t shmFailCnt;
};
unsigned short _timeout;
int _socket;
int _connection;
std::string _globalMqttPrefix;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment