11.08., 9:00 - 11:00: Due to updates GitLab will be unavailable for some minutes between 09:00 and 11:00.

Commit 045c4a13 authored by Alessio Netti's avatar Alessio Netti

Merge remote-tracking branch 'remotes/origin/development'

parents c95b8c6d a6b645a6
......@@ -51,7 +51,7 @@ $ ./dcdbpusher config/
## References
* Alessio Netti, Micha Mueller, Axel Auweter, Carla Guillen, Michael Ott, Daniele Tafani and Martin Schulz. _From Facility to Application Sensor Data: Modular, Continuous and Holistic Monitoring with DCDB_. Proceedings of the International Conference for High Performance Computing, Networking, Storage, and Analysis (SC) 2019. [arXiv pre-print available here.](https://arxiv.org/abs/1906.07509)
* Alessio Netti, Micha Mueller, Carla Guillen, Michael Ott, Daniele Tafani, Gence Ozer and Martin Schulz. _DCDB Wintermute: Enabling Online and Holistic Operational Data Analytics on HPC Systems_. Proceedings of the International Parallel and Distributed Processing Symposium (IPDPS) 2020, _submitted_. [arXiv pre-print available here.](https://arxiv.org/abs/1910.06156)
* Alessio Netti, Micha Mueller, Carla Guillen, Michael Ott, Daniele Tafani, Gence Ozer and Martin Schulz. _DCDB Wintermute: Enabling Online and Holistic Operational Data Analytics on HPC Systems_. Proceedings of the International Symposium on High-Performance Parallel and Distributed Computing (HPDC) 2020. [arXiv pre-print available here.](https://arxiv.org/abs/1910.06156)
## Contact, Copyright and License
......
......@@ -225,7 +225,7 @@ protected:
// The job unit is generated as a hierarchical unit
jobUnit = unitGen.generateFromTemplate(uTemplate, jobTopic, jobData.nodes, this->_mqttPart, this->_enforceTopics, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_cacheSize);
jobUnit->init(this->_interval);
this->addToUnitCache(jobUnit);
}
return jobUnit;
......
......@@ -231,7 +231,7 @@ public:
OperatorInterface::init(io);
for(const auto u : _units)
u->init(_cacheSize);
u->init(_interval);
this->execOnInit();
}
......@@ -350,7 +350,7 @@ public:
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
tempUnit->init(_cacheSize);
tempUnit->init(_interval);
compute(tempUnit);
retrieveAndFlush(outMap, tempUnit);
} catch(const exception& e) {
......
......@@ -59,9 +59,9 @@ public:
/**
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param interval Sampling interval in milliseconds
*/
virtual void init(unsigned int cacheSize) = 0;
virtual void init(unsigned int interval) = 0;
/**
* @brief Sets the name of this unit
......
......@@ -177,16 +177,16 @@ public:
/**
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param interval Sampling interval in milliseconds
*/
void init(unsigned int cacheSize) override {
void init(unsigned int interval) override {
for(const auto s : _outputs)
if (!s->isInit())
s->initSensor(cacheSize);
s->initSensor(interval);
for (const auto &su : _subUnits)
for (const auto s : su->getOutputs())
if (!s->isInit())
s->initSensor(cacheSize);
s->initSensor(interval);
}
/**
......
......@@ -60,7 +60,7 @@ void ClassifierOperator::compute(U_Ptr unit) {
_currentClass = (int)_currentTarget;
_responseSet->push_back(_currentClass);
if ((uint64_t)_trainingSet->size().height >= _trainingSamples + _targetDistance)
trainRandomForest();
trainRandomForest(true);
}
if(_rForest.empty() || !(_rForest->isTrained() || (_trainingPending && _streaming)))
throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
......
......@@ -120,7 +120,7 @@ void RegressorOperator::compute(U_Ptr unit) {
_trainingSet->push_back(*_currentfVector);
_responseSet->push_back(_currentTarget);
if ((uint64_t)_trainingSet->size().height >= _trainingSamples + _targetDistance)
trainRandomForest();
trainRandomForest(false);
}
if(_rForest.empty() || !(_rForest->isTrained() || (_trainingPending && _streaming)))
throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
......@@ -132,7 +132,7 @@ void RegressorOperator::compute(U_Ptr unit) {
}
}
void RegressorOperator::trainRandomForest() {
void RegressorOperator::trainRandomForest(bool categorical) {
if(!_trainingSet || _rForest.empty())
throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
if((uint64_t)_responseSet->size().height <= _targetDistance)
......@@ -141,8 +141,16 @@ void RegressorOperator::trainRandomForest() {
*_responseSet = _responseSet->rowRange(_targetDistance, _responseSet->size().height-1);
*_trainingSet = _trainingSet->rowRange(0, _trainingSet->size().height-1-_targetDistance);
shuffleTrainingSet();
if(!_rForest->train(*_trainingSet, cv::ml::ROW_SAMPLE, *_responseSet))
cv::Mat varType = cv::Mat(_trainingSet->size().width + 1, 1, CV_8U);
varType.setTo(cv::Scalar(cv::ml::VAR_NUMERICAL));
varType.at<unsigned char>(_trainingSet->size().width, 0) = categorical ? cv::ml::VAR_CATEGORICAL : cv::ml::VAR_NUMERICAL;
cv::Ptr<cv::ml::TrainData> td = cv::ml::TrainData::create(*_trainingSet, cv::ml::ROW_SAMPLE, *_responseSet, cv::noArray(), cv::noArray(), cv::noArray(), varType);
if(!_rForest->train(td))
throw std::runtime_error("Operator " + _name + ": model training failed!");
td.release();
delete _trainingSet;
_trainingSet = nullptr;
delete _responseSet;
......@@ -191,12 +199,13 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
uint64_t startTs = endTs - _aggregationWindow;
std::vector<RegressorSBPtr>& inputs = unit->getInputs();
for(idx=0; idx<inputs.size(); idx++) {
_mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0;
_mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0; _latest=0;
_buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), startTs, endTs, _buffer, false) || _buffer.empty())
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
_latest = _buffer.back().value;
if (inputs[idx]->getTrainingTarget())
_currentTarget = (float)_buffer.back().value;
_currentTarget = (float)_latest;
if(!inputs[idx]->getTrainingTarget() || _includeTarget) {
// Computing MEAN and SUM OF DIFFERENCES
......@@ -237,7 +246,7 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
_currentfVector->at<float>(fIdx + 2) = (float) _diffsum;
_currentfVector->at<float>(fIdx + 3) = (float) _qtl25;
_currentfVector->at<float>(fIdx + 4) = (float) _qtl75;
_currentfVector->at<float>(fIdx + 5) = (float) _buffer[_buffer.size() - 1].value;
_currentfVector->at<float>(fIdx + 5) = (float) _latest;
} else {
fIdx = idx * REG_NUMFEATURES;
_currentfVector->at<float>(fIdx) = 0.0f;
......
......@@ -77,7 +77,7 @@ protected:
virtual void compute(U_Ptr unit) override;
void computeFeatureVector(U_Ptr unit);
void trainRandomForest();
void trainRandomForest(bool categorical=false);
void shuffleTrainingSet();
std::string getImportances();
......@@ -103,6 +103,7 @@ protected:
int64_t _diffsum;
int64_t _qtl25;
int64_t _qtl75;
int64_t _latest;
// Helper struct to store importance pairs
struct ImportancePair {
......
......@@ -166,6 +166,14 @@ public:
*/
SCError unPublishSensor(const char* publicName);
/**
* @brief Removes one or more sensors from the list of public sensors using a wildcard.
*
* @param wildcard Wildcard used to identify the sensors to be removed
* @return See SCError.
*/
SCError unPublishSensorsByWildcard(const char* wildcard);
/**
* @brief Get the entire list of (virtual or non-virtual) public sensors.
*
......@@ -296,6 +304,14 @@ public:
*/
SCError clearOperations(std::string publicName);
/**
* @brief Removes all operations of all sensors matching a given wildcard.
*
* @param wildcard Wildcard to identify sensors whose operations must be cleared.
* @return See SCError.
*/
SCError clearOperationsByWildcard(std::string wildcard);
/**
* @brief Set a new sensor expression for a virtual sensor.
*
......
......@@ -64,6 +64,7 @@ public:
SCError publishSensor(const SensorMetadata& sensor);
SCError publishVirtualSensor(std::string publicName, std::string vSensorExpression, std::string vSensorId, TimeStamp tZero, uint64_t interval);
SCError unPublishSensor(std::string publicName);
SCError unPublishSensorsByWildcard(std::string wildcard);
SCError getPublicSensorNames(std::list<std::string>& publicSensors);
SCError getPublicSensorsVerbose(std::list<PublicSensor>& publicSensors);
......@@ -82,6 +83,7 @@ public:
SCError setSensorMask(std::string publicName, uint64_t mask);
SCError setOperations(std::string publicName, std::set<std::string> operations);
SCError clearOperations(std::string publicName);
SCError clearOperationsByWildcard(std::string wildcard);
SCError setTimeToLive(std::string publicName, uint64_t ttl);
SCError setSensorInterval(std::string publicName, uint64_t interval);
......
......@@ -112,6 +112,11 @@ SCError SensorConfig::unPublishSensor(const char* publicName)
return impl->unPublishSensor(publicName);
}
SCError SensorConfig::unPublishSensorsByWildcard(const char* wildcard)
{
return impl->unPublishSensorsByWildcard(wildcard);
}
SCError SensorConfig::getPublicSensorNames(std::list<std::string>& publicSensors)
{
return impl->getPublicSensorNames(publicSensors);
......@@ -181,6 +186,11 @@ SCError SensorConfig::clearOperations(std::string publicName)
return impl->clearOperations(publicName);
}
SCError SensorConfig::clearOperationsByWildcard(std::string wildcard)
{
return impl->clearOperationsByWildcard(wildcard);
}
SCError SensorConfig::setTimeToLive(std::string publicName, uint64_t ttl)
{
return impl->setTimeToLive(publicName, ttl);
......@@ -674,6 +684,21 @@ SCError SensorConfigImpl::unPublishSensor(std::string publicName)
return SC_OK;
}
SCError SensorConfigImpl::unPublishSensorsByWildcard(std::string wildcard)
{
std::list<PublicSensor> sensors;
if(getPublicSensorsByWildcard(sensors, wildcard.c_str())!=SC_OK)
return SC_UNKNOWNERROR;
for(const auto& s : sensors) {
if (unPublishSensor(s.name.c_str()) != SC_OK) {
return SC_UNKNOWNERROR;
}
}
return SC_OK;
}
SCError SensorConfigImpl::getPublicSensorNames(std::list<std::string>& publicSensors)
{
/* Check if the session is valid */
......@@ -1593,6 +1618,21 @@ SCError SensorConfigImpl::clearOperations(std::string publicName)
return error;
}
SCError SensorConfigImpl::clearOperationsByWildcard(std::string wildcard)
{
std::list<PublicSensor> sensors;
if(getPublicSensorsByWildcard(sensors, wildcard.c_str())!=SC_OK)
return SC_UNKNOWNERROR;
for(const auto& s : sensors) {
if(clearOperations(s.name) != SC_OK) {
return SC_UNKNOWNERROR;
}
}
return SC_OK;
}
SCError SensorConfigImpl::setTimeToLive(std::string publicName, uint64_t ttl)
{
SCError error = SC_UNKNOWNERROR;
......
......@@ -45,7 +45,7 @@ _dcdbconfig_options()
fi
elif [ "${COMP_WORDS[${toplevel_command_at}]}" = "sensor" ]; then
if [ "${num_args}" -eq "$((${toplevel_command_at}+2))" ]; then
comrep="publish vcreate list listpublic show scalingfactor unit integrable expression tzero frequency unpublish"
comrep="publish vcreate list listpublic show scalingfactor unit ttl integrable expression operations clearoperations clearoperationsw tzero frequency unpublish unpublishw"
fi
if [ "${COMP_WORDS[$((${toplevel_command_at}+1))]}" = "show" ] && [ "${num_args}" -eq "$((${toplevel_command_at}+3))" ]; then
comrep=$(dcdbconfig ${hostname_str} sensor list 2> /dev/null)
......@@ -74,6 +74,9 @@ _dcdbconfig_options()
if [ "${COMP_WORDS[$((${toplevel_command_at}+1))]}" = "unpublish" ] && [ "${num_args}" -eq "$((${toplevel_command_at}+3))" ]; then
comrep=$(dcdbconfig ${hostname_str} sensor list 2> /dev/null)
fi
if [ "${COMP_WORDS[$((${toplevel_command_at}+1))]}" = "unpublishw" ] && [ "${num_args}" -eq "$((${toplevel_command_at}+3))" ]; then
comrep=$(dcdbconfig ${hostname_str} sensor list 2> /dev/null)
fi
elif [ "${COMP_WORDS[${toplevel_command_at}]}" = "db" ]; then
if [ "${num_args}" -eq "$((${toplevel_command_at}+2))" ]; then
comrep="insert fuzzytrunc"
......
......@@ -71,7 +71,9 @@ void SensorAction::printHelp(int argc, char* argv[])
std::cout << " OPERATIONS <public name> <operation>,<operation>,..." << std::endl;
std::cout << " - Set operations for the sensor (e.g., avg, stddev,...)." << std::endl;
std::cout << " CLEAROPERATIONS <public name> - Remove all existing operations for the sensor." << std::endl;
std::cout << " CLEAROPERATIONSW <wildcard> - Remove operations from sensors using a wildcard." << std::endl;
std::cout << " UNPUBLISH <public name> - Unpublish a sensor." << std::endl;
std::cout << " UNPUBLISHW <wildcard> - Unpublish sensors using a wildcard." << std::endl;
}
/*
......@@ -196,13 +198,29 @@ int SensorAction::executeCommand(int argc, char* argv[], int argvidx, const char
}
doClearOperations(argv[argvidx+1]);
}
else if (strcasecmp(argv[argvidx], "CLEAROPERATIONSW") == 0) {
/* CLEAROPERATIONSW needs one more parameter */
if (argvidx+1 >= argc) {
std::cout << "CLEAROPERATIONSW needs one more parameter!" << std::endl;
goto executeCommandError;
}
doClearOperationsByWildcard(argv[argvidx+1]);
}
else if (strcasecmp(argv[argvidx], "UNPUBLISH") == 0) {
/* UNPUBLISH needs one more parameter */
if (argvidx+1 >= argc) {
if (argvidx + 1 >= argc) {
std::cout << "UNPUBLISH needs a parameter!" << std::endl;
goto executeCommandError;
}
doUnPublishSensor(argv[argvidx+1]);
doUnPublishSensor(argv[argvidx + 1]);
}
else if (strcasecmp(argv[argvidx], "UNPUBLISHW") == 0) {
/* UNPUBLISHW needs one more parameter */
if (argvidx+1 >= argc) {
std::cout << "UNPUBLISHW needs a parameter!" << std::endl;
goto executeCommandError;
}
doUnPublishSensorsByWildcard(argv[argvidx+1]);
}
else {
std::cout << "Invalid SENSOR command: " << argv[argvidx] << std::endl;
......@@ -611,6 +629,22 @@ void SensorAction::doClearOperations(const char* publicName)
}
}
void SensorAction::doClearOperationsByWildcard(const char* wildcard)
{
DCDB::SensorConfig sensorConfig(connection);
DCDB::SCError err = sensorConfig.clearOperationsByWildcard(wildcard);
switch (err) {
case DCDB::SC_OK:
break;
case DCDB::SC_INVALIDSESSION:
std::cout << "Invalid session!" << std::endl;
break;
default:
std::cout << "Internal error." << std::endl;
}
}
/*
* Unpublish a sensor
*/
......@@ -619,3 +653,9 @@ void SensorAction::doUnPublishSensor(const char* publicName)
DCDB::SensorConfig sensorConfig(connection);
sensorConfig.unPublishSensor(publicName);
}
void SensorAction::doUnPublishSensorsByWildcard(const char* wildcard)
{
DCDB::SensorConfig sensorConfig(connection);
sensorConfig.unPublishSensorsByWildcard(wildcard);
}
......@@ -59,7 +59,9 @@ protected:
void doTTL(const char* publicName, const char *ttl);
void doOperations(const char* publicName, const char *operations);
void doClearOperations(const char* publicName);
void doClearOperationsByWildcard(const char* wildcard);
void doUnPublishSensor(const char* publicName);
void doUnPublishSensorsByWildcard(const char* wildcard);
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment