Commit d5beecce authored by Alessio Netti's avatar Alessio Netti

SensorMetadata structure now based on std::shared_ptr members

- Allows to encode "not set" states for single members
parent ce5173fc
......@@ -637,7 +637,7 @@ protected:
s->setName(s->getMqtt());
SensorMetadata* sm = s->getMetadata();
if(sm) {
if(sm->isOperation) {
if(sm->getIsOperation() && *sm->getIsOperation()) {
s->clearMetadata();
if(u.getInputs().size() != 1) {
LOG(error) << _operatorName << " " << op.getName() << ": Ambiguous operation field for sensor " << s->getName();
......@@ -645,15 +645,16 @@ protected:
}
// Replacing the metadata to publish the sensor as an operation of its corresponding input
SensorMetadata smNew;
smNew.publicName = u.getInputs()[0]->getMqtt();
smNew.setPublicName(u.getInputs()[0]->getMqtt());
smNew.setPattern(u.getInputs()[0]->getMqtt());
smNew.addOperation(s->getMqtt());
s->setMetadata(smNew);
} else {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
if (sm->interval == 0)
sm->interval = (unsigned long long) op.getInterval() * 1000000;
sm->setPublicName(s->getMqtt());
sm->setPattern(s->getMqtt());
sm->setIsVirtual(false);
if (!sm->getInterval())
sm->setInterval((uint64_t)op.getInterval() * 1000000);
}
}
}
......
......@@ -152,8 +152,8 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
for (const auto& in : subUnit->getInputs()) {
if( _scaling_factor == 1){
SensorMetadata buffer;
if(_queryEngine.queryMetadata(in->getName(), buffer)){
_scaling_factor = buffer.scale;
if(_queryEngine.queryMetadata(in->getName(), buffer) && buffer.getScale()){
_scaling_factor = *buffer.getScale();
LOG(debug) << "PerSystSql Operator " << _name << " using scaling factor of " << _scaling_factor;
}
}
......
......@@ -129,7 +129,7 @@ void SMUCNGPerfOperator::compute(U_Ptr unit) {
auto inputs = unit->getInputs();
auto timestamp = getTimestamp() - _go_back_ns;
for(auto& outSensor : unit->getOutputs()){
if(outSensor->getMetadata() == nullptr){
if(outSensor->getMetadata() == nullptr || !outSensor->getMetadata()->getScale()) {
LOG(error) << "No metadata defined, sensor " << outSensor->getName() << " can't compute anything.";
continue;
}
......@@ -180,7 +180,7 @@ void SMUCNGPerfOperator::computeMetricPerSecond(std::vector<SMUCNGPtr> &inputs,
SMUCSensorBase::Metric_t metric = found->second;
query(inputs[_metricToPosition[metric]]->getName(), timestamp, _buffers[0]);
reading_t metricpersec;
if(_buffers[0].size() > 0 && calculateMetricPerSec(_buffers[0][0], _measuring_interval_s, metricpersec, outSensor->getMetadata()->scale)){
if(_buffers[0].size() > 0 && calculateMetricPerSec(_buffers[0][0], _measuring_interval_s, metricpersec, *outSensor->getMetadata()->getScale())){
outSensor->storeReading(metricpersec);
}
}
......@@ -199,7 +199,7 @@ void SMUCNGPerfOperator::computeMetricRatio(std::vector<SMUCNGPtr>& inputs, SMUC
query(inputs[_metricToPosition[metric_divisor]]->getName(), timestamp, divisor);
bool wascalced = false;
reading_t ratio;
if (dividend.size() > 0 && divisor.size() > 0 && calculateMetricRatio(dividend[0], divisor[0], ratio, outSensor->getMetadata()->scale)) {
if (dividend.size() > 0 && divisor.size() > 0 && calculateMetricRatio(dividend[0], divisor[0], ratio, *outSensor->getMetadata()->getScale())) {
outSensor->storeReading(ratio);
}
}
......@@ -217,12 +217,12 @@ void SMUCNGPerfOperator::computeProfileMetric(std::vector<SMUCNGPtr>& inputs, SM
if(queryMetrics.size()==2){ //_buffer[0] and _buffer[1] should have been filled
auto second_value = computeSum(_buffers[1]);
if(second_value != 0){
result.value = value / (outSensor->getMetadata()->scale * static_cast<double>(second_value));
result.value = value / (*outSensor->getMetadata()->getScale() * static_cast<double>(second_value));
result.timestamp = _buffers[0][0].timestamp;
outSensor->storeReading(result);
}
} else { //only one buffer was filled
result.value = value / (outSensor->getMetadata()->scale * (_interval/1000.0));
result.value = value / (*outSensor->getMetadata()->getScale() * (_interval/1000.0));
result.timestamp = _buffers[0][0].timestamp;
outSensor->storeReading(result);
}
......@@ -265,7 +265,7 @@ void SMUCNGPerfOperator::computeFREQUENCY(std::vector<SMUCNGPtr>& inputs, SMUCNG
query(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, clocks);
query(inputs[_metricToPosition[SMUCSensorBase::CLOCKS_REF]]->getName(), timestamp, clocks_ref);
reading_t frequency;
if( clocks.size() > 0 && clocks_ref.size() > 0 && calculateFrequency(clocks_ref[0],clocks[0], MIN_FREQ_MHZ, MAX_FREQ_MHZ, frequency, outSensor->getMetadata()->scale)) {
if( clocks.size() > 0 && clocks_ref.size() > 0 && calculateFrequency(clocks_ref[0],clocks[0], MIN_FREQ_MHZ, MAX_FREQ_MHZ, frequency, *outSensor->getMetadata()->getScale())) {
outSensor->storeReading(frequency);
}
}
......@@ -306,31 +306,31 @@ void SMUCNGPerfOperator::computeFLOPS(std::vector<SMUCNGPtr>& inputs, SMUCNGPtr&
if(flop_metric == SMUCSensorBase::FLOPS) {
if (calculateFlopsPerSec(scalar_double, scalar_single, packed128_double,
packed128_single, packed256_double, packed256_single,
packed512_double, packed512_single, result, outSensor->getMetadata()->scale, _measuring_interval_s) ) {
packed512_double, packed512_single, result, *outSensor->getMetadata()->getScale(), _measuring_interval_s) ) {
outSensor->storeReading(result);
}
} else if(flop_metric == SMUCSensorBase::PACKED_FLOPS){
if (calculatePackedFlopsPerSec(packed128_double, packed128_single,
packed256_double, packed256_single, packed512_double,
packed512_single, result, outSensor->getMetadata()->scale, _measuring_interval_s)) {
packed512_single, result, *outSensor->getMetadata()->getScale(), _measuring_interval_s)) {
outSensor->storeReading(result);
}
} else if(flop_metric == SMUCSensorBase::VECTORIZED_RATIO) {
if(calculateVectorizationRatio(scalar_double, scalar_single, packed128_double,
packed128_single, packed256_double, packed256_single,
packed512_double, packed512_single, result, outSensor->getMetadata()->scale)) {
packed512_double, packed512_single, result, *outSensor->getMetadata()->getScale())) {
outSensor->storeReading(result);
}
} else if (flop_metric == SMUCSensorBase::AVX512_TOVECTORIZED_RATIO) {
if (calculateAVX512FlopsToVectorizedRatio(packed128_double,
packed128_single, packed256_double, packed256_single,
packed512_double, packed512_single, result, outSensor->getMetadata()->scale )) {
packed512_double, packed512_single, result, *outSensor->getMetadata()->getScale())) {
outSensor->storeReading(result);
}
} else if (flop_metric == SMUCSensorBase::SINGLE_PRECISION_TO_TOTAL_RATIO) {
if(calculateSP_TO_TOTAL_RATIO(scalar_double, scalar_single, packed128_double,
packed128_single, packed256_double, packed256_single,
packed512_double, packed512_single, result, outSensor->getMetadata()->scale)){
packed512_double, packed512_single, result, *outSensor->getMetadata()->getScale())){
outSensor->storeReading(result);
}
}
......@@ -356,7 +356,7 @@ void SMUCNGPerfOperator::computeL3HIT_TO_L3MISS_RATIO(std::vector<SMUCNGPtr>& in
query(inputs[_metricToPosition[SMUCSensorBase::MEM_LOAD_RETIRED_L3_HIT]]->getName(), timestamp, l3_hits);
query(inputs[_metricToPosition[SMUCSensorBase::MEM_LOAD_RETIRED_L3_MISS]]->getName(), timestamp, l3_load_miss);
reading_t l3hitToMissRatio;
if (l3_misses.size() > 0 && l3_hits.size() > 0 && l3_load_miss.size() > 0 && calculateL3HitToL3MissRatio(l3_misses[0], l3_hits[0], l3_load_miss[0], l3hitToMissRatio, outSensor->getMetadata()->scale)) {
if (l3_misses.size() > 0 && l3_hits.size() > 0 && l3_load_miss.size() > 0 && calculateL3HitToL3MissRatio(l3_misses[0], l3_hits[0], l3_load_miss[0], l3hitToMissRatio, *outSensor->getMetadata()->getScale())) {
outSensor->storeReading(l3hitToMissRatio);
}
}
......@@ -377,7 +377,7 @@ void SMUCNGPerfOperator::computeMEMORY_BANDWIDTH(std::vector<SMUCNGPtr>& inputs,
query(inputs[_metricToPosition[SMUCSensorBase::CAS_COUNT_WRITE3]]->getName(), timestamp, mem_counters);
query(inputs[_metricToPosition[SMUCSensorBase::CAS_COUNT_WRITE4]]->getName(), timestamp, mem_counters);
query(inputs[_metricToPosition[SMUCSensorBase::CAS_COUNT_WRITE5]]->getName(), timestamp, mem_counters);
if(mem_counters.size() > 0 && calculateMemoryBandwidth(mem_counters, memory_bw, _measuring_interval_s, outSensor->getMetadata()->scale)){
if(mem_counters.size() > 0 && calculateMemoryBandwidth(mem_counters, memory_bw, _measuring_interval_s, *outSensor->getMetadata()->getScale())){
outSensor->storeReading(memory_bw);
}
......
......@@ -56,7 +56,8 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
if(_manager->probe(_configPath, "collectagent.conf")) {
vector<string> topics;
for(const auto& kv : _metadataStore->getMap())
topics.push_back(kv.second.pattern);
if(kv.second.isValid())
topics.push_back(*kv.second.getPattern());
// Building the sensor navigator
try {
......@@ -163,10 +164,10 @@ bool AnalyticsController::publishSensors() {
for (const auto &u : op->getUnits())
for (const auto &s : u->getBaseOutputs()) {
if (s->getPublish()) {
if (s->getMetadata()) {
if (s->getMetadata() && s->getMetadata()->isValid()) {
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(*s->getMetadata());
err = _dcdbCfg->publishSensor(ps);
_metadataStore->store(s->getMetadata()->pattern, *s->getMetadata());
_metadataStore->store(*s->getMetadata()->getPattern(), *s->getMetadata());
} else {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
}
......@@ -216,7 +217,8 @@ bool AnalyticsController::rebuildSensorNavigator() {
for (const auto &s : publicSensors)
if (!s.is_virtual) {
sBuf = Configuration::publicSensorToMetadata(s);
topics.push_back(sBuf.pattern);
if(sBuf.isValid())
topics.push_back(*sBuf.getPattern());
}
publicSensors.clear();
......
......@@ -273,9 +273,11 @@ int mqttCallback(SimpleMQTTMessage *msg)
LOG(error) << "Invalid metadata packed received!";
return 1;
}
if(sm.isValid()) {
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
err = mySensorConfig->publishSensor(ps);
metadataStore->store(sm.pattern, sm);
metadataStore->store(*sm.getPattern(), sm);
}
} else {
err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
}
......@@ -703,7 +705,8 @@ int main(int argc, char* const argv[]) {
for (const auto &s : publicSensors)
if (!s.is_virtual) {
sBuf = Configuration::publicSensorToMetadata(s);
metadataStore->store(sBuf.pattern, sBuf);
if(sBuf.isValid())
metadataStore->store(*sBuf.getPattern(), sBuf);
}
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
......
......@@ -76,18 +76,26 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
DCDB::PublicSensor Configuration::metadataToPublicSensor(const SensorMetadata& sm) {
DCDB::PublicSensor ps;
ps.name = sm.publicName;
ps.is_virtual = sm.isVirtual;
ps.pattern = sm.pattern;
ps.unit = sm.unit;
ps.scaling_factor = sm.scale;
ps.ttl = sm.ttl;
ps.interval = sm.interval;
ps.operations = sm.operations;
if(sm.getPublicName())
ps.name = *sm.getPublicName();
if(sm.getIsVirtual())
ps.is_virtual = *sm.getIsVirtual();
if(sm.getPattern())
ps.pattern = *sm.getPattern();
if(sm.getUnit())
ps.unit = *sm.getUnit();
if(sm.getScale())
ps.scaling_factor = *sm.getScale();
if(sm.getTTL())
ps.ttl = *sm.getTTL();
if(sm.getInterval())
ps.interval = *sm.getInterval();
if(sm.getOperations())
ps.operations = *sm.getOperations();
uint64_t sensorMask = 0;
if(sm.integrable)
if(sm.getIntegrable() && *sm.getIntegrable())
sensorMask = sensorMask | INTEGRABLE;
if(sm.monotonic)
if(sm.getMonotonic() && *sm.getMonotonic())
sensorMask = sensorMask | MONOTONIC;
ps.sensor_mask = sensorMask;
return ps;
......@@ -95,19 +103,20 @@ DCDB::PublicSensor Configuration::metadataToPublicSensor(const SensorMetadata& s
SensorMetadata Configuration::publicSensorToMetadata(const DCDB::PublicSensor& ps) {
SensorMetadata sm;
sm.publicName = ps.name;
sm.isVirtual = ps.is_virtual;
sm.setPublicName(ps.name);
sm.setIsVirtual(ps.is_virtual);
// Stripping whitespace from the sensor pattern in the SID
sm.pattern = ps.pattern;
boost::algorithm::trim(sm.pattern);
string stripPattern = ps.pattern;
boost::algorithm::trim(stripPattern);
sm.setPattern(stripPattern);
sm.unit = ps.unit;
sm.scale = ps.scaling_factor;
sm.ttl = ps.ttl;
sm.interval = ps.interval;
sm.operations = ps.operations;
sm.integrable = ps.sensor_mask & INTEGRABLE;
sm.monotonic = ps.sensor_mask & MONOTONIC;
sm.setUnit(ps.unit);
sm.setScale(ps.scaling_factor);
sm.setTTL(ps.ttl);
sm.setInterval(ps.interval);
sm.setOperations(ps.operations);
sm.setIntegrable(ps.sensor_mask & INTEGRABLE);
sm.setMonotonic(ps.sensor_mask & MONOTONIC);
return sm;
}
This diff is collapsed.
......@@ -451,11 +451,11 @@ class ConfiguratorTemplate : public ConfiguratorInterface {
s->setName(s->getMqtt());
SensorMetadata *sm = s->getMetadata();
if (sm) {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
if(sm->interval==0)
sm->interval = (unsigned long long)g->getInterval() * 1000000;
sm->setPublicName(s->getMqtt());
sm->setPattern(s->getMqtt());
sm->setIsVirtual(false);
if(!sm->getInterval())
sm->setInterval((uint64_t)g->getInterval() * 1000000);
}
}
g->releaseSensors();
......
......@@ -472,11 +472,11 @@ class ConfiguratorTemplateEntity : public ConfiguratorTemplate<SBase, SGroup> {
s->setName(s->getMqtt());
SensorMetadata *sm = s->getMetadata();
if (sm) {
sm->publicName = s->getMqtt();
sm->pattern = s->getMqtt();
sm->isVirtual = false;
if (sm->interval == 0)
sm->interval = (unsigned long long)g->getInterval() * 1000000;
sm->setPublicName(s->getMqtt());
sm->setPattern(s->getMqtt());
sm->setIsVirtual(false);
if (!sm->getInterval())
sm->setInterval((uint64_t)g->getInterval() * 1000000);
}
}
g->releaseSensors();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment