Commit e629215b authored by Micha Mueller's avatar Micha Mueller
Browse files

Caliper service: send values via Unix Socket to dcdbpusher

parent a0874ea2
...@@ -54,6 +54,12 @@ ...@@ -54,6 +54,12 @@
#include "caliper/common/Log.h" #include "caliper/common/Log.h"
#include "caliper/common/RuntimeConfig.h" #include "caliper/common/RuntimeConfig.h"
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
using namespace cali; using namespace cali;
namespace { namespace {
...@@ -77,6 +83,8 @@ private: ...@@ -77,6 +83,8 @@ private:
Attribute symbol_fun { Attribute::invalid }; Attribute symbol_fun { Attribute::invalid };
Attribute timestamp { Attribute::invalid }; Attribute timestamp { Attribute::invalid };
int sock;
void post_init_cb(Caliper* c, Channel* chn) { void post_init_cb(Caliper* c, Channel* chn) {
// manually issue a pre-flush event to set up the SymbolLookup service // manually issue a pre-flush event to set up the SymbolLookup service
chn->events().pre_flush_evt(c, chn, nullptr); chn->events().pre_flush_evt(c, chn, nullptr);
...@@ -98,12 +106,36 @@ private: ...@@ -98,12 +106,36 @@ private:
return; return;
} }
sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if(sock == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to open socket: "
<< strerror(errno) << std::endl;
return;
}
sockaddr_un addr;
memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
snprintf(&addr.sun_path[1], 91, "DCDBPusherCaliSocket");
if(connect(sock, (struct sockaddr*) &addr, sizeof(addr))) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to connect socket: "
<< strerror(errno) << std::endl;
close(sock);
sock = -1;
return;
}
initialized = true; initialized = true;
} }
void process_snapshot_cb(Caliper* c, Channel* chn, const SnapshotRecord*, const SnapshotRecord* sbuf) { void process_snapshot_cb(Caliper* c, Channel* chn, const SnapshotRecord*, const SnapshotRecord* sbuf) {
++snapshots_processed; ++snapshots_processed;
//TODO check if signal safety is required //TODO check if signal safety is required
//TODO sampler invoked snapshots are always signals, so better do
// nothing signal "un-safe"
//if (c->is_signal()) { //if (c->is_signal()) {
// ++snapshots_failed; // ++snapshots_failed;
// return; // return;
...@@ -127,24 +159,48 @@ private: ...@@ -127,24 +159,48 @@ private:
// symbollookup data // symbollookup data
chn->events().postprocess_snapshot(c, chn, rec); chn->events().postprocess_snapshot(c, chn, rec);
Log(1).stream() << chn->name() << ": DcdbPusher: printing snapshot" << std::endl; const size_t bufSize = 2048;
char buf[bufSize];
std::string time;
std::string value;
for (const Entry& e : rec) { for (const Entry& e : rec) {
//print attribute values for timestamp and looked up function symbol name //print attribute values for timestamp and looked up function symbol name
cali_id_t entryId = c->get_attribute(e.attribute()).id(); cali_id_t entryId = c->get_attribute(e.attribute()).id();
if (entryId == symbol_fun.id() || entryId == timestamp.id()) { if (entryId == timestamp.id()) {
time = e.value().to_string();
} else if (entryId == symbol_fun.id()) {
value = e.value().to_string();
}
}
Log(1).stream() << chn->name() << ": DcdbPusher: " if ((value.length() + time.length() + 2) > 2048) {
<< c->get_attribute(e.attribute()).name() Log(1).stream() << chn->name() << ": DcdbPusher: value exceeding buffer size" << std::endl;
<< " = " << e.value().to_string() ++snapshots_failed;
<< std::endl; return;
} }
Log(1).stream() << chn->name() << ": DcdbPusher: sending value "
<< value << " (" << time << ")" << std::endl << std::endl;
strncpy(buf, time.c_str(), time.length()+1);
strncpy(&buf[time.length()+1], value.c_str(), value.length()+1);
if (send(sock, buf, value.length() + time.length() + 2, 0) == -1) {
Log(1).stream() << chn->name() << ": DcdbPusher: Failed to send message: "
<< strerror(errno) << std::endl;
++snapshots_failed;
} }
Log(1).stream() << chn->name() << std::endl;
} }
void finish_cb(Caliper* c, Channel* chn) { void finish_cb(Caliper* c, Channel* chn) {
if(initialized) {
shutdown(sock, SHUT_WR);
close(sock);
}
if (threshold_exceeded > 0) { if (threshold_exceeded > 0) {
Log(1).stream() << chn->name() << ": DcdbPusher: threshold was exceeded " Log(1).stream() << chn->name() << ": DcdbPusher: threshold was exceeded "
<< threshold_exceeded << " times." << std::endl; << threshold_exceeded << " times." << std::endl;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment