Commit eaa6e51a authored by Alessio Netti's avatar Alessio Netti
Browse files

Implementation of sensor cache in libdcdb

- Keeps track of sensor metadata in a JSON file within .cache
- Implemented at the libdcdb level
- Makes use of a lock file to prevent concurrency issues
- Can be enabled via the USE_SENSOR_CACHE compile flag
parent 3c4f43c0
......@@ -39,6 +39,8 @@
#include <string>
#include <list>
#include <set>
#include <fstream>
#include <fcntl.h>
#include "connection.h"
#include "timestamp.h"
......@@ -52,6 +54,8 @@
#define MAX_PATTERN_LENGTH 64
#define SENSOR_CACHE_FILENAME "dcdb_sensor_cache_"
namespace DCDB {
/* Forward-declaration of the implementation-internal classes */
......@@ -111,6 +115,9 @@ typedef enum {
SC_INVALIDVSENSORID, /**< The virtual SensorID is invalid */
SC_WRONGTYPE, /**< You requested an operation for virtual sensors on a physical sensor or vice versa */
SC_UNKNOWNSENSOR, /**< The specified sensor is not known */
SC_OBSOLETECACHE, /**< Sensor cache is no longer valid */
SC_CACHEERROR, /**< Some error while reading the sensor cache occurred */
SC_PATHERROR, /**< Path-related error, likely due to permissions */
SC_UNKNOWNERROR /**< An unknown error occurred */
} SCError;
......
......@@ -29,6 +29,8 @@
#include <string>
#include <unordered_map>
#include <functional>
#include <unistd.h>
#include <sys/stat.h>
#include "cassandra.h"
......@@ -52,9 +54,13 @@ protected:
typedef std::unordered_map<std::string, PublicSensor> SensorMap_t;
SensorMap_t sensorMapByName;
std::list<std::string> sensorList;
std::string _clusterName;
bool _useCache;
bool validateSensorPattern(const char* sensorPattern);
bool validateSensorPublicName(std::string publicName);
int acquireCacheLock(const std::string& path, const bool write);
int releaseCacheLock(const int fd);
public:
SCError loadCache();
......@@ -87,6 +93,14 @@ public:
SCError getPublishedSensorsWritetime(uint64_t &ts);
SCError setPublishedSensorsWritetime(const uint64_t &ts);
SCError getClusterName(std::string &name);
SCError isSensorCacheValid(const std::string &path, const bool names, bool& isValid, uint64_t& entries);
SCError findSensorCachePath(std::string &path);
SCError saveNamesToFile(const std::string &path, const std::list<std::string>& publicSensors);
SCError saveMetadataToFile(const std::string &path, const std::list<PublicSensor>& publicSensors);
SCError loadNamesFromFile(const std::string &path, std::list<std::string>& publicSensors);
SCError loadMetadataFromFile(const std::string &path, std::list<PublicSensor>& publicSensors);
SensorConfigImpl(Connection* conn);
virtual ~SensorConfigImpl();
};
......
......@@ -328,6 +328,54 @@ SCError SensorConfigImpl::loadCache()
}
}
SCError SensorConfigImpl::getClusterName(std::string &name)
{
name = "";
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Fill the list with all public sensors */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT cluster_name FROM system.local;";
statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc == CASS_OK) {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const char* nameStr;
size_t nameStr_len;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "cluster_name"), &nameStr, &nameStr_len) == CASS_OK) {
name = std::string(nameStr, nameStr_len);
}
}
cass_result_free(result);
cass_iterator_free(iterator);
} else {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::getPublishedSensorsWritetime(uint64_t &ts)
{
ts = 0;
......@@ -840,6 +888,14 @@ SCError SensorConfigImpl::unPublishSensorsByWildcard(std::string wildcard)
SCError SensorConfigImpl::getPublicSensorNames(std::list<std::string>& publicSensors)
{
#ifdef USE_SENSOR_CACHE
// If we have a valid cache file, we bypass querying the database
std::string cachePath = "";
if(findSensorCachePath(cachePath) == SC_OK && loadNamesFromFile(cachePath, publicSensors) == SC_OK) {
return SC_OK;
}
#endif
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
......@@ -898,11 +954,26 @@ SCError SensorConfigImpl::getPublicSensorNames(std::list<std::string>& publicSen
while(morePages);
cass_statement_free(statement);
#ifdef USE_SENSOR_CACHE
if(cachePath != "") {
saveNamesToFile(cachePath, publicSensors);
}
#endif
return SC_OK;
}
SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publicSensors)
{
#ifdef USE_SENSOR_CACHE
// If we have a valid cache file, we bypass querying the database
std::string cachePath = "";
if(findSensorCachePath(cachePath) == SC_OK && loadMetadataFromFile(cachePath, publicSensors) == SC_OK) {
return SC_OK;
}
#endif
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
......@@ -1037,6 +1108,13 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
while(morePages);
cass_statement_free(statement);
#ifdef USE_SENSOR_CACHE
if(cachePath != "") {
saveMetadataToFile(cachePath, publicSensors);
}
#endif
return SC_OK;
}
......@@ -1815,10 +1893,284 @@ SCError SensorConfigImpl::setSensorInterval(std::string publicName, uint64_t int
return error;
}
SCError SensorConfigImpl::isSensorCacheValid(const std::string &path, const bool names, bool& isValid, uint64_t& entries) {
isValid = false;
entries = 0;
uint64_t ts = 0;
std::string stringBuffer;
uint64_t cacheTs;
bool cacheNames;
uint64_t cacheEntries;
if (!session) {
return SC_INVALIDSESSION;
}
if(getPublishedSensorsWritetime(ts) != SC_OK) {
return SC_UNKNOWNERROR;
}
std::ifstream cacheFile(path);
if(!cacheFile.is_open()) {
return SC_CACHEERROR;
}
if(!std::getline(cacheFile, stringBuffer)) {
return SC_CACHEERROR;
}
try {
size_t oldPos=0, newPos=0;
// Parsing cache timestamp
if((newPos = stringBuffer.find(",", oldPos)) == std::string::npos) {
return SC_CACHEERROR;
}
cacheTs = std::stoull(stringBuffer.substr(oldPos, newPos - oldPos));
oldPos = newPos + 1;
// Parsing cache type
if((newPos = stringBuffer.find(",", oldPos)) == std::string::npos) {
return SC_CACHEERROR;
}
cacheNames = stringBuffer.substr(oldPos, newPos - oldPos) == "true";
cacheEntries = std::stoull(stringBuffer.substr(newPos + 1));
} catch(const std::exception &e) {
return SC_CACHEERROR;
}
cacheFile.close();
isValid = cacheTs >= ts && cacheEntries > 0 && (cacheNames || cacheNames == names);
entries = cacheEntries;
return SC_OK;
}
SCError SensorConfigImpl::findSensorCachePath(std::string &path) {
path = "";
std::string homeDir = "";
// Retrieving name of the cluster (just once)
if(_clusterName == "" && getClusterName(_clusterName) != SC_OK) {
_clusterName = "";
return SC_UNKNOWNERROR;
}
// Retrieving home dir
if((homeDir = getenv("HOME")) == "") {
return SC_PATHERROR;
}
path = homeDir + "/.cache/";
// Cache dir does not exist - we create it
if(access(path.c_str(), F_OK) != 0 && mkdir(path.c_str(), 0700) != 0) {
path = "";
return SC_PATHERROR;
}
// Cache dir exists but it is not accessible
if(access(path.c_str(), W_OK) != 0) {
path = "";
return SC_PATHERROR;
}
path += std::string(SENSOR_CACHE_FILENAME) + _clusterName;
return SC_OK;
}
SCError SensorConfigImpl::saveNamesToFile(const std::string &path, const std::list<std::string>& publicSensors) {
std::string stringBuffer = "";
std::ofstream cacheFile;
int lfd = acquireCacheLock(path, true);
cacheFile.open(path);
if(!cacheFile.is_open()) {
releaseCacheLock(lfd);
return SC_CACHEERROR;
}
try {
stringBuffer = std::to_string(TimeStamp().getRaw()) + "," + "false" + "," + std::to_string(publicSensors.size());
cacheFile << stringBuffer << std::endl;
for(const auto& p : publicSensors) {
SensorMetadata sm;
sm.setPublicName(p);
sm.setPattern(p);
stringBuffer = sm.getJSON();
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), '\n'), stringBuffer.end());
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), ' '), stringBuffer.end());
cacheFile << stringBuffer << std::endl;
}
} catch(const std::exception& e) {
cacheFile.close();
releaseCacheLock(lfd);
return SC_CACHEERROR;
}
cacheFile.close();
releaseCacheLock(lfd);
// std::cout << "Saved " << publicSensors.size() << " sensors to name cache..." << std::endl;
return SC_OK;
}
SCError SensorConfigImpl::saveMetadataToFile(const std::string &path, const std::list<PublicSensor>& publicSensors) {
std::string stringBuffer = "";
std::ofstream cacheFile;
int lfd = acquireCacheLock(path, true);
cacheFile.open(path);
if(!cacheFile.is_open()) {
releaseCacheLock(lfd);
return SC_CACHEERROR;
}
try {
stringBuffer = std::to_string(TimeStamp().getRaw()) + "," + "true" + "," + std::to_string(publicSensors.size());
cacheFile << stringBuffer << std::endl;
for(const auto& p : publicSensors) {
stringBuffer = PublicSensor::publicSensorToMetadata(p).getJSON();
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), '\n'), stringBuffer.end());
stringBuffer.erase(std::remove(stringBuffer.begin(), stringBuffer.end(), ' '), stringBuffer.end());
cacheFile << stringBuffer << std::endl;
}
} catch(const std::exception& e) {
cacheFile.close();
releaseCacheLock(lfd);
return SC_CACHEERROR;
}
cacheFile.close();
releaseCacheLock(lfd);
// std::cout << "Saved " << publicSensors.size() << " sensors to metadata cache..." << std::endl;
return SC_OK;
}
SCError SensorConfigImpl::loadNamesFromFile(const std::string &path, std::list<std::string>& publicSensors) {
std::string stringBuffer = "";
std::ifstream cacheFile;
bool valid = true;
uint64_t entries = 0;
int lfd = acquireCacheLock(path, false);
cacheFile.open(path);
if(isSensorCacheValid(path, false, valid, entries) != SC_OK || !valid) {
releaseCacheLock(lfd);
return SC_OBSOLETECACHE;
}
if(!cacheFile.is_open() || !std::getline(cacheFile, stringBuffer)) {
cacheFile.close();
releaseCacheLock(lfd);
return SC_CACHEERROR;
}
uint64_t sCtr = 0;
publicSensors.clear();
while(std::getline(cacheFile, stringBuffer)) {
SensorMetadata sm;
sm.parseJSON(stringBuffer);
if(sm.isValid()) {
publicSensors.push_back(*sm.getPublicName());
sCtr++;
}
}
cacheFile.close();
releaseCacheLock(lfd);
if(sCtr != entries) {
publicSensors.clear();
return SC_CACHEERROR;
} else {
// std::cout << "Loaded " << publicSensors.size() << " sensors from names cache..." << std::endl;
return SC_OK;
}
}
SCError SensorConfigImpl::loadMetadataFromFile(const std::string &path, std::list<PublicSensor>& publicSensors) {
std::string stringBuffer = "";
std::ifstream cacheFile;
bool valid = true;
uint64_t entries = 0;
int lfd = acquireCacheLock(path, false);
cacheFile.open(path);
if(isSensorCacheValid(path, true, valid, entries) != SC_OK || !valid) {
releaseCacheLock(lfd);
return SC_OBSOLETECACHE;
}
if(!cacheFile.is_open() || !std::getline(cacheFile, stringBuffer)) {
cacheFile.close();
releaseCacheLock(lfd);
return SC_CACHEERROR;
}
uint64_t sCtr = 0;
publicSensors.clear();
while(std::getline(cacheFile, stringBuffer)) {
SensorMetadata sm;
sm.parseJSON(stringBuffer);
if(sm.isValid()) {
publicSensors.push_back(PublicSensor::metadataToPublicSensor(sm));
sCtr++;
}
}
cacheFile.close();
releaseCacheLock(lfd);
if(sCtr != entries) {
publicSensors.clear();
return SC_CACHEERROR;
} else {
// std::cout << "Loaded " << publicSensors.size() << " sensors from metadata cache..." << std::endl;
return SC_OK;
}
}
int SensorConfigImpl::acquireCacheLock(const std::string& path, const bool write) {
struct flock fl;
fl.l_type = write ? F_WRLCK : F_RDLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
if(path == "") {
return -1;
}
std::string lockPath = path + "_lock_file";
int l_fd = open(lockPath.c_str(), O_RDWR | O_CREAT, S_IRWXU);
if(l_fd < 0 || fcntl(l_fd, F_SETLKW, &fl) < 0) {
return -1;
}
// std::cout << "Successfully acquired cache " << (write ? "write" : "read") << " lock..." << std::endl;
return l_fd;
}
int SensorConfigImpl::releaseCacheLock(const int fd) {
struct flock fl;
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
if(fd < 0) {
return -1;
}
if(fcntl(fd, F_SETLKW, &fl) < 0 || close(fd) < 0) {
return -1;
}
// std::cout << "Successfully released cache lock..." << std::endl;
return 0;
}
SensorConfigImpl::SensorConfigImpl(Connection* conn)
{
connection = conn;
session = connection->getSessionHandle();
_clusterName = "";
}
SensorConfigImpl::~SensorConfigImpl()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment