Commit 0178119d authored by Alessio Netti's avatar Alessio Netti
Browse files

Grafana: various changes

- Added all missing Cassandra configuration parameters
- Decoupled the sensor navigator's initialization
- Added the "filter" configuration parameter for the sensor navigator
- Improved error handling
parent 1d865069
......@@ -52,6 +52,8 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
hierarchySettings.separator = global.second.data();
} else if (boost::iequals(global.first, "regex")) {
hierarchySettings.regex = global.second.data();
} else if (boost::iequals(global.first, "filter")) {
hierarchySettings.filter = global.second.data();
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......@@ -63,6 +65,19 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
if (boost::iequals(global.first, "address")) {
cassandraSettings.host = parseNetworkHost(global.second.data());
cassandraSettings.port = parseNetworkPort(global.second.data());
if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
} else if (boost::iequals(global.first, "username")) {
cassandraSettings.username = global.second.data();
} else if (boost::iequals(global.first, "password")) {
cassandraSettings.password = global.second.data();
} else if (boost::iequals(global.first, "numThreadsIo")) {
cassandraSettings.numThreadsIo = stoul(global.second.data());
} else if (boost::iequals(global.first, "queueSizeIo")) {
cassandraSettings.queueSizeIo = stoul(global.second.data());
} else if (boost::iequals(global.first, "coreConnPerHost")) {
cassandraSettings.coreConnPerHost = stoul(global.second.data());
} else if (boost::iequals(global.first, "debugLog")) {
cassandraSettings.debugLog = to_bool(global.second.data());
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -42,6 +42,12 @@
typedef struct {
std::string host;
std::string port;
std::string username;
std::string password;
uint32_t numThreadsIo;
uint32_t queueSizeIo;
uint32_t coreConnPerHost;
bool debugLog;
} cassandraSettings_t;
/**
......@@ -51,6 +57,7 @@ typedef struct {
typedef struct {
std::string separator;
std::string regex;
std::string filter;
} hierarchySettings_t;
/**
......@@ -78,8 +85,19 @@ public:
restAPISettings.host = std::string(DEFAULT_GRAFANAHOST);
restAPISettings.port = std::string(DEFAULT_GRAFANAPORT);
cassandraSettings.host = std::string(DEFAULT_CASSANDRAHOST);
cassandraSettings.port = std::string(DEFAULT_CASSANDRAPORT);
cassandraSettings.username = "";
cassandraSettings.password = "";
cassandraSettings.numThreadsIo = 1;
cassandraSettings.queueSizeIo = 4096;
cassandraSettings.coreConnPerHost = 1;
cassandraSettings.debugLog = false;
hierarchySettings.separator = "";
hierarchySettings.regex = "";
hierarchySettings.filter = "";
}
virtual ~Configuration() {}
......
......@@ -52,12 +52,14 @@ void printSyntax()
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
std::cout << "Usage:" << std::endl;
std::cout << " grafanaserver [-d] [-c<host:port>] [-t<number>] [-v<level>] [-w<path>] <config>" << std::endl;
std::cout << " grafanaserver [-d] [-c<host:port>] [-u<username>] [-p<password>] [-t<number>] [-v<level>] [-w<path>] <config>" << std::endl;
std::cout << " grafanaserver -h" << std::endl;
std::cout << std::endl;
std::cout << "Options:" << std::endl;
std::cout << " -c <host:port> Cassandra host and port. [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
std::cout << " -u<username> Cassandra username [default: none]" << endl;
std::cout << " -p<password> Cassandra password [default: none]" << endl;
std::cout << " -t <number> Thread count. [default: " << DEFAULT_THREADS << "]" << std::endl;
std::cout << " -v <level> Set verbosity of output. [default: " << DEFAULT_LOGLEVEL << "]" << std::endl
<< " Can be a number between 5 (all) and 0 (fatal)." << std::endl;
......@@ -106,7 +108,7 @@ int main(int argc, char *argv[])
}
//define allowed command-line options once
const char opts[] = "c:t:v:w:dh";
const char opts[] = "c:u:p:t:v:w:dh";
//check if help flag specified
char c;
......@@ -124,169 +126,187 @@ int main(int argc, char *argv[])
//init LOGGING
initLogging();
//set up logger to command line
auto cmdSink = setupCmdLogger();
//get logger instance
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
//finished logging startup for the moment (file log added later)
try {
_configuration = new Configuration(argv[argc-1], "grafana.conf");
//Read global variables from config file
_configuration->readConfig();
//read global settings
Configuration& globalSettings = *_configuration;
cassandraSettings_t& cassandraSettings = _configuration->cassandraSettings;
serverSettings_t& restAPISettings = _configuration->restAPISettings;
hierarchySettings_t& hierarchySettings = _configuration->hierarchySettings;
//reset getopt()
optind = 1;
//read in options (overwrite dcdbpusher.conf settings if necessary)
while ((c = getopt(argc, argv, opts)) != -1) {
switch (c)
{
case 'c':
cassandraSettings.host = parseNetworkHost(optarg);
cassandraSettings.port = parseNetworkPort(optarg) == "" ? DEFAULT_CASSANDRAPORT : parseNetworkPort(optarg);
break;
case 't':
globalSettings.threads = stoul(optarg);
case 'v':
globalSettings.logLevelCmd = stoi(optarg);
break;
case 'd':
globalSettings.daemonize = 1;
break;
case 'w':
globalSettings.tempdir = optarg;
if (globalSettings.tempdir[globalSettings.tempdir.length()-1] != '/') {
globalSettings.tempdir.append("/");
}
break;
case 'h':
printSyntax();
return 1;
default:
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
//we now should know where the writable tempdir is
//set up logger to file
if (globalSettings.logLevelFile >= 0) {
auto fileSink = setupFileLogger(globalSettings.tempdir, std::string("grafanaserver"));
fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(globalSettings.logLevelFile));
}
//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
if (globalSettings.logLevelCmd >= 0) {
cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(globalSettings.logLevelCmd));
}
LOG(info) << "Logging setup complete";
//print configuration to give some feedback
//config of plugins is only printed if the config shall be validated or to debug level otherwise
LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
//print global settings in either case
LOG(info) << "Global Settings:";
LOG(info) << " Threads: " << globalSettings.threads;
LOG(info) << " Daemonize: " << (globalSettings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << globalSettings.tempdir;
LOG(info) << "Grafana Settings:";
LOG(info) << " Grafana Server: " << restAPISettings.host << ":" << restAPISettings.port;
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
LOG(info) << "Cassandra Settings:";
LOG(info) << " Cassandra Server: " << cassandraSettings.host << ":" << cassandraSettings.port;
LOG(info) << "Hierarchy Settings:";
LOG(info) << " Regex: " << (hierarchySettings.regex != "" ? hierarchySettings.regex : "none");
LOG(info) << " Separator: " << (hierarchySettings.separator != "" ? hierarchySettings.separator : "none");
LOG_VAR(vLogLevel) << "----- End Configuration -----";
//Setting up the connection with the Cassandra DB
LOG(info) << "Connecting to the Cassandra database...";
_cassandraConnection = new DCDB::Connection();
_cassandraConnection->setHostname(cassandraSettings.host);
_cassandraConnection->setPort(atoi(cassandraSettings.port.c_str()));
if (!_cassandraConnection->connect()) {
LOG(fatal) << "Failed to connect to the Cassandra database!";
return 1;
}
_httpsServer = new RestAPI(restAPISettings, hierarchySettings, _cassandraConnection);
_configuration->readRestAPIUsers(_httpsServer);
if (globalSettings.daemonize) {
//boost.log does not support forking officially.
//however, just don't touch the sinks after daemonizing and it should work nonetheless
LOG(info) << "Detaching...";
cmdSink->flush();
boost::log::core::get()->remove_sink(cmdSink);
cmdSink.reset();
//daemonize
dcdbdaemon();
LOG(info) << "Now detached";
}
LOG(info) << "Creating threads...";
//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);
//Create pool of threads which handle the sensors
for(size_t i = 0; i < globalSettings.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
}
LOG(info) << "Threads created!";
LOG(info) << "Starting RestAPI Https Server...";
_httpsServer->start();
LOG(info) << "Registering signal handlers...";
signal(SIGINT, sigHandler); //Handle Strg+C
signal(SIGTERM, sigHandler); //Handle termination
LOG(info) << "Signal handlers registered!";
LOG(info) << "Cleaning up...";
delete _configuration;
LOG(info) << "Setup complete!";
LOG(trace) << "Running...";
//Run until Strg+C
threads.join_all();
//will only continue if interrupted by SIGINT and threads were stopped
LOG(info) << "Tearing down objects...";
delete _cassandraConnection;
delete _httpsServer;
_configuration = new Configuration(argv[argc-1], "grafana.conf");
//Read global variables from config file
_configuration->readConfig();
//read global settings
Configuration& globalSettings = *_configuration;
cassandraSettings_t& cassandraSettings = _configuration->cassandraSettings;
serverSettings_t& restAPISettings = _configuration->restAPISettings;
hierarchySettings_t& hierarchySettings = _configuration->hierarchySettings;
//reset getopt()
optind = 1;
//read in options (overwrite dcdbpusher.conf settings if necessary)
while ((c = getopt(argc, argv, opts)) != -1) {
switch (c)
{
case 'c':
cassandraSettings.host = parseNetworkHost(optarg);
cassandraSettings.port = parseNetworkPort(optarg) == "" ? DEFAULT_CASSANDRAPORT : parseNetworkPort(optarg);
break;
case 'u':
cassandraSettings.username = optarg;
break;
case 'p': {
cassandraSettings.password = optarg;
// What does this do? Mask the password?
size_t pwdLen = strlen(optarg);
memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
if (pwdLen > 3) {
memset(optarg+3, 0, pwdLen-3);
}
break;
}
case 't':
globalSettings.threads = stoul(optarg);
case 'v':
globalSettings.logLevelCmd = stoi(optarg);
break;
case 'd':
globalSettings.daemonize = 1;
break;
case 'w':
globalSettings.tempdir = optarg;
if (globalSettings.tempdir[globalSettings.tempdir.length()-1] != '/') {
globalSettings.tempdir.append("/");
}
break;
case 'h':
printSyntax();
return 1;
default:
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
//we now should know where the writable tempdir is
//set up logger to file
if (globalSettings.logLevelFile >= 0) {
auto fileSink = setupFileLogger(globalSettings.tempdir, std::string("grafanaserver"));
fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(globalSettings.logLevelFile));
}
//severity level may be overwritten (per option or config-file) --> set it according to globalSettings
if (globalSettings.logLevelCmd >= 0) {
cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(globalSettings.logLevelCmd));
}
LOG(info) << "Logging setup complete";
//print configuration to give some feedback
//config of plugins is only printed if the config shall be validated or to debug level otherwise
LOG_LEVEL vLogLevel = LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
//print global settings in either case
LOG(info) << "Global Settings:";
LOG(info) << " Threads: " << globalSettings.threads;
LOG(info) << " Daemonize: " << (globalSettings.daemonize ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << globalSettings.tempdir;
LOG(info) << "Grafana Settings:";
LOG(info) << " Grafana Server: " << restAPISettings.host << ":" << restAPISettings.port;
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
LOG(info) << "Cassandra Settings:";
LOG(info) << " Cassandra Server: " << cassandraSettings.host << ":" << cassandraSettings.port;
LOG(info) << "Hierarchy Settings:";
LOG(info) << " Regex: " << (hierarchySettings.regex != "" ? hierarchySettings.regex : "none");
LOG(info) << " Separator: " << (hierarchySettings.separator != "" ? hierarchySettings.separator : "none");
LOG_VAR(vLogLevel) << "----- End Configuration -----";
//Setting up the connection with the Cassandra DB
LOG(info) << "Connecting to the Cassandra database...";
_cassandraConnection = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()),
cassandraSettings.username, cassandraSettings.password);
_cassandraConnection->setNumThreadsIo(cassandraSettings.numThreadsIo);
_cassandraConnection->setQueueSizeIo(cassandraSettings.queueSizeIo);
uint32_t params[1] = {cassandraSettings.coreConnPerHost};
_cassandraConnection->setBackendParams(params);
if (!_cassandraConnection->connect()) {
throw std::runtime_error("Failed to connect to the Cassandra database!");
}
_httpsServer = new RestAPI(restAPISettings, hierarchySettings, _cassandraConnection);
_configuration->readRestAPIUsers(_httpsServer);
LOG(info) << "Retrieving published sensor names and topics...";
if(!_httpsServer->initTree()) {
throw std::runtime_error("Unable to initialize the sensor navigator!");
}
if (globalSettings.daemonize) {
//boost.log does not support forking officially.
//however, just don't touch the sinks after daemonizing and it should work nonetheless
LOG(info) << "Detaching...";
cmdSink->flush();
boost::log::core::get()->remove_sink(cmdSink);
cmdSink.reset();
//daemonize
dcdbdaemon();
LOG(info) << "Now detached";
}
LOG(info) << "Creating threads...";
//dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
keepAliveWork = boost::make_shared<boost::asio::io_service::work>(io);
//Create pool of threads which handle the sensors
for(size_t i = 0; i < globalSettings.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
}
LOG(info) << "Threads created!";
LOG(info) << "Starting RestAPI Https Server...";
_httpsServer->start();
LOG(info) << "Registering signal handlers...";
signal(SIGINT, sigHandler); //Handle Strg+C
signal(SIGTERM, sigHandler); //Handle termination
LOG(info) << "Signal handlers registered!";
LOG(info) << "Cleaning up...";
delete _configuration;
LOG(info) << "Setup complete!";
LOG(trace) << "Running...";
//Run until Strg+C
threads.join_all();
//will only continue if interrupted by SIGINT and threads were stopped
LOG(info) << "Tearing down objects...";
delete _cassandraConnection;
delete _httpsServer;
}
catch (const std::runtime_error& e) {
LOG(fatal) << e.what();
return EXIT_FAILURE;
LOG(fatal) << e.what();
return EXIT_FAILURE;
}
catch (const exception& e) {
LOG(fatal) << "Exception: " << e.what();
abrt(EXIT_FAILURE, INTERR);
LOG(fatal) << "Exception: " << e.what();
abrt(EXIT_FAILURE, INTERR);
}
LOG(info) << "Exiting...Goodbye!";
......
......@@ -35,7 +35,7 @@ or run
```bash
./grafanaserver -h
```
to print the help-section of dcdbpusher.
to print the help-section of grafanaserver.
Grafanaserver will check the given file-path for the global configuration file which has to be named `grafana.conf`.
......@@ -52,11 +52,18 @@ The configuration specifies various settings for grafanaserver in general. Pleas
| tempdir | One can specify a writeable directory where grafanaserver can write its temporary and logging files to. Default is the current (' ./ ' ) directory.
| | |
| cassandra | Wrapper for the Cassandra DB settings.
| address | Define address and port of the Cassandra DB server which stores the timeserie data. Default is 127.0.0.1:9042.
| address | Define address and port of the Cassandra DB server which stores the time series data. Default is 127.0.0.1:9042.
| user | Username for Cassandra authentication. Default is empty.
| password | Password for Cassandra authentication. Default is empty.
| numThreadsIo | Number of I/O threads in the Cassandra driver. Default is 1.
| queueSizeIo | Size of the request queue for each I/O thread. Default is 4096.
| coreConnPerHost | Maximum number of connection per Cassandra database. Default is 1.
| debugLog | Outputs verbose error messages if enabled. Default is false.
| | |
| hierarchy | Wrapper for the definition of he hierarchical structure of the sensor tree.
| regex | Regular expression representing the full hierarchy of sensor names, where each level is separated by a provided "separator" character. If left empty, '/' are considered default separators of the different levels composing the name (e.g., /system/rack/chassis/node/sensor). If the published sensor names are different from their corresponding MQTT topics, the provided regex needs to reflect it. For example, the regex "mySystem.,rack\\d{2}.,chassis\\d{2}.,server\\d{2}" could represent a custom sensor name such as "mySystem.rack01.chassis03.server10" (see the documentation of the Sensor Navigator for more details). However, it is highly encouraged to leave the default configuration settings, whereas published sensor names exactly match their correspondant MQTT topics.
| separator | Specifies a custom separator for the different levels composing the sensor name. Default is emtpy.
| filter | Regular expression to filter the set of nodes used to build the sensor navigator. Default is none.
| | |
| restAPI | Bundles all values related to the RestAPI. See the corresponding [section](#restApi) for more information on supported functionality.
| address | Define (IP-)address and port where the REST API server should run on. Default is 127.0.0.1:8081.
......
......@@ -40,41 +40,54 @@ RestAPI::RestAPI(serverSettings_t settings,
hierarchySettings_t hierarchySettings,
DCDB::Connection* cassandraConnection) :
RESTHttpsServer(settings),
_connection(cassandraConnection) {
_connection(cassandraConnection),
_hierarchySettings(hierarchySettings) {
//Configuring endpoints
addEndpoint("/", {http::verb::get, stdBind(GET_datasource)});
addEndpoint("/levels", {http::verb::post, stdBind(POST_levels)});
addEndpoint("/search", {http::verb::post, stdBind(POST_search)});
addEndpoint("/query", {http::verb::post, stdBind(POST_query)});
}
//Initializes the internal sensor navigator
bool RestAPI::initTree() {
if(_sensorConfig) {
delete _sensorConfig;
_sensorConfig = NULL;
}
if(_navigator) {
delete _navigator;
_navigator = NULL;
}
LOG(info) << "Retrieving published sensor names and topics...";
//Get the list of all public sensors and topics.
std::vector<std::string> sensors;
std::vector<std::string> topics;
std::list<DCDB::PublicSensor> publicSensors;
_sensorConfig = new DCDB::SensorConfig(_connection);
_sensorConfig->getPublicSensorsVerbose(publicSensors);
if(_sensorConfig->getPublicSensorsVerbose(publicSensors)!=DCDB::SC_OK) {
LOG(error) << "Unable to fetch list of public sensors!";
return false;
}
for(auto& s : publicSensors) {
sensors.push_back(s.name);
topics.push_back(s.pattern);
}
//Build the tree navigator
LOG(info) << "Building the sensor navigator...";
_separator = _hierarchySettings.separator;
_navigator = new SensorNavigator();
_navigator->buildTree(hierarchySettings.regex, &sensors, &topics);
_separator = hierarchySettings.separator;
_navigator->setFilter(_hierarchySettings.filter);
_navigator->buildTree(_hierarchySettings.regex, &topics);
return true;
}
//Dummy GET request to create a datasource. All necessary checks that could be peformed here are
//already done by the RESTAPIServer (e.g., user credentials, connectivity to the DB,...).
void::RestAPI::GET_datasource(endpointArgs) {
void RestAPI::GET_datasource(endpointArgs) {
res.body() = "Data Source Added";
res.result(http::status::ok);
......@@ -83,7 +96,7 @@ void::RestAPI::GET_datasource(endpointArgs) {
//Returns the depth of the sensor tree in the navigator. Used to precompute the max number of levels
//at the frontend for the UI.
void::RestAPI::POST_levels(endpointArgs) {
void RestAPI::POST_levels(endpointArgs) {
res.body() = "[" + std::to_string(_navigator->getTreeDepth() + 1) + "]";;
res.result(http::status::ok);
......@@ -92,7 +105,7 @@ void::RestAPI::POST_levels(endpointArgs) {
//Returns to Grafana a list of possible targets to be dispayed in the query drop-down menus of a panel
//(i.e., hierarchical levels of the system and sensors).
void::RestAPI::POST_search(endpointArgs) {
void RestAPI::POST_search(endpointArgs) {
res.body() = "[";
std::set<std::string> *treeOutput;
......@@ -165,7 +178,7 @@ void::RestAPI::POST_search(endpointArgs) {
}
//Resolves a query given a time range and a list of sensors and returns it to Grafana.
void::RestAPI::POST_query(endpointArgs) {
void RestAPI::POST_query(endpointArgs) {
std::string startTime, endTime;
std::stringstream bodyss;
......
......@@ -51,17 +51,25 @@
* @ingroup grafana
*/
class RestAPI : public RESTHttpsServer {
public:
RestAPI(serverSettings_t settings,
hierarchySettings_t hierarchySettings,
DCDB::Connection* cassandraConnection);
virtual ~RestAPI() {
delete _navigator;
delete _sensorConfig;
if(_navigator) {
delete _navigator;
_navigator = NULL;
}
if(_sensorConfig) {
delete _sensorConfig;
_sensorConfig = NULL;
}
}
bool initTree();
private:
/******************************************************************************/
......@@ -129,6 +137,7 @@ private:
DCDB::Connection* _connection;
DCDB::SensorConfig* _sensorConfig;
std::string _separator;
hierarchySettings_t _hierarchySettings;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment