Commit 329dcd95 authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'remotes/origin/master' into mqttTopics

parents de31d03b 677ab26c
......@@ -56,7 +56,7 @@ else
OPENSSL_TARGET = $(if $(findstring $(shell uname),Darwin),"darwin64-x86_64-cc","linux-x86_64")
endif
PUBHEADERS = pusherpqueue.h dcdbdaemon.h
PUBHEADERS = dcdbdaemon.h
FULL_CC = $(shell which $(CC))
FULL_CXX = $(shell which $(CXX))
......@@ -120,9 +120,9 @@ doc:
@cd doc/ && doxygen Doxyfile
@echo "Generated docs into doc/html"
all: check-cross-compile deps $(foreach s,$(SUB_DIRS),$(s)-build) doc
all: check-cross-compile deps $(foreach s,$(SUB_DIRS),$(s)-build)
install: depsinstall $(SUB_DIRS) doc
install: depsinstall $(SUB_DIRS)
@cd common/include && install $(PUBHEADERS) $(DCDBDEPLOYPATH)/include/dcdb && cd ..
@echo DONE
......
include ../config.mk
include ../common.mk
#include ../common.mk
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/common/include -I$(DCDBDEPLOYPATH)/include -DVERSION=\"$(VERSION)\"
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS -I../common/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = aggregator
......
include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall \
-Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option \
-fmessage-length=0 \
-I../common/include/ \
CXXFLAGS += -I../common/include/ \
-I../lib/include \
-I$(DCDBDEPLOYPATH)/include \
-I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include \
-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK \
-DVERSION=\"$(VERSION)\"
-I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include
OBJS = ../common/src/logging.o \
../analytics/AnalyticsManager.o \
......
......@@ -447,7 +447,7 @@ int main(int argc, char* const argv[]) {
cassandraSettings.username, cassandraSettings.password);
dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
uint32_t params[3] = {cassandraSettings.coreConnPerHost, cassandraSettings.maxConnPerHost, cassandraSettings.maxConcRequests};
uint32_t params[1] = {cassandraSettings.coreConnPerHost};
dcdbConn->setBackendParams(params);
if (!dcdbConn->connect()) {
......@@ -510,8 +510,6 @@ int main(int argc, char* const argv[]) {
LOG(info) << " NumThreadsIO: " << cassandraSettings.numThreadsIo;
LOG(info) << " QueueSizeIO: " << cassandraSettings.queueSizeIo;
LOG(info) << " CoreConnPerHost: " << cassandraSettings.coreConnPerHost;
LOG(info) << " MaxConnPerHost: " << cassandraSettings.maxConnPerHost;
LOG(info) << " MaxConcRequests: " << cassandraSettings.maxConcRequests;
LOG(info) << " DebugLog: " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
LOG(info) << " Username: " << cassandraSettings.username;
......
......@@ -38,8 +38,6 @@ cassandra {
numThreadsIo 1
queueSizeIo 4096
coreConnPerHost 1
maxConnPerHost 2
maxConcRequests 100
debugLog false
}
......@@ -42,10 +42,6 @@ void Configuration::readAdditionalBlocks(boost::property_tree::iptree& cfg) {
cassandraSettings.queueSizeIo = stoul(global.second.data());
} else if (boost::iequals(global.first, "coreConnPerHost")) {
cassandraSettings.coreConnPerHost = stoul(global.second.data());
} else if (boost::iequals(global.first, "maxConnPerHost")) {
cassandraSettings.maxConnPerHost = stoul(global.second.data());
} else if (boost::iequals(global.first, "maxConcRequests")) {
cassandraSettings.maxConcRequests = stoul(global.second.data());
} else if (boost::iequals(global.first, "debugLog")) {
cassandraSettings.debugLog = to_bool(global.second.data());
} else {
......
......@@ -32,8 +32,6 @@ public:
uint32_t numThreadsIo = 1;
uint32_t queueSizeIo = 4096;
uint32_t coreConnPerHost = 1;
uint32_t maxConnPerHost = 2;
uint32_t maxConcRequests = 100;
bool debugLog = false;
};
......
//================================================================================
// Name : pusherpqeue.h
// Author : Axel Auweter
// Copyright : Leibniz Supercomputing Centre
// Description : Template class for priority queues used by various DCDB pushers
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//================================================================================
#include <list>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/lock_types.hpp>
#ifndef PUSHERPQUEUE_H_
#define PUSHERPQUEUE_H_
class ListSemaphore {
private:
unsigned int count_;
boost::mutex countMtx_;
boost::condition_variable cond_;
public:
ListSemaphore(unsigned int initialCount) {
count_ = initialCount;
}
void post() {
boost::unique_lock<boost::mutex> lock(countMtx_);
count_++;
cond_.notify_one();
}
void wait() {
boost::unique_lock<boost::mutex> lock(countMtx_);
while (count_ == 0) {
cond_.wait(lock);
}
count_--;
}
};
template <class T>
class PusherPQueue
{
protected:
struct pListElement_ {
T elem_;
boost::posix_time::ptime dueTime_;
};
ListSemaphore pListSem_;
std::list<struct pListElement_> pList_;
bool abort_;
public:
void insert(T element, boost::posix_time::ptime dueTime);
void sleepUntilNext();
T popNext();
bool empty();
void abort();
unsigned int size();
PusherPQueue();
virtual ~PusherPQueue();
};
template <class T>
void PusherPQueue<T>::insert(T element, boost::posix_time::ptime dueTime)
{
/*
* Insert the element into the priority queue and increase
* the queue length semaphore.
*/
struct pListElement_ e;
e.elem_ = element;
e.dueTime_ = dueTime;
typename std::list<struct pListElement_>::iterator iter;
for (iter = pList_.begin(); iter != pList_.end(); iter++) {
if ((*iter).dueTime_ > dueTime) {
pList_.insert(iter, e);
pListSem_.post();
return;
}
}
/*
* In case the list was empty or the due time of
* the element is after the last element in the
* list, we'll end up here.
*/
pList_.push_back(e);
pListSem_.post();
}
template <class T>
void PusherPQueue<T>::sleepUntilNext()
{
/*
* Block if there is no element in the list. As the
* element is not being removed by this function,
* post the semaphore right after.
*/
pListSem_.wait();
if(abort_)
return;
pListSem_.post();
/*
* Sleep until the first element in the queue
* is due.
*/
pListElement_ first;
first = pList_.front();
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = first.dueTime_ - now;
if (diff.is_negative())
return;
usleep(diff.total_microseconds());
}
template <class T>
T PusherPQueue<T>::popNext()
{
/*
* Pop the first element from the priority queue.
*/
pListSem_.wait();
T ret = pList_.front().elem_;
pList_.pop_front();
return ret;
}
template <class T>
bool PusherPQueue<T>::empty()
{
/*
* Return true if the list is empty.
*/
return pList_.empty();
}
template <class T>
void PusherPQueue<T>::abort()
{
/*
* Break out of a blocked sleepUntilNext()...
*/
abort_ = true;
pListSem_.post();
}
template <class T>
unsigned int PusherPQueue<T>::size()
{
/*
* Return the size of the list
*/
return pList_.size();
}
template <class T>
PusherPQueue<T>::PusherPQueue() : pListSem_(0), abort_(false)
{
/*
* Nothing to be done here...
*/
}
template <class T>
PusherPQueue<T>::~PusherPQueue()
{
/*
* Nothing to be done here...
*/
}
#endif /* PUSHERPQUEUE_H_ */
......@@ -147,8 +147,11 @@ void RESTHttpsServer::handle_session(tcp::socket& socket, ssl::context& ctx) {
if(ec) { ServerLOG(error) << "stream shutdown: " << ec.message(); }
serverError:
//For graceful closure of a connected socket we shut it down first although
//this is not strictly necessary.
//Fails if client already disconnected from socket.
socket.shutdown(tcp::socket::shutdown_both, ec);
if(ec) { ServerLOG(error) << "socket shutdown: " << ec.message(); }
if(ec) { ServerLOG(warning) << "socket shutdown: " << ec.message(); }
socket.close(ec);
if(ec) { ServerLOG(error) << "socket close: " << ec.message(); }
......
......@@ -6,4 +6,5 @@ DEFAULT_VERSION = 0.3
GIT_VERSION = $(shell git describe --tags 2>/dev/null|sed 's/-\([0-9]*\)/.\1/')
VERSION := $(if $(GIT_VERSION),$(GIT_VERSION),$(DEFAULT_VERSION))
CXXFLAGS = -std=c++11 -O2 -g -Wall -fmessage-length=0 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -DVERSION=\"$(VERSION)\"
OS = $(shell uname -s)
......@@ -58,6 +58,8 @@ MQTTPusher::~MQTTPusher() {
if(_connected) {
mosquitto_disconnect(_mosq);
}
mosquitto_destroy(_mosq);
mosquitto_lib_cleanup();
}
void MQTTPusher::push() {
......@@ -160,6 +162,7 @@ void MQTTPusher::push() {
}
}
}
delete[] reads;
mosquitto_disconnect(_mosq);
}
......
include ../config.mk
include ../common.mk
CXXFLAGS = -std=c++11 -O2 -g -Wall \
-Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unused-variable \
-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -DBOOST_NETWORK_ENABLE_HTTPS \
-I$(DCDBBASEPATH)/dcdb/common/include \
-I$(DCDBDEPLOYPATH)/include \
-DVERSION=\"$(VERSION)\"
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS \
-I../common/include \
-I$(DCDBDEPLOYPATH)/include
LIBS = -L../lib \
-L$(DCDBDEPLOYPATH)/lib/ \
......
include ../config.mk
# C++ Compiler Flags (use fPIC for our dynamic library)
CXXFLAGS = -O2 -ggdb -Wall -Werror \
-fPIC --std=c++11 -I./include -I./include_internal -I$(DCDBDEPLOYPATH)/include \
-I../common/include/ -fmessage-length=0 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG \
-Wno-unused-local-typedef -Wno-unknown-warning-option -Wno-unknown-warning \
-Wno-deprecated-declarations -DVERSION=\"$(VERSION)\"
CXXFLAGS += -Werror -fPIC -I./include -I./include_internal -I$(DCDBDEPLOYPATH)/include -I../common/include/
# List of object files to build and the derived list of corresponding source files
OBJS = src/connection.o \
......
......@@ -313,8 +313,6 @@ void ConnectionImpl::setQueueSizeIo(uint32_t s) {
void ConnectionImpl::setBackendParams(uint32_t* p) {
if(!connected) {
coreConnPerHost = p[0];
maxConnPerHost = p[1];
maxConcRequests = p[2];
}
}
......@@ -373,8 +371,6 @@ bool ConnectionImpl::connect() {
cass_cluster_set_num_threads_io(cluster, numThreadsIo);
cass_cluster_set_queue_size_io(cluster, queueSizeIo);
cass_cluster_set_core_connections_per_host(cluster, coreConnPerHost);
cass_cluster_set_max_connections_per_host(cluster, maxConnPerHost);
cass_cluster_set_max_concurrent_requests_threshold(cluster, maxConcRequests);
/* Force protcol version to 1 */
cass_cluster_set_protocol_version(cluster, 1);
......@@ -539,8 +535,6 @@ ConnectionImpl::ConnectionImpl() {
numThreadsIo = 1;
queueSizeIo = 4096;
coreConnPerHost = 1;
maxConnPerHost = 2;
maxConcRequests = 100;
hostname_ = "localhost";
port_ = 9042;
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include -DVERSION=\"$(VERSION)\"
CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include
OBJS = dcdbconfig.o sensoraction.o dbaction.o useraction.o
LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
# GCC 4.8 is broken
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DVERSION=\"$(VERSION)\"
CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include
OBJS = dcdbcsvimport.o
LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto -lpthread
TARGET = dcdbcsvimport
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DVERSION=\"$(VERSION)\"
CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include
OBJS = dcdbquery.o query.o
LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
TARGET = dcdbquery
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-unknown-warning -fmessage-length=0 -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DVERSION=\"$(VERSION)\"
CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include
OBJS = dcdbquerysum.o
LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
TARGET = dcdbquerysum
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DVERSION=\"$(VERSION)\"
CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include
OBJS = dcdbunitconv.o
LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
TARGET = dcdbunitconv
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment