Commit 336107da authored by Michael Ott's avatar Michael Ott
Browse files

Lookup sensors for topics in DB, add statistics on insert rates, tweak DB parameters

parent 7e9e0427
......@@ -50,10 +50,13 @@
#include <dcdb/c_api.h>
#include "dcdb/version.h"
int verbose = 0;
typedef struct {
std::string name;
std::string topic;
std::string publicName;
std::string name;
std::string topic;
std::string publicName;
uint64_t count;
} sensor_t;
void usage(int argc, char* argv[])
......@@ -70,30 +73,42 @@ void usage(int argc, char* argv[])
std::cout << " MQTTPrefix - MQTT prefix to use for sensors" << std::endl;
}
sensor_t createSensor(const std::string& name, const std::string& prefix, const int suffix) {
sensor_t sensor;
sensor.name = name;
int prefixLen = prefix.size() - std::count(prefix.begin(), prefix.end(), '/');
int suffixLen = 28-prefixLen;
std::stringstream ss;
ss << std::setfill ('0') << std::setw(suffixLen) << std::hex << suffix;
std::string suffixStr = ss.str();
if (suffixLen < suffixStr.size()) {
std::cerr << "The specified MQTT prefix (" << prefixLen << ") length is too long, it should not exceed " << 28 - suffixStr.size() << " hex characters" << std::endl;
exit(EXIT_FAILURE);
}
sensor_t* createSensor(DCDB::SensorConfig& sensorConfig, const std::string& name, const std::string& prefix, const int suffix) {
sensor_t* sensor = new sensor_t;
sensor->name = name;
sensor->count = 0;
if (suffixStr.size() < suffixLen) {
suffixStr.insert(0, std::string(suffixLen-suffixStr.size(), '0'));
DCDB::PublicSensor psensor;
if (sensorConfig.getPublicSensorByName(psensor, name.c_str()) == DCDB::SC_OK) {
if (verbose) {
std::cout << "Found " << name << " in database: " << psensor.pattern << std::endl;
}
sensor->topic = psensor.pattern;
sensor->publicName = psensor.name;
} else {
int prefixLen = prefix.size() - std::count(prefix.begin(), prefix.end(), '/');
int suffixLen = 28-prefixLen;
std::stringstream ss;
ss << std::setfill ('0') << std::setw(suffixLen) << std::hex << suffix;
std::string suffixStr = ss.str();
if (suffixLen < suffixStr.size()) {
std::cerr << "The specified MQTT prefix (" << prefixLen << ") length is too long, it should not exceed " << 28 - suffixStr.size() << " hex characters" << std::endl;
exit(EXIT_FAILURE);
}
if (suffixStr.size() < suffixLen) {
suffixStr.insert(0, std::string(suffixLen-suffixStr.size(), '0'));
}
sensor->topic = prefix + suffixStr;
sensor->publicName = name;
std::replace(sensor->publicName.begin(), sensor->publicName.end(), ' ', '_');
if (verbose) {
std::cout << "Created new sensor " << sensor->publicName << " in database: " << sensor->topic << std::endl;
}
}
sensor.topic = prefix + suffixStr;
sensor.publicName = name;
std::replace(sensor.publicName.begin(), sensor.publicName.end(), ' ', '_');
return sensor;
}
......@@ -116,9 +131,8 @@ int main(int argc, char** argv)
int tsColumn = 0;
std::set<int> columns;
bool publish = false;
int verbose = 0;
int sensorNameColumn = -1;
while ((ret=getopt(argc, argv, "+h:t:n:c:s:pv"))!=-1) {
while ((ret=getopt(argc, argv, "+h:t:n:c:s:pv:"))!=-1) {
switch(ret) {
case 'h':
host = optarg;
......@@ -175,6 +189,9 @@ int main(int argc, char** argv)
DCDB::Connection* connection;
connection = new DCDB::Connection();
connection->setHostname(host);
connection->setNumThreadsIo(4);
connection->setQueueSizeIo(256*1024);
if (!connection->connect()) {
std::cout << "Cannot connect to database." << std::endl;
return 1;
......@@ -182,6 +199,7 @@ int main(int argc, char** argv)
/* Initialize the SensorConfig and SensorDataStore interfaces */
DCDB::SensorConfig sensorConfig(connection);
sensorConfig.loadCache();
DCDB::SensorDataStore sensorDataStore(connection);
sensorDataStore.setDebugLog(true);
......@@ -212,9 +230,9 @@ int main(int argc, char** argv)
int col = 0;
int topics = suffixStart;
std::map<int,sensor_t> sensorsByCol;
std::map<std::string,sensor_t> sensorsByName;
sensor_t sensor;
std::map<int,sensor_t*> sensorsByCol;
std::map<std::string,sensor_t*> sensorsByName;
sensor_t* sensor;
if (sensorNameColumn == -1) {
std::getline(fs, s);
lineno++;
......@@ -222,8 +240,8 @@ int main(int argc, char** argv)
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
{
if (col != tsColumn) {
sensor = createSensor(*i, prefix, topics);
sensorsByCol.insert(std::pair<int,sensor_t>(col, sensor));
sensor = createSensor(sensorConfig, *i, prefix, topics);
sensorsByCol.insert(std::pair<int,sensor_t*>(col, sensor));
topics++;
}
col++;
......@@ -231,6 +249,9 @@ int main(int argc, char** argv)
}
/* Read actual sensor readings */
uint64_t count = 0;
uint64_t total = 0;
time_t t0 = time(NULL);
while (std::getline(fs, s)) {
lineno++;
......@@ -238,19 +259,30 @@ int main(int argc, char** argv)
DCDB::TimeStamp ts(UINT64_C(0));
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i) {
if (col == tsColumn) {
ts = DCDB::TimeStamp(*i);
} else if (col == sensorNameColumn) {
std::map<std::string,sensor_t>::iterator it = sensorsByName.find(*i);
if (it != sensorsByName.end()) {
sensor = it->second ;
} else {
sensor = createSensor(*i, prefix, topics);
sensorsByName.insert(std::pair<std::string,sensor_t>(sensor.name, sensor));
topics++;
if (col == tsColumn) {
try {
ts = DCDB::TimeStamp(*i);
}
catch (std::exception &e) {
if (verbose > 1) {
std::cerr << "Error parsing timestamp " << *i << std::endl;
}
}
col++;
} else if (col == sensorNameColumn) {
std::map<std::string,sensor_t*>::iterator it = sensorsByName.find(*i);
if (it != sensorsByName.end()) {
sensor = it->second ;
} else {
sensor = createSensor(sensorConfig, *i, prefix, topics);
sensorsByName.insert(std::pair<std::string,sensor_t*>(sensor->name, sensor));
topics++;
}
}
col++;
}
if (ts.getRaw() == 0) {
continue;
}
col = 0;
......@@ -261,17 +293,28 @@ int main(int argc, char** argv)
sensor = sensorsByCol[col];
}
if (verbose >= 2) {
std::cout << ts.getRaw() << " " << col << " " << sensor.name << " " << sensor.topic << " " << *i << std::endl;
std::cout << ts.getRaw() << " " << col << " " << sensor->name << " " << sensor->topic << " " << *i << std::endl;
}
try {
DCDB::SensorId sid(sensor.topic);
DCDB::SensorId sid(sensor->topic);
sensorDataStore.insert(&sid, ts.getRaw(), std::stoll(*i));
sensor->count++;
}
catch (std::exception &e) {
if (verbose >= 1) {
if (verbose > 1) {
std::cerr << "Error parsing CSV line " << lineno << " column " << col+1 << ": \"" << s << "\"" << std::endl;
}
}
total++;
count++;
if (total % 1000 == 0) {
time_t t1 = time(NULL);
if (t1 != t0) {
std::cout << total << " " << count/(t1-t0) << " inserts/s" << std::endl;
t0 = t1;
count = 0;
}
}
}
col++;
}
......@@ -285,17 +328,18 @@ int main(int argc, char** argv)
std::cout << "Publishing sensors..." << std::endl;
if (sensorNameColumn > -1) {
std::map<std::string,sensor_t>::iterator it;
std::map<std::string,sensor_t*>::iterator it;
for (it = sensorsByName.begin(); it != sensorsByName.end(); it++) {
sensor_t sensor = it->second;
std::cout << sensor.name << " " << sensor.topic << " " << sensor.publicName << std::endl;
sensorConfig.publishSensor(sensor.publicName.c_str(), sensor.topic.c_str());
sensor_t* sensor = it->second;
std::cout << sensor->name << " " << sensor->topic << " " << sensor->publicName << " (" << sensor->count << " inserts)" << std::endl;
sensorConfig.publishSensor(sensor->publicName.c_str(), sensor->topic.c_str());
}
} else {
std::map<int,sensor_t>::iterator it;
std::map<int,sensor_t*>::iterator it;
for (it = sensorsByCol.begin(); it != sensorsByCol.end(); it++) {
std::cout << sensor.name << " " << sensor.topic << " " << sensor.publicName << std::endl;
sensorConfig.publishSensor(sensor.publicName.c_str(), sensor.topic.c_str());
sensor_t* sensor = it->second;
std::cout << sensor->name << " " << sensor->topic << " " << sensor->publicName << " (" << sensor->count << " inserts)" << std::endl;
sensorConfig.publishSensor(sensor->publicName.c_str(), sensor->topic.c_str());
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment