Commit 45380821 authored by Micha Müller's avatar Micha Müller
Browse files

WIP: Caliper plugin: support event snapshots

-can send event data from cali service to pusher plugin
-refactored sample data symbol lookup: is now done by cali service
-cali service only sends timestamp and data-string to pusher plugin
-outstanding TODO: push event data into separate Cassandra table
parent 40f82fd2
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -6,7 +6,7 @@ else()
set(CMAKE_C_STANDARD_REQUIRED ON)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
- set(CMAKE_CXX_STANDARD 11)
+ set(CMAKE_CXX_STANDARD 17)
set(CMAKE_C_STANDARD 99)
endif()
--- a/src/services/CMakeLists.txt 2019-06-07 10:33:20.036516054 +0200
+++ b/src/services/CMakeLists.txt 2019-06-06 17:57:54.096623744 +0200
@@ -55,6 +55,7 @@
......
......@@ -699,7 +699,8 @@ Explanation of the values specific for this plugin:
| Value | Explanation |
|:----- |:----------- |
| timeout | Number of read cycles after which an Caliper-application is assumed to be terminated if no new values have been received. Connection (shared memory) is teared down on timeout.
| maxSensors | To limit indefinite memory usage by the creation of new Sensor object one can specify a threshold here. If the number of sensors exceeds this value, they will be cleared. Default is 500.
| timeout | Number of read cycles after which an Caliper-application is assumed to be terminated if no new values have been received. Connection (shared memory) is teared down on timeout. Default is 15.
## Metadata Management <a name="metadataManagement"></a>
......
global {
mqttPrefix /caliper
mqttPrefix /System/Rack/Chassis/Node
}
group cali {
interval 100
mqttprefix 01
;mqttPart is set to /caliper by default and should be left this way unless required otherwise
;mqttpart 01
maxSensors 1000
timeout 15
}
......@@ -43,6 +43,7 @@ void CaliperConfigurator::sensorBase(CaliperSensorBase& s, CFG_VAL config) {
void CaliperConfigurator::sensorGroup(CaliperSensorGroup& s, CFG_VAL config) {
s.setGlobalMqttPrefix(_mqttPrefix);
ADD {
ATTRIBUTE("maxSensors", setMaxSensorNum);
ATTRIBUTE("timeout", setTimeout);
}
}
......@@ -74,7 +74,8 @@ public:
return *this;
}
void printConfig(LOG_LEVEL ll, LOGGER& lg) {
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
/* nothing to print */
}
......
......@@ -43,19 +43,24 @@
CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
SensorGroupTemplate(name),
_maxSensorNum(500),
_timeout(15),
_socket(-1),
_connection(-1),
_globalMqttPrefix("") {
_mqttPart = "/caliper";
_buf = static_cast<char*>(malloc(MSGQ_SIZE));
_lock.clear();
}
CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
SensorGroupTemplate(other),
_maxSensorNum(other._maxSensorNum),
_timeout(other._timeout),
_socket(-1),
_connection(-1),
_globalMqttPrefix(other._globalMqttPrefix) {
_buf = static_cast<char*>(malloc(MSGQ_SIZE));
_lock.clear();
//SensorGroupTemplate already copy constructed _sensor
......@@ -66,6 +71,7 @@ CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
CaliperSensorGroup::~CaliperSensorGroup() {
_sensorIndex.clear();
free(_buf);
for(auto& it : _processes) {
munmap(it.shm, SHM_SIZE);
......@@ -75,6 +81,7 @@ CaliperSensorGroup::~CaliperSensorGroup() {
CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
SensorGroupTemplate::operator=(other);
_maxSensorNum = other._maxSensorNum;
_timeout = other._timeout;
_socket = -1;
_connection = -1;
......@@ -190,7 +197,7 @@ void CaliperSensorGroup::read() {
}
if (_processes.size() == 0) {
//Currently no active processes are connected. We use this chance to
//Currently no previous processes are connected. We use this chance to
//clear all previous sensors (we cannot clear them process-wise as
//there is no possibility to tell which sensors belongs to which process).
//If their values have not been pushed by now they are lost.
......@@ -204,6 +211,16 @@ void CaliperSensorGroup::read() {
_processes.push_back(std::move(ci));
}
//clean up sensors if required
//If they still contain unpushed values, the will get lost
if (_sensorIndex.size() > _maxSensorNum) {
_sensorIndex.clear();
acquireSensors();
_sensors.clear();
_baseSensors.clear();
releaseSensors();
}
bool doCleanUp = false;
for(auto& it : _processes) {
//get snapshot data from message queue in shared memory
......@@ -214,12 +231,11 @@ void CaliperSensorGroup::read() {
sem_t* w_sem;
sem_t* u_sem;
snap_data* msg_queue;
char* msg_queue;
r_sem = reinterpret_cast<sem_t*>(static_cast<char*>(it.shm) + 2*sizeof(size_t));
w_sem = r_sem + 1;
u_sem = w_sem + 1;
msg_queue = reinterpret_cast<snap_data*>(u_sem + 1);
msg_queue = reinterpret_cast<char*>(w_sem + 1);
//TODO atomic load/stores instead of semaphore locking?
if (sem_wait(r_sem)) {
......@@ -242,7 +258,6 @@ void CaliperSensorGroup::read() {
//"Timeout". We assume that the application terminated
sem_destroy(r_sem);
sem_destroy(w_sem);
sem_destroy(u_sem);
munmap(it.shm, SHM_SIZE);
it.shm = nullptr;
......@@ -254,17 +269,16 @@ void CaliperSensorGroup::read() {
}
it.shmFailCnt = 0;
size_t nelems = 0;
snap_data snaps[MSGQ_SIZE];
size_t bufSize= 0;
if (r_index < w_index) {
nelems = w_index - r_index;
memcpy(snaps, &msg_queue[r_index+1], nelems*sizeof(snap_data));
bufSize = w_index - r_index;
memcpy(_buf, &msg_queue[r_index+1], bufSize);
} else {
nelems = MSGQ_SIZE - r_index + w_index;
bufSize = MSGQ_SIZE - r_index + w_index;
size_t sep = MSGQ_SIZE - r_index - 1;
memcpy(snaps, &msg_queue[r_index+1], sep*sizeof(snap_data));
memcpy(&snaps[sep], msg_queue, (nelems-sep)*sizeof(snap_data));
memcpy(_buf, &msg_queue[r_index+1], sep);
memcpy(&_buf[sep], msg_queue, bufSize-sep);
}
//update r_index in shm
......@@ -274,91 +288,53 @@ void CaliperSensorGroup::read() {
*(reinterpret_cast<size_t*>(static_cast<char*>(it.shm))) = w_index;
sem_post(r_sem);
//LOG(debug) << _groupName << "Processing " << nelems << " snapshots";
//protect access to symbol data
if (sem_wait(u_sem)) {
continue;
}
//LOG(debug) << _groupName << "Processing " << bufSize << " bytes";
std::unordered_map<std::string, std::pair<S_Ptr, reading_t>> cache;
for (size_t i = 0; i < nelems; ++i) {
for (size_t bufIdx = 0; bufIdx < bufSize;) {
reading_t reading;
reading.value = 1;
reading.timestamp = snaps[i].ts;
std::string sName("cpu" + std::to_string(snaps[i].cpu) + '/');
uintptr_t pc = snaps[i].pc;
const size_t addrCnt = *(reinterpret_cast<size_t*>(static_cast<char*>(it.shm)
+ lookup_data_offset));
const addr_data* const addrs = reinterpret_cast<addr_data*>(static_cast<char*>(it.shm)
+ lookup_data_offset + sizeof(size_t));
bool foundRange = false;
for(size_t j = 0; j < addrCnt; ++j) {
if (pc >= addrs[j].start_addr && pc <= addrs[j].end_addr) {
foundRange = true;
sName += addrs[j].pathname;
const fsym_data* fsyms = reinterpret_cast<const fsym_data*>(
reinterpret_cast<const char*>(&addrs[j]) + addrs[j].fsym_offset);
for(size_t k = 0; k < addrs[j].fsym_count; ++k) {
if (pc >= fsyms->start_addr && pc <= fsyms->end_addr) {
++fsyms;
sName += "::";
sName += reinterpret_cast<const char*>(fsyms);
break;
}
size_t str_size = fsyms->str_size;
++fsyms;
fsyms = reinterpret_cast<const fsym_data*>(
reinterpret_cast<const char*>(fsyms) + str_size);
} //It's OK if we found no symbol. There are possibly none associated to this range
//store in sensors
S_Ptr s;
auto it = _sensorIndex.find(sName);
if(it != _sensorIndex.end()) {
//we encountered this function or event name already
s = it->second;
} else {
//unknown function or event name --> create a new sensor
s = std::make_shared<CaliperSensorBase>(sName);
s->setMqtt(_globalMqttPrefix + _mqttPart + sName);
s->setName(s->getMqtt());
s->initSensor(_interval);
acquireSensors();
_sensors.push_back(s);
_baseSensors.push_back(s);
releaseSensors();
_sensorIndex.insert(std::make_pair(sName, s));
}
// temporarily store and aggregate value in the cache
if (cache.find(sName) == cache.end()) {
//no cache entry yet
cache[sName] = std::make_pair(s, reading);
} else {
//update cache entry. Use timestamp of last aggregated value
cache[sName].second.value += reading.value;
if (reading.timestamp > cache[sName].second.timestamp) {
cache[sName].second.timestamp = reading.timestamp;
}
}
break;
}
reading.timestamp = *(reinterpret_cast<uint64_t*>(&_buf[bufIdx]));
bufIdx += sizeof(uint64_t);
std::string data = std::string(&_buf[bufIdx]);
bufIdx += data.size() + 1;
std::string cpu = "/" + data.substr(0, data.find_first_of('/'));
std::string top = data.substr(data.find_first_of('/'));
//store in sensors
S_Ptr s;
auto it = _sensorIndex.find(data);
if(it != _sensorIndex.end()) {
//we encountered this function or event already
s = it->second;
} else {
//unknown function or event --> create a new sensor
s = std::make_shared<CaliperSensorBase>(data);
s->setMqtt(_globalMqttPrefix + cpu + _mqttPart + top);
s->setName(s->getMqtt());
s->initSensor(_interval);
acquireSensors();
_sensors.push_back(s);
_baseSensors.push_back(s);
releaseSensors();
_sensorIndex.insert(std::make_pair(data, s));
}
if (!foundRange) {
//PC was not within any range --> tell the update service to do his job
//We let the current snapshot pass unprocessed
LOG(debug) << _groupName << ": symbol index miss. Requesting rebuild";
*(reinterpret_cast<int*>(static_cast<char*>(it.shm) + lookup_data_offset - sizeof(int))) = 1;
// temporarily store and aggregate value in the cache
if (cache.find(data) == cache.end()) {
//no cache entry yet
cache[data] = std::make_pair(s, reading);
} else {
//update cache entry. Use timestamp of last aggregated value
cache[data].second.value += reading.value;
if (reading.timestamp > cache[data].second.timestamp) {
cache[data].second.timestamp = reading.timestamp;
}
}
}
sem_post(u_sem);
//flush cache
for (auto& it : cache) {
......@@ -382,7 +358,8 @@ void CaliperSensorGroup::read() {
}
}
void CaliperSensorGroup::printGroupConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Timeout: " << _timeout;
void CaliperSensorGroup::printGroupConfig(LOG_LEVEL ll, unsigned leadingSpaces) {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << "Timeout: " << _timeout;
//nothing special to print
}
......@@ -46,65 +46,31 @@ class CaliperSensorGroup : public SensorGroupTemplate<CaliperSensorBase> {
/*******************************************************************************
* Common defines. Keep in sync with DcdbPusher Caliper service
******************************************************************************/
#define MAX_SYMBOL_SIZE 4096 //symbol names are not limited by any means but our memory is
#define MAX_PATH_SIZE 4096 //linux paths are not allowed to be longer than 4096 chars
#define MSGQ_SIZE 8192
#define MSGQ_SIZE 16*1024*1024
#define STR_PREFIX "/cali_dcdb_"
#define SHM_SIZE (32*1024*1024)
#define SHM_SIZE (17*1024*1024)
#define SOCK_NAME "DCDBPusherCaliSocket"
typedef struct {
uintptr_t pc;
uint64_t ts;
unsigned short cpu;
} snap_data;
/* Entry for an executable symbol in the symbol table */
//we cannot have variable length arrays in structs without mallocing them...
//therefore every symbol data entry is implicitly followed by a string
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
size_t str_size; // including terminating NUL byte
//char name[str_size];
} fsym_data;
/* Defines a contiguous executable memory block */
typedef struct {
uintptr_t start_addr;
uintptr_t end_addr;
uintptr_t offset; // offset as parsed from /proc//maps
size_t fsym_offset; // Offset in bytes from the address of this struct
// to the beginning of the associated symbol section
size_t fsym_count; // Number of symbols in this address range
char pathname[MAX_PATH_SIZE]; // Filepath + name of the binary where
// this memory range comes from or
// "[Anonymous]" if unknown
} addr_data;
/*
* Queue entry layout:
* {
* uint64_t timestamp
* char[] data (NUL terminated string with at most max_dat_size bytes)
* }
*/
/*
* Layout of shared-memory file used to communicate with Caliper service:
*
* //Communication queue, aka ring buffer:
* size_t r_index //points to the last read element
* size_t w_index //points to the last written element
* size_t r_index //points to the last read byte
* size_t w_index //points to the last written byte
* sem_t r_sem; // atomic access to r_index
* sem_t w_sem; // atomic access to w_index
* sem_t u_sem; // atomic access to symbol data and updateSymbolData.
* snap_data[MSGQ_SIZE]
*
* //TODO could we profit from some alignment here?
* int updateSymbolData;
*
* //symbol lookup data
* size_t addr_count
* addr_data[addr_count]
* addr_count * (fsym_data[addr_data.fsym_count])
* char[MSGQ_SIZE] //contains variable length entries
* //TODO do not exceed SHM_SIZE
*/
static constexpr size_t lookup_data_offset = 2*sizeof(size_t)
+ 3*sizeof(sem_t)
+ MSGQ_SIZE*sizeof(snap_data)
+ sizeof(int);
/*******************************************************************************
* End of common defines
******************************************************************************/
......@@ -128,12 +94,14 @@ public:
_lock.clear(std::memory_order_release);
}
void printGroupConfig(LOG_LEVEL ll) final override;
void printGroupConfig(LOG_LEVEL ll, unsigned leadingSpaces=12) final override;
void setTimeout(const std::string& timeout) { _timeout = stoul(timeout); }
void setGlobalMqttPrefix(const std::string& prefix) { _globalMqttPrefix = prefix; }
void setMaxSensorNum(const std::string& maxSensorNum) { _maxSensorNum = stoul(maxSensorNum); }
void setTimeout(const std::string& timeout) { _timeout = stoul(timeout); }
void setGlobalMqttPrefix(const std::string& prefix) { _globalMqttPrefix = prefix; }
unsigned short getTimeout() const { return _timeout; }
unsigned getMaxSensorNum() const { return _maxSensorNum; }
unsigned short getTimeout() const { return _timeout; }
private:
void read() final override;
......@@ -145,7 +113,9 @@ private:
size_t shmFailCnt;
};
unsigned short _timeout;
unsigned _maxSensorNum; ///< Clear associated sensors if we exceed this threshold to limit memory usage
unsigned short _timeout; ///< Number of consecutive read cycles with empty queue before Cali process is assumed dead
char* _buf; ///< Local buffer for temporal storage of shm queue contents
int _socket;
int _connection;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment