Commit 2e7d3d68 authored by Alessio Netti's avatar Alessio Netti
Browse files

Bugfixes

- Textual MQTT topics working in dcdbpusher and data analytics framework
- REST API still not working properly
- Testing on the collectagent not performed yet
parent 746484e5
......@@ -246,13 +246,13 @@ $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.installed: $(DCDBDEPSPATH)/freeipm
$(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.built: $(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.patched
@echo ""
@echo "Building net-SNMP library..."
cd $(@D) && ./configure --prefix=$(DCDBDEPLOYPATH) --with-default-snmp-version=3 --with-defaults --with-logfile=none --with-persistent-directory=$(DCDBDEPLOYPATH)/var/net-snmp --disable-embedded-perl --disable-perl-cc-checks --without-perl-modules --disable-agent --disable-applications --disable-manuals --disable-scripts --disable-mibs
cd $(@D) && make -j $(MAKETHREADS) && touch $(@)
#cd $(@D) && ./configure --prefix=$(DCDBDEPLOYPATH) --with-default-snmp-version=3 --with-defaults --with-logfile=none --with-persistent-directory=$(DCDBDEPLOYPATH)/var/net-snmp --disable-embedded-perl --disable-perl-cc-checks --without-perl-modules --disable-agent --disable-applications --disable-manuals --disable-scripts --disable-mibs
#cd $(@D) && make -j $(MAKETHREADS) && touch $(@)
$(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.installed: $(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.built | $(DCDBDEPLOYPATH)
@echo ""
@echo "Installing net-SNMP library..."
cd $(@D) && make install && touch $(@)
#cd $(@D) && make install && touch $(@)
%-build: deps
@echo "Building $*"
......
......@@ -51,10 +51,7 @@ protected:
const string OUTPUT_BLOCK = "output";
const string ALL_CLAUSE = "all";
const string ALL_REC_CLAUSE = "all-recursive";
const std::string SENSOR_PATTERN = "(?i)<sensor>";
const std::string GROUP_PATTERN = "(?i)<group>";
public:
/**
......@@ -66,7 +63,6 @@ public:
_baseName("INVALID"),
_cfgPath(""),
_mqttPrefix(""),
_sensorPattern(""),
_cacheInterval(900000) {}
/**
......@@ -105,7 +101,6 @@ public:
*/
virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
_mqttPrefix = pluginSettings.mqttPrefix;
_sensorPattern = pluginSettings.sensorPattern;
_cacheInterval = pluginSettings.cacheInterval;
}
......@@ -117,7 +112,6 @@ public:
void printConfig(LOG_LEVEL ll) final {
LOG_VAR(ll) << " General: ";
LOG_VAR(ll) << " MQTT-Prefix: " << (_mqttPrefix != "" ? _mqttPrefix : "DEFAULT");
LOG_VAR(ll) << " Sensor Pattern: " << (_sensorPattern != "" ? _sensorPattern : "DEFAULT");
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
//prints plugin specific configurator attributes and entities if present
......@@ -402,7 +396,9 @@ protected:
if(!an.getTemplate()) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart(), !an.getStreaming());
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
!an.getStreaming());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
......@@ -411,7 +407,7 @@ protected:
}
for (auto &u: *units) {
if(!constructSensorNames(*u, an)) {
if(!constructSensorTopics(*u, an)) {
an.clearUnits();
delete units;
return false;
......@@ -529,9 +525,7 @@ protected:
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else if (boost::iequals(global.first, "sensorpattern")) {
_sensorPattern = global.second.data();
}
}
}
global(config.get_child("global"));
}
......@@ -539,39 +533,17 @@ protected:
}
/**
* @brief Adjusts the names of the sensors
* @brief Adjusts the topics and names of the sensors
*
* Names are modified according to the sensorPattern specified in the global
* settings. Operates in tandem with the auto-publish feature.
* Names are set according to the corresponding topic.
*
* @return true if successful, false otherwise
*/
//TODO: switch to textual MQTT topics and use only MQTTPrefix
bool constructSensorNames(UnitTemplate<SBase>& u, Analyzer& an) {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN);
boost::cmatch match;
//TODO: move sensorpattern checks somewhere else
if(_sensorPattern == "")
return true;
else if (!boost::regex_search(_sensorPattern.c_str(), match, sensorReg)) {
LOG(error) << "Invalid sensor naming pattern " << _sensorPattern << ". You must at least include <sensor>!";
return false;
}
bool constructSensorTopics(UnitTemplate<SBase>& u, Analyzer& an) {
std::string name;
// Performing name construction
for(auto& s: u.getOutputs()) {
name = s->getName();
// If the unit is related to a system component all of its sensors will have their names adjusted already
// If it is root, then we apply normal auto-publish. This means that adding the analyzer's name
// or plugin ID do not always work, as of now
if( u.getName() == SensorNavigator::rootKey) {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, an.getName());
}
s->setName(name);
}
for(auto& s: u.getOutputs())
s->setName(s->getMqtt());
return true;
}
......@@ -589,8 +561,6 @@ protected:
std::string _cfgPath;
// Default MQTT prefix to be used when creating output sensors
std::string _mqttPrefix;
// String used to construct sensor names
std::string _sensorPattern;
// Interval in seconds for the cache of each sensor
unsigned int _cacheInterval;
// The vector of analyzers, in the form of pointers to AnalyzerInterface objects
......
......@@ -108,7 +108,8 @@ public:
* @param ll Logging level at which the configuration is printed
*/
virtual void printConfig(LOG_LEVEL ll) override {
LOG_VAR(ll) << " MQTT part: " << (_mqttPart != "" ? _mqttPart : "DEFAULT");
if(_mqttPart!="")
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
LOG_VAR(ll) << " Sync readings: " << (_sync ? "enabled" : "disabled");
LOG_VAR(ll) << " Streaming mode: " << (_streaming ? "enabled" : "disabled");
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
......
......@@ -120,7 +120,7 @@ public:
set<string> *sensors = new set<string>();
if( level <= -1 )
sensors->insert(s);
sensors->insert(replace ? s : SensorNavigator::rootKey);
else {
//Ensuring that only the "filter" clause matches is enough, since we already checked for the entire
// < > configuration block in parseNodeLevelString
......@@ -180,8 +180,6 @@ public:
// We iterate over the units, and resolve their inputs and outputs starting from the prototype definitions
vector<shared_ptr<UnitTemplate<SBase>>> *unitObjects = new vector<shared_ptr<UnitTemplate<SBase>>>();
// If the unit pattern refers to root, we can resolve the unit immediately even if it is on demand
//TODO: remove this distinction after switching to textual MQTT topics, and don't resolve root unit
if(!ondemand || unitLevel==-1)
for(const auto& u : *units) {
try {
......@@ -263,13 +261,13 @@ protected:
// and then verify whether "node" belongs to this set or not.
bool nodeBelongsToPattern(const string& node, const string& unit) {
set<string>* units = resolveNodeLevelString(unit, SensorNavigator::rootKey, false);
if(!units->count(node)) {
if(units->count(node)) {
delete units;
return false;
return true;
}
else {
delete units;
return true;
return false;
}
}
......@@ -353,9 +351,9 @@ protected:
uOut.setSinkPath(u + "_" + sPath);
}
}
// If we are not using units (only unit is root, out of the hierarchy) we build sensors like in samplers
// If we are not using units (only unit is root, out of the hierarchy) we build sensors like in samplers
else
uOut.setMqtt(mqttPrefix + uOut.getMqtt());
uOut.setMqtt(MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(uOut.getMqtt()));
unitOutputs.push_back(make_shared<SBase>(uOut));
}
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>(u, unitInputs, unitOutputs);
......
......@@ -37,13 +37,12 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
// A sensor navigator is only built if data analytics plugins are expected to be instantiated
QueryEngine &_queryEngine = QueryEngine::getInstance();
if(_manager->probe(_configPath, "collectagent.conf")) {
vector <string> names, topics;
vector <string> topics;
list <DCDB::PublicSensor> publicSensors;
// Fetching sensor names and topics from the Cassandra datastore
_dcdbCfg->getPublicSensorsVerbose(publicSensors);
for (const auto &s : publicSensors)
if (!s.is_virtual) {
names.push_back(s.name);
topics.push_back(s.pattern);
}
publicSensors.clear();
......@@ -51,7 +50,7 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
// Building the sensor navigator
try {
_navigator->setFilter(_settings.analyticsSettings.filter);
_navigator->buildTree(_settings.analyticsSettings.hierarchy, &names, &topics);
_navigator->buildTree(_settings.analyticsSettings.hierarchy, &topics);
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree!";
......@@ -59,7 +58,6 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
}
LOG(info) << "Built a sensor hierarchy tree of size " << _navigator->getTreeSize() << " and depth "
<< _navigator->getTreeDepth() << ".";
names.clear();
topics.clear();
// Assigning the newly-built sensor navigator to the QueryEngine
......
......@@ -40,9 +40,15 @@ public:
* @param cpuID The cpu ID associated to this topic (if any)
* @return The processed MQTT topic or suffix
*/
//TODO: integrate proper topic formatting
static std::string formatTopic(const std::string& topic, int cpuID=-1) {
return cpuID<0 ? topic : "cpu" + std::to_string(cpuID) + std::to_string(MQTT_SEP) + topic;
if(topic.empty()) return topic;
std::string newTopic = topic;
// Stripping off all forward slashes
while(!newTopic.empty() && newTopic.front()==MQTT_SEP) newTopic.erase(0, 1);
while(!newTopic.empty() && newTopic.back()==MQTT_SEP) newTopic.erase(newTopic.size()-1, 1);
// Adding the one front forward slash that we need
newTopic.insert(0, 1, MQTT_SEP);
return cpuID<0 ? newTopic : "/cpu" + std::to_string(cpuID) + newTopic;
}
/**
......
......@@ -259,7 +259,7 @@ public:
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << "Sensor: " << _name;
LOG_VAR(ll) << leading << " MQTT Topic: " << _mqtt;
//LOG_VAR(ll) << leading << " MQTT Topic: " << _mqtt;
LOG_VAR(ll) << leading << " Sink: " << (getSinkPath() != "" ? getSinkPath() : "none");
LOG_VAR(ll) << leading << " SubSampling: " << getSubsampling();
LOG_VAR(ll) << leading << (_skipConstVal ? " Skipping constant values" : " No skipping of constant values");
......
......@@ -11,6 +11,7 @@
#include <boost/regex.hpp>
#include <limits.h>
#include <algorithm>
#include "mqttchecker.h"
using namespace std;
......@@ -77,7 +78,7 @@ public:
* dots define the different levels in the architecture, and the corresponding list of regexes
* would be ["mpp3.", "rack\d{2}.", "chassis\d{2}.", "node\d{2}.", "cpu\d{2}"]. The last part
* of the sensor name is supposed to constitute a "leaf" in the tree. If the input vector is
* empty, the tree is built automatically by interpreting each "." in the sensor name as a tree
* empty, the tree is built automatically by interpreting each "/" in the sensor name as a tree
* level separator. The functionality implemented here requires careful naming of the sensors
* by developers.
*
......@@ -93,7 +94,7 @@ public:
*
* Overloaded version of the method by the same name. Takes as input a string containing a
* space-separated list of regular expressions defining the sensor hierarchy. If the string is
* empty, the tree is built automatically by interpreting each "." in the sensor name as a tree
* empty, the tree is built automatically by interpreting each "/" in the sensor name as a tree
* level separator.
*
* @param hierarchy String of comma-separated expressions that define the hierarchy
......
......@@ -28,8 +28,6 @@ bool GlobalConfiguration::readConfig() {
// ----- READING PLUGIN SETTINGS -----
if (boost::iequals(global.first, "mqttprefix")) {
pluginSettings.mqttPrefix = global.second.data();
if (pluginSettings.mqttPrefix[pluginSettings.mqttPrefix.length()-1] != '/')
pluginSettings.mqttPrefix.append("/");
} else if (boost::iequals(global.first, "autoPublish")) {
pluginSettings.autoPublish = to_bool(global.second.data());
} else if (boost::iequals(global.first, "tempdir")) {
......
......@@ -6,7 +6,7 @@
const string SensorNavigator::rootKey = "__root__";
const string SensorNavigator::templateKey = "__template__";
const char SensorNavigator::pathSeparator = '.';
const char SensorNavigator::pathSeparator = '/';
void SensorNavigator::clearTree() {
if(_sensorTree) {
......@@ -34,13 +34,8 @@ bool SensorNavigator::sensorExists(const string& node) {
string SensorNavigator::buildTopicForNode(const string& node, const string& suffix, int len) {
if(!_sensorTree || !_sensorTree->count(node) || isSensorNode(node))
throw domain_error("SensorNavigator: node not found in tree!");
string nodePrefix = getNodeTopic(node);
int cnt = (int)std::count(nodePrefix.begin(), nodePrefix.end(), '/');
if( (int)nodePrefix.length() + (int)suffix.length() - cnt > len)
throw invalid_argument("SensorNavigator: cannot build topic, too many characters!");
return nodePrefix + string(len - nodePrefix.length() - suffix.length() + cnt, 'F') + suffix;
return MQTTChecker::formatTopic(getNodeTopic(node)) + MQTTChecker::formatTopic(suffix);
}
bool SensorNavigator::isSensorNode(const string& node) {
......@@ -317,7 +312,6 @@ void SensorNavigator::_addSensor(const string& name, const string& topic) {
}
else if(_usingTopics) {
// Intersecting the topics from the new sensor and the existing hierarchical node
//TODO: evaluate alternatives. This system is finnicky and depends too much on user configurations
int pCtr = 0;
Node& target = _sensorTree->at(last);
while( pCtr < (int)topic.length() && pCtr < (int)target.topic.length() && target.topic[pCtr] == topic[pCtr])
......@@ -339,10 +333,11 @@ void SensorNavigator::_addSensor(const string& name, const string& topic) {
void SensorNavigator::_addAutoSensor(const string& name, const string& topic) {
string last=rootKey, prev="";
size_t sepPos = -1;
//sepPos=0 implies that the first character is skipped when searching for separators
size_t sepPos = 0;
int d=0;
if(name.empty() || name[0] == pathSeparator || name[name.size()-1] == pathSeparator) {
if(name.empty() || name[name.size()-1] == pathSeparator) {
clearTree();
throw invalid_argument("SensorNavigator: sensor " + name + " does not describe a valid tree path!");
}
......
......@@ -262,16 +262,15 @@ int main(int argc, char** argv) {
// Preparing the SensorNavigator
if(_analyticsManager->probe(argv[argc-1], "dcdbpusher.conf")) {
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
vector <std::string> topics;
for (const auto &p : _configuration->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
for (const auto &s : g->getSensors())
topics.push_back(s->getMqtt());
}
try {
navigator->setFilter(analyticsSettings.filter);
navigator->buildTree(analyticsSettings.hierarchy, &names, &topics);
navigator->buildTree(analyticsSettings.hierarchy, &topics);
topics.clear();
LOG(info) << "Built a sensor hierarchy tree of size " << navigator->getTreeSize() << " and depth " << navigator->getTreeDepth() << ".";
_queryEngine.setNavigator(navigator);
_queryEngine.triggerUpdate();
......
......@@ -25,8 +25,6 @@ void TesterConfigurator::sensorGroup(TesterSensorGroup& s, CFG_VAL config) {
s.setNumSensors(stoull(val.second.data()));
} else if (boost::iequals(val.first, "startValue")) {
s.setValue(stoi(val.second.data()));
} else if (boost::iequals(val.first, "mqttStart")) {
s.setMqttStart(val.second.data());
}
}
......
......@@ -15,7 +15,6 @@ using namespace std;
TesterSensorGroup::TesterSensorGroup(const std::string& name) :
SensorGroupTemplate(name),
_value(0),
_mqttStart("00"),
_numSensors(0) {}
// Overriding assignment operator so that sensors are not copy-constructed
......@@ -26,7 +25,6 @@ TesterSensorGroup& TesterSensorGroup::operator=(const TesterSensorGroup& other)
_numSensors = other._numSensors;
_value = other._value;
_mqttStart = other._mqttStart;
return *this;
}
......@@ -85,5 +83,4 @@ void TesterSensorGroup::readAsync() {
void TesterSensorGroup::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Value: " << _value;
LOG_VAR(ll) << " Num Sensors: " << _numSensors;
LOG_VAR(ll) << " MQTT start: " << _mqttStart;
}
......@@ -22,11 +22,9 @@ public:
void stop() override;
void setValue(long long n) { _value = n; }
void setMqttStart(std::string m) { _mqttStart = m; }
void setNumSensors(unsigned int n) { _numSensors = n; }
long long getValue() { return _value; }
std::string getMqttStart() { return _mqttStart; }
unsigned int getNumSensors() { return _numSensors; }
void printConfig(LOG_LEVEL ll) override;
......@@ -36,7 +34,6 @@ private:
void readAsync() override;
long long _value;
std::string _mqttStart;
unsigned int _numSensors;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment