Commit caa6c501 authored by Axel Auweter's avatar Axel Auweter
Browse files

CollectAgent now writes to database again.

parent a2bb421d
......@@ -40,11 +40,81 @@ int keepRunning;
uint64_t msgCtr;
uint64_t pmsgCtr;
#pragma pack(push,1)
typedef union {
uint64_t raw;
struct {
uint8_t knc_id;
uint8_t bnc_id;
uint8_t bic_id;
uint8_t chassis_id;
uint8_t rack_id_lsb;
uint8_t rack_id_msb;
uint8_t cluster_id;
uint8_t datacenter_id;
};
} DeviceLocation;
typedef struct {
uint64_t sensor_id : 16;
uint64_t device_id : 48;
} DeviceSensorId;
typedef union {
uint64_t raw[2];
struct {
DeviceLocation dl;
DeviceSensorId dsid;
};
} SensorId;
#pragma pack(pop)
void sigHandler(int sig)
{
keepRunning = 0;
}
bool topicToSid(SensorId* sid, string topic)
{
uint64_t pos = 0;
const char* buf = topic.c_str();
sid->raw[0] = 0;
sid->raw[1] = 0;
while (*buf) {
if (*buf >= '0' && *buf <= '9') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'A' && *buf <= 'F') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'a' && *buf <= 'f') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
pos += 4;
}
buf++;
}
return pos == 128;
}
string sidConvert(SensorId *sid)
{
uint64_t ll[2];
ll[0] = __builtin_bswap64(sid->raw[0]);
ll[1]= __builtin_bswap64(sid->raw[1]);
return string((char*)ll, 16);
}
string int64Convert(uint64_t n)
{
n = __builtin_bswap64(n);
return string((char*)&n, 8);
}
void mqttCallback(SimpleMQTTMessage *msg)
{
/*
......@@ -58,21 +128,53 @@ void mqttCallback(SimpleMQTTMessage *msg)
* Decode the message and put into the database.
*/
try {
ColumnParent cparent;
Column c;
string key, name, value;
#if 0
msg->dump();
#endif
if (msg->isPublish()) {
ColumnParent cparent;
Column c;
string key, name, value;
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
cparent.column_family = "sensordata";
uint64_t ts = diff.total_nanoseconds();
uint32_t sid = msgCtr;
cparent.column_family = "sensordata";
uint64_t ts = diff.total_nanoseconds();
msg->dump();
SensorId sid;
if (topicToSid(&sid,msg->getTopic())) {
#if 0
cout << "Topic decode successful:"
<< "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
<< "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
<< "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
<< "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
<< "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
<< "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
<< "\nbic_id: " << hex << (uint32_t)sid.dl.bic_id
<< "\nbnc_id: " << hex << (uint32_t)sid.dl.bnc_id
<< "\nknc_id: " << hex << (uint32_t)sid.dl.knc_id
<< "\ndevice_id: " << hex << sid.dsid.device_id
<< "\nsensor_id: " << hex << sid.dsid.sensor_id
<< "\n";
#endif
key = sidConvert(&sid);
name = int64Convert(ts);
value = int64Convert(*((uint64_t*)msg->getPayload()));
c.name = name;
c.value = value;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
}
if (msg->isPublish()) {
#if 0
key = convert(&sid, sizeof(msgCtr));
name = convert(&ts, sizeof(ts));
......@@ -169,7 +271,7 @@ int main(void) {
if (sensordataCf<0) {
cout << "Creating sensordata column familiy...\n";
query = "CREATE TABLE sensordata ( sid bigint, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment