Commit 7e9e0427 authored by Michael Ott
Browse files

Improve dcdbcsvimport to allow for more input file formats

parent e19fd249
......@@ -61,9 +61,40 @@ void usage(int argc, char* argv[])
std::cout << "Usage: " << argv[0] << " [-h <host>] [-t <col>] [-c <col[,col,col]>] <CSV File> <SensorPrefix>" << std::endl << std::endl;
std::cout << " -h <host> - Database hostname" << std::endl;
std::cout << " -t <col> - Column in the CSV that contains the timestamp [default: 0]" << std::endl;
std::cout << " -n <col> - Sensor name column" << std::endl;
std::cout << " -c <col[,col,col]> - Column in the CSV to use [default: all]" << std::endl;
std::cout << " -s <offset> - MQTT suffix start value [default: 0]" << std::endl;
std::cout << " -p - Publish sensors" << std::endl;
std::cout << " CSV File - CSV file with sensor readings. First row has to contain sensor names" << std::endl;
std::cout << " SensorPrefix - Prefix to use for sensor names" << std::endl;
std::cout << " MQTTPrefix - MQTT prefix to use for sensors" << std::endl;
}
/**
 * Builds the sensor description for one CSV sensor.
 *
 * The MQTT topic is the prefix followed by the suffix rendered in hexadecimal
 * and left-padded with '0', so that prefix and suffix together make up 28
 * characters (the '/' separators inside the prefix are not counted).
 * The public name is the sensor name with every space replaced by '_'.
 *
 * If the prefix does not leave enough room for the suffix digits, an error is
 * written to stderr and the process exits with EXIT_FAILURE.
 *
 * @param name    Sensor name; stored unchanged in sensor.name.
 * @param prefix  MQTT prefix; '/' characters do not count towards its length.
 * @param suffix  Numeric suffix, appended to the prefix in hexadecimal.
 * @return        The populated sensor_t (name, topic, publicName).
 */
sensor_t createSensor(const std::string& name, const std::string& prefix, const int suffix) {
    typedef std::string::size_type size_type;

    // Number of characters a complete topic holds, ignoring '/' separators.
    const size_type topicLen = 28;
    const size_type prefixLen =
        prefix.size() - static_cast<size_type>(std::count(prefix.begin(), prefix.end(), '/'));

    // Render the bare hex digits first; padding is added once the lengths are validated.
    std::stringstream ss;
    ss << std::hex << suffix;
    const std::string suffixStr = ss.str();

    // All length arithmetic is unsigned and the prefix bound is checked first,
    // so an over-long prefix is reported here instead of wrapping around and
    // requesting an enormous padding string further down.
    if (prefixLen > topicLen || suffixStr.size() > topicLen - prefixLen) {
        std::cerr << "The specified MQTT prefix (" << prefixLen << ") length is too long, it should not exceed " << topicLen - suffixStr.size() << " hex characters" << std::endl;
        exit(EXIT_FAILURE);
    }

    sensor_t sensor;
    sensor.name = name;

    // Left-pad the suffix with zeros so the topic always has the full length.
    const size_type padLen = topicLen - prefixLen - suffixStr.size();
    sensor.topic = prefix + std::string(padLen, '0') + suffixStr;

    sensor.publicName = name;
    std::replace(sensor.publicName.begin(), sensor.publicName.end(), ' ', '_');
    return sensor;
}
int main(int argc, char** argv)
......@@ -81,32 +112,60 @@ int main(int argc, char** argv)
host = "localhost";
}
int offset = 0;
int suffixStart = 0;
int tsColumn = 0;
std::set<int> columns;
while ((ret=getopt(argc, argv, "+h:c:o:t:"))!=-1) {
switch(ret) {
case 'h':
host = optarg;
break;
case 'c': {
std::string s(optarg);
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
columns.insert(std::stoi(*i));
} break;
case 'o':
offset = atoi(optarg);
std::cout << ret << ": " << optarg << std::endl;
break;
case 't':
tsColumn = atoi(optarg);
std::cout << ret << ": " << optarg << std::endl;
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
}
bool publish = false;
int verbose = 0;
int sensorNameColumn = -1;
while ((ret=getopt(argc, argv, "+h:t:n:c:s:pv"))!=-1) {
switch(ret) {
case 'h':
host = optarg;
break;
case 't':
tsColumn = atoi(optarg);
break;
case 'n':
sensorNameColumn = atoi(optarg);
break;
case 'c': {
std::string s(optarg);
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
columns.insert(std::stoi(*i));
} break;
case 's':
suffixStart = atoi(optarg);
std::cout << ret << ": " << optarg << std::endl;
break;
case 'p':
publish = true;
break;
case 'v':
if (optarg) {
verbose = atoi(optarg);
} else {
verbose = 1;
}
break;
default:
usage(argc, argv);
std::cout << "Unknown parameter: " << (char) ret << std::endl;
exit(EXIT_FAILURE);
}
}
std::string csvFilename = argv[optind];
std::string prefix = argv[optind+1];
int prefixLen = prefix.size()-std::count(prefix.begin(), prefix.end(), '/');
if (prefixLen > 27) {
std::cerr << "The specified MQTT prefix is too long (" << prefixLen << "), it must not exceed 27 hex characters." << std::endl;
}
columns.erase(tsColumn);
if (sensorNameColumn > -1) {
columns.erase(sensorNameColumn);
}
/* Connect to the data store */
......@@ -124,20 +183,16 @@ int main(int argc, char** argv)
/* Initialize the SensorConfig and SensorDataStore interfaces */
DCDB::SensorConfig sensorConfig(connection);
DCDB::SensorDataStore sensorDataStore(connection);
sensorDataStore.setDebugLog(true);
std::ifstream fs;
std::string s;
std::vector<std::string> vec;
std::map<int,sensor_t> sensors;
std::string csvFilename = argv[optind];
std::string prefix = argv[optind+1];
int suffixLen = 32-(prefix.size()-std::count(prefix.begin(), prefix.end(), '/'));
uint64_t lineno = 0;
/* Read header line from CSV to obtain sensor names and topics */
std::cout << std::endl;
std::cout << "Parsing CSV file: " << csvFilename << std::endl;
std::cout << "Using sensor name prefix: " << prefix << std::endl;
std::cout << "Using MQTT prefix: " << prefix << std::endl;
std::cout << "Columns:";
if (columns.size()) {
std::set<int>::iterator it;
......@@ -149,28 +204,30 @@ int main(int argc, char** argv)
std::cout << "all" << std::endl;
}
std::cout << "Timestamp Column: " << tsColumn << std::endl;
if (sensorNameColumn > -1) {
std::cout << "Sensorname Column: " << sensorNameColumn << std::endl;
}
fs.open (csvFilename, std::fstream::in);
std::getline(fs, s);
lineno++;
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
int topics = offset;
int col = 0;
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
{
if (col != tsColumn) {
sensor_t sensor;
sensor.name = *i;
std::stringstream ss;
ss << std::setfill('0') << std::setw(suffixLen) << std::hex << topics;;
sensor.topic = prefix + ss.str();
sensor.publicName = prefix + "." + sensor.name;
std::replace(sensor.publicName.begin(), sensor.publicName.end(), ' ', '_');
sensors.insert(std::pair<int,sensor_t>(col, sensor));
topics++;
}
col++;
int topics = suffixStart;
std::map<int,sensor_t> sensorsByCol;
std::map<std::string,sensor_t> sensorsByName;
sensor_t sensor;
if (sensorNameColumn == -1) {
std::getline(fs, s);
lineno++;
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
{
if (col != tsColumn) {
sensor = createSensor(*i, prefix, topics);
sensorsByCol.insert(std::pair<int,sensor_t>(col, sensor));
topics++;
}
col++;
}
}
/* Read actual sensor readings */
......@@ -178,42 +235,70 @@ int main(int argc, char** argv)
lineno++;
col = 0;
DCDB::TimeStamp ts(UINT64_C(0));
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
DCDB::TimeStamp ts(UINT64_C(0));
boost::tokenizer<boost::escaped_list_separator<char> > tk(s, boost::escaped_list_separator<char>('\\', ',', '\"'));
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i) {
if (col == tsColumn) {
ts = DCDB::TimeStamp(*i);
}
} else if (col == sensorNameColumn) {
std::map<std::string,sensor_t>::iterator it = sensorsByName.find(*i);
if (it != sensorsByName.end()) {
sensor = it->second ;
} else {
sensor = createSensor(*i, prefix, topics);
sensorsByName.insert(std::pair<std::string,sensor_t>(sensor.name, sensor));
topics++;
}
}
col++;
}
col = 0;
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
for (boost::tokenizer<boost::escaped_list_separator<char> >::iterator i=tk.begin(); i!=tk.end();++i)
{
if (((columns.size() == 0) || (columns.find(col) != columns.end())) && col != tsColumn) {
std::cout << ts.getRaw() << " " << col << " " << sensors[col].topic << " " << *i << std::endl;
try {
DCDB::SensorId sid(sensors[col].topic);
sensorDataStore.insert(&sid, ts.getRaw(), std::stoll(*i));
}
catch (std::exception &e) {
std::cerr << "Error parsing CSV line " << lineno << " column " << col+1 << ": \"" << *i << "\"" << std::endl;
}
}
col++;
}
if (((columns.size() == 0) || (columns.find(col) != columns.end()))) {
if (sensorNameColumn == -1) {
sensor = sensorsByCol[col];
}
if (verbose >= 2) {
std::cout << ts.getRaw() << " " << col << " " << sensor.name << " " << sensor.topic << " " << *i << std::endl;
}
try {
DCDB::SensorId sid(sensor.topic);
sensorDataStore.insert(&sid, ts.getRaw(), std::stoll(*i));
}
catch (std::exception &e) {
if (verbose >= 1) {
std::cerr << "Error parsing CSV line " << lineno << " column " << col+1 << ": \"" << s << "\"" << std::endl;
}
}
}
col++;
}
}
fs.close();
/* Create public sensor names
std::cout << std::endl;
std::cout << "Publishing sensors..." << std::endl;
for (it = sensors.begin(); it != sensors.end(); it++) {
std::cout << it->name << " " << it->topic << " " << it->publicName << std::endl;
sensorConfig.publishSensor(it->publicName.c_str(), it->topic.c_str());
} */
/* Create public sensor names */
if (publish) {
std::cout << std::endl;
std::cout << "Publishing sensors..." << std::endl;
if (sensorNameColumn > -1) {
std::map<std::string,sensor_t>::iterator it;
for (it = sensorsByName.begin(); it != sensorsByName.end(); it++) {
sensor_t sensor = it->second;
std::cout << sensor.name << " " << sensor.topic << " " << sensor.publicName << std::endl;
sensorConfig.publishSensor(sensor.publicName.c_str(), sensor.topic.c_str());
}
} else {
std::map<int,sensor_t>::iterator it;
for (it = sensorsByCol.begin(); it != sensorsByCol.end(); it++) {
std::cout << sensor.name << " " << sensor.topic << " " << sensor.publicName << std::endl;
sensorConfig.publishSensor(sensor.publicName.c_str(), sensor.topic.c_str());
}
}
}
/* Disconnect */
delete connection;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment