Commit de7a1044 authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'origin/development' into development

parents c5a7f5a7 b6c1d9f6
......@@ -55,10 +55,18 @@
#include "caliper/common/Log.h"
#include "caliper/common/RuntimeConfig.h"
#include <Symtab.h>
#include <LineInformation.h>
#include <Function.h>
#include <AddrLookup.h>
#include <atomic>
#include <cstdio>
#include <cxxabi.h>
#include <dlfcn.h>
#include <errno.h>
#include <features.h>
#include <mutex>
#include <sched.h>
#include <string.h>
#include <sys/socket.h>
......@@ -68,6 +76,9 @@
using namespace cali;
using namespace Dyninst;
using namespace SymtabAPI;
namespace {
class DcdbPusher {
......@@ -76,16 +87,17 @@ private:
static const ConfigSet::Entry s_configdata[];
/* General service attributes */
unsigned snapshots_processed = 0;
unsigned snapshots_failed = 0;
Attribute sampler_pc { Attribute::invalid };
Attribute sampler_fun { Attribute::invalid };
Attribute timestamp { Attribute::invalid };
Attribute thread_id { Attribute::invalid };
int sock;
/* For binary name look-up */
typedef struct {
unsigned long long start_addr;
unsigned long long end_addr;
......@@ -94,11 +106,64 @@ private:
std::vector<addr_range> proc_map; // buffer for memory regions from /proc/self/map for faster lookup
//for thread safety
//for thread safe binary look-up
std::atomic_flag writer_lock;
std::atomic_flag reader_lock;
std::atomic<unsigned> proc_map_readers;
/* For function name look-up */
// The function name look-up code is stolen from the SymbolLookup (SL) service.
// SL service is intended to be triggered on snapshot flush and can cause problems
// if triggered on snapshot-processing, therefore we do our own look-up.
// As side-effect we can avoid overhead for all the additional SL service look-up
// stuff which is not required here.
AddressLookup* m_lookup;
std::mutex m_lookup_mutex;
unsigned m_num_lookups;
unsigned m_num_failed;
#if 0
/**
* Look up to which function the program counter points.
* Logic borrowed from SymbolLookup service.
*/
std::string lookup_function(unsigned long long pc) {
SymtabAPI::Function* function = 0;
std::string funcname = "UNKNOWN";
bool ret_func = false;
{
std::lock_guard<std::mutex>
g(m_lookup_mutex);
if (!m_lookup)
return "UNKNOWN";
Symtab* symtab;
Offset offset;
bool ret = m_lookup->getOffset(pc, symtab, offset);
if (ret)
ret_func = symtab->getContainingFunction(offset, function);
++m_num_lookups;
}
if (ret_func && function) {
auto it = function->pretty_names_begin();
if (it != function->pretty_names_end())
funcname = *it;
}
if (!ret_func)
++m_num_failed; // not locked, doesn't matter too much if it's slightly off
return funcname;
}
#endif
/**
* Look up to which object file a given program counter points. This is done
......@@ -106,31 +171,32 @@ private:
* matching address range is found we rebuild the internal cache and try
* once again.
*/
std::string lookup_pathname(unsigned long long pc, Channel* chn, bool retry=false) {
void lookup_pathname(unsigned long long pc, char* const buf, Channel* chn, bool retry=false) {
//we get blocked here if proc_map is currently rebuild
while(reader_lock.test_and_set(std::memory_order_acquire)) { }
proc_map_readers++;
reader_lock.clear(std::memory_order_release);
std::string pn = "";
for(const auto& entry : proc_map) {
if (pc >= entry.start_addr && pc <= entry.end_addr) {
pn = entry.pathname;
break;
strncpy(buf, entry.pathname.c_str(), 2032);
proc_map_readers--;
return;
}
}
proc_map_readers--;
if (pn == "" && !retry) {
if (!retry) {
Log(1).stream() << chn->name() << ": DcdbPusher: Rebuilding proc_map" << std::endl;
setup_proc_map(chn);
} else {
return pn;
buf[0] = '\0';
return;
}
return lookup_pathname(pc, chn, true);
lookup_pathname(pc, buf, chn, true);
return;
}
/**
......@@ -202,25 +268,20 @@ private:
}
void post_init_cb(Caliper* c, Channel* chn) {
// manually issue a pre-flush event to set up the SymbolLookup service
chn->events().pre_flush_evt(c, chn, nullptr);
// Check if required services sampler, symbollookup, timestamp and
// Check if required services sampler, timestamp and
// pthread are active by searching for identifying attributes.
//TODO support data collection if event triggered snapshots are used
sampler_pc = c->get_attribute("cali.sampler.pc");
sampler_fun = c->get_attribute("source.function#cali.sampler.pc");
timestamp = c->get_attribute("time.timestamp");
thread_id = c->get_attribute("pthread.id");
if (sampler_pc == Attribute::invalid ||
sampler_fun == Attribute::invalid ||
timestamp == Attribute::invalid ||
thread_id == Attribute::invalid) {
Log(1).stream() << chn->name() << ": DcdbPusher: not all required services "
"sampler, symbollookup, timestamp and pthread are running." << std::endl;
"sampler, timestamp and pthread are running." << std::endl;
return;
}
......@@ -230,6 +291,25 @@ private:
return;
}
{
//TODO refresh may be necessary for the same reasons proc_map has to be rebuild
//FIXME should probably go into own function.
std::lock_guard<std::mutex>
g(m_lookup_mutex);
if (!m_lookup) {
m_lookup = AddressLookup::createAddressLookup();
if (!m_lookup) {
Log(0).stream() << "DcdbPusher: Could not create address lookup object"
<< std::endl;
return;
}
m_lookup->refresh();
}
}
sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if(sock == -1) {
......@@ -273,49 +353,61 @@ private:
return;
}
std::vector<Entry> rec = sbuf->to_entrylist();
Entry timestamp_entry = sbuf->get(timestamp);
Entry sampler_pc_entry = sbuf->get(sampler_pc);
// manually issue a postprocess-snapshot event to enrich snapshot with
// symbollookup data
chn->events().postprocess_snapshot(c, chn, rec);
unsigned long long time = timestamp_entry.value().to_uint();
unsigned long long pc = sampler_pc_entry.value().to_uint();
char buf[2048];
std::string time;
std::string value;
std::string func_name;
std::string file_name;
size_t bufCnt = 0;
size_t bufSize = 4096;
char buf[bufSize];
char file_name[2032];
char* func_name = NULL;
unsigned cpu = 0;
for (const Entry& e : rec) {
bufCnt += snprintf(buf, 21, "%llu", time);
bufCnt++; //count terminating NUL char
//look up required attribute values in snapshot
cali_id_t entryId = c->get_attribute(e.attribute()).id();
if (bufCnt > 21) {
Log(1).stream() << chn->name() << ": DcdbPusher: Timestamp has more digits than expected" << std::endl;
}
if (entryId == timestamp.id()) {
time = e.value().to_string();
} else if (entryId == sampler_fun.id()) {
func_name = e.value().to_string();
} else if (entryId == sampler_pc.id()) {
unsigned long long pc = e.value().to_uint();
file_name = lookup_pathname(pc, chn);
}
lookup_pathname(pc, file_name, chn);
//lookup_function(pc, func_name);
void* f = (void*) pc;
Dl_info dlinfo;
if (dladdr(f, &dlinfo) && dlinfo.dli_sname != NULL) {
#if 0
Log(1).stream() << chn->name() << ": DcdbPusher: dladdr() retrieved \""
<< dlinfo.dli_fname << "::" << dlinfo.dli_sname << "\"" << std::endl;
#endif
//TODO clean up, use pre-allocated buffer for demangling
int status;
func_name = abi::__cxa_demangle(dlinfo.dli_sname, NULL, NULL, &status);
} else {
Log(1).stream() << chn->name() << ": DcdbPusher: dladdr() failed" << std::endl;
}
#if __GLIBC_PREREQ(2, 29)
unsigned cpu;
if (!getcpu(&cpu, NULL)) {
value = "/cpu" + std::to_string(cpu);
if (getcpu(&cpu, NULL)) {
Log(1).stream() << chn->name() << ": DcdbPusher: getcpu() failed" << std::endl;
}
#else
int cpu = sched_getcpu();
if (cpu != -1) {
value = "/cpu" + std::to_string(cpu);
int cpuInt = sched_getcpu();
if (cpuInt != -1) {
cpu = cpuInt;
} else {
Log(1).stream() << chn->name() << ": DcdbPusher: sched_getcpu() failed" << std::endl;
}
#endif
value += "/" + file_name + "/" + func_name;
size_t tCnt = bufCnt;
bufCnt += snprintf(&buf[bufCnt], bufSize-bufCnt, "/cpu%u/%s::%s", cpu, file_name, func_name);
bufCnt++; //count terminating NUL char
if ((value.length() + time.length() + 2) > 2048) {
if (bufCnt > bufSize) {
Log(1).stream() << chn->name() << ": DcdbPusher: value exceeding buffer size" << std::endl;
++snapshots_failed;
return;
......@@ -323,13 +415,10 @@ private:
#if 0
Log(1).stream() << chn->name() << ": DcdbPusher: Sending \""
<< value << "\" (" << time << ")" << std::endl << std::endl;
<< &buf[tCnt] << "\" (" << buf << ")" << std::endl << std::endl;
#endif
strncpy(buf, time.c_str(), time.length()+1);
strncpy(&buf[time.length()+1], value.c_str(), value.length()+1);
if (send(sock, buf, value.length() + time.length() + 2, 0) == -1) {
if (send(sock, buf, bufCnt, 0) == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
<< strerror(errno) << std::endl;
++snapshots_failed;
......@@ -344,6 +433,8 @@ private:
}
Log(1).stream() << chn->name() << ": DcdbPusher: "
<< m_num_lookups << " address lookups, "
<< m_num_failed << " failed." << std::endl
<< snapshots_processed << " snapshots processed of which "
<< snapshots_failed << " failed." << std::endl;
}
......
......@@ -17,7 +17,7 @@
sbuf->append(timestamp_attr.id(),
- Variant(cali_make_variant_from_uint(chrono::system_clock::to_time_t(chrono::system_clock::now()))));
+ Variant(cali_make_variant_from_uint(
+ chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now().time_since_epoch()).count())));
+ chrono::duration_cast<chrono::nanoseconds>(chrono::system_clock::now().time_since_epoch()).count())));
}
void post_init_cb(Caliper* c, Channel* chn) {
......@@ -166,7 +166,7 @@ void CaliperSensorGroup::read() {
reading_t reading;
reading.value = 1;
reading.timestamp = US_TO_NS(std::stoull(timestamp)); //Caliper timestamps are microseconds since UNIX epoch
reading.timestamp = std::stoull(timestamp);
S_Ptr s;
auto it = _sensorIndex.find(feName);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment