Commit 4e348ade authored by Axel Auweter's avatar Axel Auweter
Browse files

Forgot to add some files to previous commit.

parent 40c489d1
include ../config.mk
CXXFLAGS = -O0 -g -Wall -std=c++11 -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/
OBJS = collectagent.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
CXXFLAGS = -O0 -g -Wall -Wno-null-conversion -std=c++11 -stdlib=libc++ -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/
OBJS = collectagent.o simplemqttserver.o simplemqttserverthread.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lmosquitto -lssl -lcrypto -lpthread
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lmosquitto -lssl -lcrypto -lpthread -lboost_system -lc++
TARGET = collectagent
SUBTARGETS = cassandra/Cassandra.h
......
......@@ -6,7 +6,6 @@
// Description : As of now this is some rudimentary bad code to talk to the DB
//============================================================================
//#include <cstdio>
#include <cstdlib>
#include <sstream>
......@@ -26,6 +25,7 @@
#include <thrift/protocol/TBinaryProtocol.h>
#include "cassandra/Cassandra.h"
#include "simplemqttserver.h"
#include "mosquitto.h"
......@@ -45,19 +45,19 @@ void sigHandler(int sig) {
}
static inline string convert(void* ptr, size_t len) {
switch (len) {
case 4:
*((uint32_t*)ptr) = __builtin_bswap32(*((uint32_t*)ptr));
break;
case 8:
*((uint64_t*)ptr) = __builtin_bswap64(*((uint64_t*)ptr));
break;
default:
// Do nothing and hope for the best...
break;
}
string res((char*)ptr, len);
return res;
switch (len) {
case 4:
// *((uint32_t*)ptr) = __builtin_bswap32(*((uint32_t*)ptr));
break;
case 8:
// *((uint64_t*)ptr) = __builtin_bswap64(*((uint64_t*)ptr));
break;
default:
// Do nothing and hope for the best...
break;
}
string res((char*)ptr, len);
return res;
}
void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
......@@ -70,15 +70,15 @@ void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_mes
time_t t = time(NULL);
struct tm *now = localtime(&t);
datestr << "'" << (now->tm_year + 1900) << "-" << (now->tm_mon + 1)
<< "-" << (now->tm_mday) << " " << (now->tm_hour) << ":"
<< (now->tm_min) << ":" << (now->tm_sec) << "'";
<< "-" << (now->tm_mday) << " " << (now->tm_hour) << ":"
<< (now->tm_min) << ":" << (now->tm_sec) << "'";
#else
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
//datestr << diff.total_nanoseconds();
#endif
//sidstr << msgCtr;
#if 0
......@@ -91,10 +91,10 @@ void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_mes
string key, name, value;
uint64_t ts = diff.total_nanoseconds();
uint32_t sid = msgCtr;
key = convert(&sid, sizeof(msgCtr));
name = convert(&ts, sizeof(ts));
float tmp = 3.141;
value = convert(&tmp, 4);
c.name = name;
......@@ -102,10 +102,10 @@ void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_mes
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
#endif
}
catch(TTransportException te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
......@@ -116,7 +116,7 @@ void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_mes
}
}
#if 0
int main(void) {
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
......@@ -132,7 +132,7 @@ int main(void) {
myClient = new CassandraClient(prot);
tr->open();
start:
start:
myClient->describe_cluster_name(clusterName);
cout << "Cluster name: " << clusterName << "\n";
......@@ -305,3 +305,34 @@ int main(void) {
}
return EXIT_SUCCESS;
}
#else
int main(void) {
MQTTFixedHeader mh;
mh.bits.type = MQTT_CONNECT;
mh.bits.qos = 2;
mh.bits.dup = 1;
mh.bits.retain = 1;
mh.bits.remaining_length[0] = 0xff;
mh.bits.remaining_length[1] = 0xee;
mh.bits.remaining_length[2] = 0xdd;
mh.bits.remaining_length[3] = 0xcc;
printf("mh.raw: 0x%02x 0x%02x 0x%02x 0x%02x 0x%02x\n", mh.raw[0], mh.raw[1], mh.raw[2], mh.raw[3], mh.raw[4]);
try{
SimpleMQTTServer ms;
ms.start();
sleep(60);
ms.stop();
}
catch (std::exception e) {
cout << "Exception: " << e.what() << "\n";
}
return 0;
}
#endif
......@@ -7,14 +7,177 @@
#include "simplemqttserver.h"
using namespace std;
void SimpleMQTTServer::initSockets(void)
{
struct addrinfo hints;
struct addrinfo *ainfo_head, *ainfo_cur;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_family = PF_UNSPEC;
if (getaddrinfo(listenAddress.c_str(), listenPort.c_str(), &hints, &ainfo_head))
throw new runtime_error("Error initializing socket.");
ainfo_cur = ainfo_head;
for (ainfo_cur = ainfo_head; ainfo_cur; ainfo_cur = ainfo_cur->ai_next) {
#ifdef SimpleMQTTVerbose
/*
* Print some details about the current addrinfo.
*/
char buf[INET_ADDRSTRLEN+1], buf6[INET6_ADDRSTRLEN+1];
if (ainfo_cur->ai_family == AF_INET) {
buf[INET_ADDRSTRLEN] = 0;
inet_ntop(AF_INET, &((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_addr, buf, INET_ADDRSTRLEN);
cout << "Initializing IPv4 socket:\n"
<< "\tAddress: " << buf << "\n"
<< "\tPort: " << ntohs(((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_port) << "\n";
}
else if (ainfo_cur->ai_family == AF_INET6) {
buf6[INET6_ADDRSTRLEN] = 0;
inet_ntop(AF_INET6, &((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_addr, buf6, INET6_ADDRSTRLEN);
cout << "Initializing IPv6 socket...\n"
<< "\tAddress: " << buf6 << "\n"
<< "\tPort: " << ntohs(((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_port) << "\n";
}
else {
cout << "Unclear socket type.\n";
}
#endif
/*
* Open the socket.
*/
int sock = socket(ainfo_cur->ai_family, ainfo_cur->ai_socktype, ainfo_cur->ai_protocol);
if (sock == -1) {
#ifdef SimpleMQTTVerbose
cout << "Error: could not create socket.\n";
#endif
continue;
}
/*
* Set socket options.
*/
int sopt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sopt, sizeof(sopt));
sopt = 1;
setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &sopt, sizeof(sopt));
#if 1
sopt = fcntl(sock, F_GETFL, 0);
if (sopt == -1) {
cout << "Warning: could not get socket options, ignoring socket.\n";
close(sock);
continue;
}
sopt |= O_NONBLOCK;
if (fcntl(sock, F_SETFL, sopt) == -1) {
cout << "Warning: could not set socket options, ignoring socket.\n";
close(sock);
continue;
}
#endif
/*
* Bind and listen on socket.
*/
if (::bind(sock, ainfo_cur->ai_addr, ainfo_cur->ai_addrlen) == -1) {
cout << "Warning: could not bind to socket, ignoring socket.\n";
close(sock);
continue;
}
if (listen(sock, SimpleMQTTMaxBacklog) == -1) {
cout << "Warning: could not listen on socket, ignoring socket.\n";
close(sock);
continue;
}
listenSockets.push_back(new int(sock));
}
freeaddrinfo(ainfo_head);
#ifdef SimpleMQTTVerbose
cout << "Opened " << listenSockets.size() << " network sockets for MQTT connections.\n";
#endif
}
void SimpleMQTTServer::start()
{
/*
* Check if at least one socket is open.
*/
if (listenSockets.size() == 0) {
throw new invalid_argument("Failed to establish a listen socket with the given configuration.");
}
/*
* Start one accept thread per socket.
*/
for (int i=0; i<listenSockets.size(); i++)
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i]));
}
void SimpleMQTTServer::stop()
{
/*
* Terminate all running server threads.
*/
acceptThreads.clear();
}
void SimpleMQTTServer::init(string addr, string port)
{
/*
* Assign all class internal variables with sane values and
* do some simple validation checks.
*/
if (addr.empty() || addr.length() == 0)
throw new invalid_argument("The listen address cannot be empty.");
listenAddress = addr;
if (port.empty() || (strtol(port.c_str(), NULL, 10) == 0))
throw new invalid_argument("Network port is not numeric.");
listenPort = port;
/*
* Set up the sockets.
*/
initSockets();
}
SimpleMQTTServer::SimpleMQTTServer()
{
// TODO Auto-generated constructor stub
/*
* Initialize server with default settings.
*/
init("localhost", "1883");
}
SimpleMQTTServer::SimpleMQTTServer(std::string addr, std::string port)
{
/*
* Initialize server to listen on specified address and port.
*/
init(addr, port);
}
SimpleMQTTServer::~SimpleMQTTServer()
{
// TODO Auto-generated destructor stub
/*
* Stop the server operation.
*/
stop();
/*
* Close all listen sockets.
*/
for (int i=0; i<listenSockets.size(); i++)
close(listenSockets[i]);
listenSockets.clear();
}
......@@ -6,6 +6,21 @@
*/
#include <stdint.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <poll.h>
#include <fcntl.h>
#include <exception>
#include <stdexcept>
#include <system_error>
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/thread/mutex.hpp>
#ifndef SIMPLEMQTTSERVER_H_
#define SIMPLEMQTTSERVER_H_
......@@ -14,45 +29,73 @@
* Define the number of connections that are handled by each thread.
* If the number of clients exceeds the specified value, a new thread will be spawned.
*/
#define SimpleMQTTConnectionsPerThread 10
#define MQTT_CONNECT 0x1
#define MQTT_CONNACK 0x2
#define MQTT_PUBLISH 0x3
#define MQTT_PUBACK 0x4
#define MQTT_PUBREC 0x5
#define MQTT_PUBREL 0x6
#define MQTT_PUBCOMP 0x7
#define MQTT_SUBSCRIBE 0x8
#define MQTT_SUBACK 0x9
#define MQTT_UNSUBSCRIBE 0xa
#define MQTT_UNSUBACK 0xb
#define MQTT_PINGREQ 0xc
#define MQTT_PINGRESP 0xd
#define MQTT_DISCONNECT 0xe
#pragma pack(push,1)
typedef union {
uint8_t raw[5];
struct {
uint8_t retain :1;
uint8_t qos :2;
uint8_t dup :1;
uint8_t type :4;
uint8_t remaining_length[4];
} bits;
} MQTTFixedHeader;
#pragma pack(pop)
#ifndef SimpleMQTTConnectionsPerThread
#define SimpleMQTTConnectionsPerThread 16
#endif
/*
* Define the maximum number of threads that will be spawned per socket.
*/
#ifndef SimpleMQTTMaxThreadsPerSocket
#define SimpleMQTTMaxThreadsPerSocket 128
#endif
/*
* Define the maximum backlog size for listen().
*/
#ifndef SimpleMQTTBacklog
#define SimpleMQTTMaxBacklog 100
#endif
/*
* Define the standard wait time for poll() calls.
*/
#ifndef SimpleMQTTPollTimeout
#define SimpleMQTTPollTimeout 100
#endif
/*
* Enable verbose output of the MQTT server.
*/
#define SimpleMQTTVerbose
#include "simplemqttservermessage.h"
#include "simplemqttserverthread.h"
/*
* SimpleMQTTServer Class Definition
*
* Usage:
*
* // Create a simple MQTT server instance listening on localhost, port 1883 (default):
* SimpleMQTTServer s();
*
* // Create a simple MQTT server instance listening on 127.0.0.1 (IPv4 only), port 1234:
* SimpleMQTTServer ss("127.0.0.1", 1234);
*
* s.start(); // Start server
* s.stop(); // Stop server
*
*/
class SimpleMQTTServer
{
protected:
void* run_server_thread(void*);
std::string listenAddress;
std::string listenPort;
boost::ptr_vector<int> listenSockets;
boost::ptr_list<SimpleMQTTServerAcceptThread> acceptThreads;
void init(std::string addr, std::string port);
void initSockets(void);
public:
void start();
void stop();
SimpleMQTTServer();
SimpleMQTTServer(std::string addr, std::string port);
virtual
~SimpleMQTTServer();
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment