Commit 1ba44cea authored by Alessio Netti's avatar Alessio Netti

Updated sensor cache implementation

- SensorMetadata objects do not rely on shared pointers anymore, results
in a 50% (or more) reduced memory usage
- Sensor cache files are written as CSV files, bypassing BOOST's PTREE
parsing
- With a local DB instance, retrieving 100k sensors from the cache is
now between 2X and 3X faster than querying Cassandra
parent eaa6e51a
......@@ -33,7 +33,7 @@ $ make depsinstall install
$ make depsinstall install doc
```
To run DCDB locally with the included default configuration files:
If you would like to enable DCDB's advanced caching features, add the -DUSE_SENSOR_CACHE parameter to the CXXFLAGS string in the config.mk file. To run DCDB locally with the included default configuration files:
```bash
//Change to the DCDB directory you created during installation
......
......@@ -142,6 +142,18 @@ static bool to_bool(const std::string& s) {
return s=="true" || s=="on";
}
/**
* @brief Converts the input boolean to a string
*
* @param b The input boolean
* @return The corresponding string value
*
* @ingroup globalconf
*/
static std::string bool_to_str(const bool& b) {
return (b ? "true" : "false");
}
/**
* @brief This class contains the logic to parse and store configurations in
* the form of BOOST INFO trees. The configuration parameters which are
......
......@@ -47,69 +47,64 @@ using namespace std;
class SensorMetadata {
public:
typedef enum {
IS_OPERATION_SET = 1,
IS_VIRTUAL_SET = 2,
INTEGRABLE_SET = 4,
MONOTONIC_SET = 8,
PUBLICNAME_SET = 16,
PATTERN_SET = 32,
UNIT_SET = 64,
SCALE_SET = 128,
TTL_SET = 256,
INTERVAL_SET = 512,
OPERATIONS_SET = 1024
} MetadataMask;
SensorMetadata() :
isOperation(nullptr),
isVirtual(nullptr),
integrable(nullptr),
monotonic(nullptr),
publicName(nullptr),
pattern(nullptr),
unit(nullptr),
scale(nullptr),
ttl(nullptr),
interval(nullptr),
operations(nullptr) {}
isOperation(false),
isVirtual(false),
integrable(false),
monotonic(false),
publicName(""),
pattern(""),
unit(""),
scale(1),
ttl(0),
interval(0),
operations(),
setMask(0) {}
SensorMetadata(const SensorMetadata& other) {
SensorMetadata();
if(other.isOperation)
setIsOperation(*other.isOperation);
if(other.isVirtual)
setIsVirtual(*other.isVirtual);
if(other.integrable)
setIntegrable(*other.integrable);
if(other.monotonic)
setMonotonic(*other.monotonic);
if(other.publicName)
setPublicName(*other.publicName);
if(other.pattern)
setPattern(*other.pattern);
if(other.unit)
setUnit(*other.unit);
if(other.scale)
setScale(*other.scale);
if(other.ttl)
setTTL(*other.ttl);
if(other.interval)
setInterval(*other.interval);
if(other.operations)
setOperations(*other.operations);
setMask = other.setMask;
isOperation = other.isOperation;
isVirtual = other.isVirtual;
integrable = other.integrable;
monotonic = other.monotonic;
publicName = other.publicName;
pattern = other.pattern;
unit = other.unit;
scale = other.scale;
ttl = other.ttl;
interval = other.interval;
operations = other.operations;
}
SensorMetadata& operator=(const SensorMetadata& other) {
if(other.isOperation)
setIsOperation(*other.isOperation);
if(other.isVirtual)
setIsVirtual(*other.isVirtual);
if(other.integrable)
setIntegrable(*other.integrable);
if(other.monotonic)
setMonotonic(*other.monotonic);
if(other.publicName)
setPublicName(*other.publicName);
if(other.pattern)
setPattern(*other.pattern);
if(other.unit)
setUnit(*other.unit);
if(other.scale)
setScale(*other.scale);
if(other.ttl)
setTTL(*other.ttl);
if(other.interval)
setInterval(*other.interval);
if(other.operations)
setOperations(*other.operations);
setMask = other.setMask;
isOperation = other.isOperation;
isVirtual = other.isVirtual;
integrable = other.integrable;
monotonic = other.monotonic;
publicName = other.publicName;
pattern = other.pattern;
unit = other.unit;
scale = other.scale;
ttl = other.ttl;
interval = other.interval;
operations = other.operations;
return *this;
}
......@@ -127,6 +122,64 @@ public:
parsePTREE(config);
}
/**
* @brief Parses a CSV string and stores the content in this object.
*
* If parsing fails, a InvalidArgument exception is thrown.
*
* @param payload CSV-encoded string containing metadata information
*/
void parseCSV(const string& payload) {
uint64_t fieldCtr = 0, oldPos = 0, newPos = 0;
std::string buf = "";
while(oldPos < payload.length() && (newPos = payload.find(",", oldPos)) != std::string::npos) {
if((newPos - oldPos) > 0) {
buf = payload.substr(oldPos, newPos - oldPos);
switch (fieldCtr) {
case 0 :
setScale(stod(buf));
break;
case 1 :
setIsOperation(to_bool(buf));
break;
case 2 :
setIsVirtual(to_bool(buf));
break;
case 3 :
setMonotonic(to_bool(buf));
break;
case 4 :
setIntegrable(to_bool(buf));
break;
case 5 :
setUnit(buf);
break;
case 6 :
setPublicName(buf);
break;
case 7 :
setPattern(buf);
break;
case 8 :
setInterval(stoull(buf) * 1000000);
break;
case 9 :
setTTL(stoull(buf) * 1000000);
break;
case 10 :
setOperations(_parseOperations(buf, ','));
oldPos = payload.length();
break;
}
}
fieldCtr++;
oldPos = newPos + 1;
}
if(fieldCtr < 11) {
throw std::invalid_argument("Wrong number of fields in CSV entry!");
}
}
/**
* @brief Parses a PTREE INFO block and stores the content in this object.
*
......@@ -175,6 +228,33 @@ public:
return output.str();
}
/**
* @brief Converts the content of this object into CSV format.
*
* @return String containing the CSV representation of this object
*/
string getCSV() const {
std::string buf = "";
if(setMask & SCALE_SET) {
std::ostringstream scaleStream;
scaleStream << scale;
buf += scaleStream.str() + ",";
} else {
buf += ",";
}
buf += (setMask & IS_OPERATION_SET) ? bool_to_str(isOperation) + "," : ",";
buf += (setMask & IS_VIRTUAL_SET) ? bool_to_str(isVirtual) + "," : ",";
buf += (setMask & MONOTONIC_SET) ? bool_to_str(monotonic) + "," : ",";
buf += (setMask & INTEGRABLE_SET) ? bool_to_str(integrable) + "," : ",";
buf += (setMask & UNIT_SET) ? unit + "," : ",";
buf += (setMask & PUBLICNAME_SET) ? publicName + "," : ",";
buf += (setMask & PATTERN_SET) ? pattern + "," : ",";
buf += (setMask & INTERVAL_SET) ? to_string(interval / 1000000) + "," : ",";
buf += (setMask & TTL_SET) ? to_string(ttl / 1000000) + "," : ",";
buf += (setMask & OPERATIONS_SET) ? _dumpOperations(',') + "," : ",";
return buf;
}
/**
* @brief Returns a sensorMetadata_t object from the internal map, converted into PTREE format.
*
......@@ -186,49 +266,50 @@ public:
return config;
}
const bool isValid() const { return publicName && pattern; }
const bool isValid() const { return setMask & (PUBLICNAME_SET | PATTERN_SET); }
//Getters and setters
const bool* getIsOperation() const { return isOperation.get(); }
const bool* getIsVirtual() const { return isVirtual.get(); }
const bool* getIntegrable() const { return integrable.get(); }
const bool* getMonotonic() const { return monotonic.get(); }
const string* getPublicName() const { return publicName.get(); }
const string* getPattern() const { return pattern.get(); }
const string* getUnit() const { return unit.get(); }
const double* getScale() const { return scale.get(); }
const uint64_t* getTTL() const { return ttl.get(); }
const uint64_t* getInterval() const { return interval.get(); }
const set<string>* getOperations() const { return operations.get(); }
const string getOperationsString() const { return operations ? _dumpOperations() : ""; }
const bool* getIsOperation() const { return (setMask & IS_OPERATION_SET) ? &isOperation : nullptr; }
const bool* getIsVirtual() const { return (setMask & IS_VIRTUAL_SET) ? &isVirtual : nullptr; }
const bool* getIntegrable() const { return (setMask & INTEGRABLE_SET) ? &integrable : nullptr; }
const bool* getMonotonic() const { return (setMask & MONOTONIC_SET) ? &monotonic : nullptr; }
const string* getPublicName() const { return (setMask & PUBLICNAME_SET) ? &publicName : nullptr; }
const string* getPattern() const { return (setMask & PATTERN_SET) ? &pattern : nullptr; }
const string* getUnit() const { return (setMask & UNIT_SET) ? &unit : nullptr; }
const double* getScale() const { return (setMask & SCALE_SET) ? &scale : nullptr; }
const uint64_t* getTTL() const { return (setMask & TTL_SET) ? &ttl : nullptr; }
const uint64_t* getInterval() const { return (setMask & INTERVAL_SET) ? &interval : nullptr; }
const set<string>* getOperations() const { return (setMask & OPERATIONS_SET) ? &operations : nullptr; }
const string getOperationsString() const { return (setMask & OPERATIONS_SET) ? _dumpOperations() : ""; }
void setIsOperation(bool o) { isOperation = std::make_shared<bool>(o); }
void setIsVirtual(bool v) { isVirtual = std::make_shared<bool>(v); }
void setIntegrable(bool i) { integrable = std::make_shared<bool>(i); }
void setMonotonic(bool m) { monotonic = std::make_shared<bool>(m); }
void setPublicName(string p) { publicName = std::make_shared<string>(p); }
void setPattern(string p) { pattern = std::make_shared<string>(p); }
void setUnit(string u) { unit = std::make_shared<string>(u); }
void setScale(double s) { scale = std::make_shared<double>(s); }
void setTTL(uint64_t t) { ttl = std::make_shared<uint64_t>(t); }
void setInterval(uint64_t i) { interval = std::make_shared<uint64_t>(i); }
void setIsOperation(bool o) { isOperation = o; setMask = setMask | IS_OPERATION_SET; }
void setIsVirtual(bool v) { isVirtual = v; setMask = setMask | IS_VIRTUAL_SET; }
void setIntegrable(bool i) { integrable = i; setMask = setMask | INTEGRABLE_SET; }
void setMonotonic(bool m) { monotonic = m; setMask = setMask | MONOTONIC_SET; }
void setPublicName(string p) { publicName = p; setMask = setMask | PUBLICNAME_SET; }
void setPattern(string p) { pattern = p; setMask = setMask | PATTERN_SET; }
void setUnit(string u) { unit = u; setMask = setMask | UNIT_SET; }
void setScale(double s) { scale = s; setMask = setMask | SCALE_SET; }
void setTTL(uint64_t t) { ttl = t; setMask = setMask | TTL_SET; }
void setInterval(uint64_t i) { interval = i; setMask = setMask | INTERVAL_SET; }
void setOperations(const string& o) { setOperations(_parseOperations(o)); }
void clearOperations() { operations.reset(); }
void clearOperations() { operations.clear(); setMask = setMask & ~OPERATIONS_SET; }
// Merges a set of operations with the local one
void setOperations(const set<string>& o) {
if(!operations)
operations = std::make_shared<set<string>>(o);
if(setMask & OPERATIONS_SET)
operations.insert(o.begin(), o.end());
else
operations->insert(o.begin(), o.end());
operations = set<string>(o);
setMask = setMask | OPERATIONS_SET;
}
// Adds a single operation. Requires the publicName field to be set.
// Here the operation is assumed to be the full sensor name, from which the actual operation name is extracted.
bool addOperation(const string& opName) {
if (publicName && publicName->length()>0 && opName.length()>publicName->length()
&& !opName.compare(0, publicName->length(), *publicName)) {
setOperations(opName.substr(publicName->length()));
if ((setMask & PUBLICNAME_SET) && publicName.length()>0 && opName.length()>publicName.length()
&& !opName.compare(0, publicName.length(), publicName)) {
setOperations(opName.substr(publicName.length()));
return true;
}
else
......@@ -257,8 +338,8 @@ protected:
string out="";
// We re-write the vector into a string, this time properly formatted
string sepStr = string(1,sep);
if(operations) {
for (const auto &el : *operations)
if(setMask & OPERATIONS_SET) {
for (const auto &el : operations)
out += el + sepStr;
if (!out.empty() && out.back() == sep)
out.erase(out.size() - 1, 1);
......@@ -269,46 +350,46 @@ protected:
// Dumps the contents of "s" in "config"
void _dumpPTREE(boost::property_tree::ptree& config) const {
config.clear();
if(scale) {
if(setMask & SCALE_SET) {
std::ostringstream scaleStream;
scaleStream << *scale;
scaleStream << scale;
config.push_back(boost::property_tree::ptree::value_type("scale", boost::property_tree::ptree(scaleStream.str())));
}
if(isOperation)
config.push_back(boost::property_tree::ptree::value_type("isOperation", boost::property_tree::ptree(*isOperation ? "true" : "false")));
if(isVirtual)
config.push_back(boost::property_tree::ptree::value_type("isVirtual", boost::property_tree::ptree(*isVirtual ? "true" : "false")));
if(monotonic)
config.push_back(boost::property_tree::ptree::value_type("monotonic", boost::property_tree::ptree(*monotonic ? "true" : "false")));
if(integrable)
config.push_back(boost::property_tree::ptree::value_type("integrable", boost::property_tree::ptree(*integrable ? "true" : "false")));
if(unit)
config.push_back(boost::property_tree::ptree::value_type("unit", boost::property_tree::ptree(*unit)));
if(publicName)
config.push_back(boost::property_tree::ptree::value_type("publicName", boost::property_tree::ptree(*publicName)));
if(pattern)
config.push_back(boost::property_tree::ptree::value_type("pattern", boost::property_tree::ptree(*pattern)));
if(interval)
config.push_back(boost::property_tree::ptree::value_type("interval", boost::property_tree::ptree(to_string(*interval / 1000000))));
if(ttl)
config.push_back(boost::property_tree::ptree::value_type("ttl", boost::property_tree::ptree(to_string(*ttl / 1000000))));
if(operations)
if(setMask & IS_OPERATION_SET)
config.push_back(boost::property_tree::ptree::value_type("isOperation", boost::property_tree::ptree(bool_to_str(isOperation))));
if(setMask & IS_VIRTUAL_SET)
config.push_back(boost::property_tree::ptree::value_type("isVirtual", boost::property_tree::ptree(bool_to_str(isVirtual))));
if(setMask & MONOTONIC_SET)
config.push_back(boost::property_tree::ptree::value_type("monotonic", boost::property_tree::ptree(bool_to_str(monotonic))));
if(setMask & INTEGRABLE_SET)
config.push_back(boost::property_tree::ptree::value_type("integrable", boost::property_tree::ptree(bool_to_str(integrable))));
if(setMask & UNIT_SET)
config.push_back(boost::property_tree::ptree::value_type("unit", boost::property_tree::ptree(unit)));
if(setMask & PUBLICNAME_SET)
config.push_back(boost::property_tree::ptree::value_type("publicName", boost::property_tree::ptree(publicName)));
if(setMask & PATTERN_SET)
config.push_back(boost::property_tree::ptree::value_type("pattern", boost::property_tree::ptree(pattern)));
if(setMask & INTERVAL_SET)
config.push_back(boost::property_tree::ptree::value_type("interval", boost::property_tree::ptree(to_string(interval / 1000000))));
if(setMask & TTL_SET)
config.push_back(boost::property_tree::ptree::value_type("ttl", boost::property_tree::ptree(to_string(ttl / 1000000))));
if(setMask & OPERATIONS_SET)
config.push_back(boost::property_tree::ptree::value_type("operations", boost::property_tree::ptree(_dumpOperations())));
}
// Protected class members
std::shared_ptr<bool> isOperation;
std::shared_ptr<bool> isVirtual;
std::shared_ptr<bool> integrable;
std::shared_ptr<bool> monotonic;
std::shared_ptr<string> publicName;
std::shared_ptr<string> pattern;
std::shared_ptr<string> unit;
std::shared_ptr<double> scale;
std::shared_ptr<uint64_t> ttl;
std::shared_ptr<uint64_t> interval;
std::shared_ptr<set<string>> operations;
bool isOperation;
bool isVirtual;
bool integrable;
bool monotonic;
string publicName;
string pattern;
string unit;
double scale;
uint64_t ttl;
uint64_t interval;
set<string> operations;
uint64_t setMask;
};
// ---------------------------------------------------------------------------
......
......@@ -1993,9 +1993,7 @@ SCError SensorConfigImpl::saveNamesToFile(const std::string &path, const std::li
SensorMetadata sm;
sm.setPublicName(p);
sm.setPattern(p);
stringBuffer = sm.getJSON();
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), '\n'), stringBuffer.end());
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), ' '), stringBuffer.end());
stringBuffer = sm.getCSV();
cacheFile << stringBuffer << std::endl;
}
} catch(const std::exception& e) {
......@@ -2026,9 +2024,7 @@ SCError SensorConfigImpl::saveMetadataToFile(const std::string &path, const std:
stringBuffer = std::to_string(TimeStamp().getRaw()) + "," + "true" + "," + std::to_string(publicSensors.size());
cacheFile << stringBuffer << std::endl;
for(const auto& p : publicSensors) {
stringBuffer = PublicSensor::publicSensorToMetadata(p).getJSON();
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), '\n'), stringBuffer.end());
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), ' '), stringBuffer.end());
stringBuffer = PublicSensor::publicSensorToMetadata(p).getCSV();
cacheFile << stringBuffer << std::endl;
}
} catch(const std::exception& e) {
......@@ -2066,11 +2062,13 @@ SCError SensorConfigImpl::loadNamesFromFile(const std::string &path, std::list<s
publicSensors.clear();
while(std::getline(cacheFile, stringBuffer)) {
SensorMetadata sm;
sm.parseJSON(stringBuffer);
if(sm.isValid()) {
publicSensors.push_back(*sm.getPublicName());
sCtr++;
}
try {
sm.parseCSV(stringBuffer);
if (sm.isValid()) {
publicSensors.push_back(*sm.getPublicName());
sCtr++;
}
} catch(const std::exception& e) {}
}
cacheFile.close();
......@@ -2107,11 +2105,13 @@ SCError SensorConfigImpl::loadMetadataFromFile(const std::string &path, std::lis
publicSensors.clear();
while(std::getline(cacheFile, stringBuffer)) {
SensorMetadata sm;
sm.parseJSON(stringBuffer);
if(sm.isValid()) {
publicSensors.push_back(PublicSensor::metadataToPublicSensor(sm));
sCtr++;
}
try {
sm.parseCSV(stringBuffer);
if (sm.isValid()) {
publicSensors.push_back(PublicSensor::metadataToPublicSensor(sm));
sCtr++;
}
} catch(const std::exception& e) {}
}
cacheFile.close();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment