connection.cpp 16.8 KB
Newer Older
1
2
//================================================================================
// Name        : connection.cpp
3
// Author      : Axel Auweter, Daniele Tafani
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
Michael Ott's avatar
Michael Ott committed
6
// Description : C++ API implementation for libdcdb connections
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//================================================================================
27
28
29
30
31
32
33
34

#include <algorithm>
#include <iostream>

#include <boost/lexical_cast.hpp>

#include "cassandra.h"

35
#include "dcdb/connection.h"
36
37
38
#include "connection_internal.h"
#include "dcdbglobals.h"

39
40
using namespace DCDB;

41
/*
 * Definition of the public Connection functions.
 * Every Connection function simply forwards the call
 * to its counterpart in the ConnectionImpl class.
 */
46
/** Forwards to ConnectionImpl::printError() to report the error held by @p future. */
void Connection::printError(CassFuture* future)
{
    impl->printError(future);
}

Alessio Netti's avatar
Alessio Netti committed
50
51
52
53
54
55
56
57
58
59
60
61
/** Forwards the requested number of IO threads to ConnectionImpl::setNumThreadsIo(). */
void Connection::setNumThreadsIo(uint32_t n)
{
  impl->setNumThreadsIo(n);
}

/** Forwards the requested IO queue size to ConnectionImpl::setQueueSizeIo(). */
void Connection::setQueueSizeIo(uint32_t s)
{
  impl->setQueueSizeIo(s);
}

/** Forwards the backend parameter array @p p to ConnectionImpl::setBackendParams(). */
void Connection::setBackendParams(uint32_t* p)
{
  impl->setBackendParams(p);
}

62
void Connection::setHostname(std::string hostname) {
63
64
65
  impl->setHostname(hostname);
}

66
std::string Connection::getHostname() {
67
68
69
  return impl->getHostname();
}

70
void Connection::setPort(uint16_t port) {
71
72
73
  impl->setPort(port);
}

74
uint16_t Connection::getPort() {
75
76
77
  return impl->getPort();
}

Michael Ott's avatar
Michael Ott committed
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/** Forwards @p username to ConnectionImpl::setUsername(). */
void Connection::setUsername(std::string username)
{
    impl->setUsername(username);
}

/** Returns the user name reported by ConnectionImpl::getUsername(). */
std::string Connection::getUsername()
{
    return impl->getUsername();
}

/**
 * Forwards @p password to ConnectionImpl::setPassword().
 *
 * The call must go to setPassword(); forwarding it to setHostname()
 * would overwrite the contact point with the password and leave the
 * password itself unset.
 */
void Connection::setPassword(std::string password) {
  impl->setPassword(password);
}

/** Returns the password reported by ConnectionImpl::getPassword(). */
std::string Connection::getPassword()
{
    return impl->getPassword();
}

94
bool Connection::connect() {
95
96
97
  return impl->connect();
}

98
void Connection::disconnect() {
99
100
101
  impl->disconnect();
}

102
bool Connection::initSchema() {
103
104
105
  return impl->initSchema();
}

106
/** Returns the session pointer reported by ConnectionImpl::getSessionHandle(). */
CassSession* Connection::getSessionHandle()
{
    return impl->getSessionHandle();
}

/*
111
 * Connection constructors & destructor.
112
113
114
 * Upon object creation, allocate a corresponding impl class
 * and free it on object deallocation.
 */
115
116
/** Creates a connection backed by a freshly allocated ConnectionImpl. */
Connection::Connection()
  : impl(new ConnectionImpl())
{
}

119
120
/**
 * Creates a connection backed by a freshly allocated ConnectionImpl and
 * configures it with the given @p hostname and @p port.
 */
Connection::Connection(std::string hostname, uint16_t port)
  : impl(new ConnectionImpl())
{
  impl->setHostname(hostname);
  impl->setPort(port);
}

Michael Ott's avatar
Michael Ott committed
125
126
127
128
129
130
131
132
/**
 * Creates a connection backed by a freshly allocated ConnectionImpl and
 * configures it with the given @p hostname, @p port, @p username and
 * @p password.
 */
Connection::Connection(std::string hostname, uint16_t port, std::string username, std::string password)
  : impl(new ConnectionImpl())
{
  impl->setHostname(hostname);
  impl->setPort(port);
  impl->setUsername(username);
  impl->setPassword(password);
}

133
/** Releases the ConnectionImpl object allocated by the constructor. */
Connection::~Connection()
{
    delete impl;
}


/*
139
 * Definitions of the protected ConnectionImpl class functions
140
141
142
143
144
145
146
 */

/**
 * @details
 * The validation for alphanumeric is implemented by using C++ STL's
 * find_if function. Pretty nice piece of C++!
 */
147
bool ConnectionImpl::validateName(std::string name)
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
{
    /*
     * Make sure name only consists of alphabetical characters or underscores (super ugly)...
     */
    class {
    public:
        static bool check(char c) {
            return !isalpha(c) && !(c == '_');
        }
    } isNotAlpha;

    if (find_if(name.begin(), name.end(), isNotAlpha.check) == name.end())
        return true;
    else
        return false;
}

/**
 * @details
 * This function updates the local copy of the data
 * store schema information. It is called whenever
 * an existsKeyspace query is performed to take note
 * of recently added keyspaces.
 */
172
void ConnectionImpl::updateSchema()
173
174
175
{
    /* Free the memory of the currently known schema definition. */
    if (schema)
176
      cass_schema_meta_free(schema);
177
178

    /* Get the new schema */
179
    schema = cass_session_get_schema_meta(session);
180
181
182
183
184
185
186
187
}

/**
 * @details
 * This function tries to retrieve schema information
 * about a given keyspace to determine if the keyspace
 * exists
 */
188
bool ConnectionImpl::existsKeyspace(std::string name)
189
190
191
{
    updateSchema();

192
    const CassKeyspaceMeta* keyspaceMeta = cass_schema_meta_keyspace_by_name(schema, name.c_str());
193
194
195
196
197
198
199
200
201
202
203
    if (keyspaceMeta != NULL)
      return true;
    else
      return false;
}

/**
 * @details
 * This function creates a new keyspace with a given name and
 * replication factor.
 */
204
void ConnectionImpl::createKeyspace(std::string name, int replicationFactor)
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
{
    std::string query;
    std::string rFact = boost::lexical_cast<std::string>(replicationFactor);

    if(validateName(name)) {
        query = "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";
        if(executeSimpleQuery(query) != CASS_OK) {
            std::cerr << "Failed to create keyspace " << name << "!" << std::endl;
        }
    }
}

/**
 * @details
 * This function sets the current keyspace by issuing a CQL USE
 * statement and setting the currentKeySpace class member to point
 * to the newly selected keyspace.
 */
223
void ConnectionImpl::selectKeyspace(std::string name)
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
{
    std::string query;

    if (validateName(name) && existsKeyspace(name)) {
        query = "USE " + name + ";";
        if (executeSimpleQuery(query) != CASS_OK) {
            std::cerr << "Error selecting keyspace " << name << "!" << std::endl;
        }
        currentKeyspace = name;
    }
}

/**
 * @details
 * If no keyspace was selected the returned string is empty.
 */
240
std::string ConnectionImpl::getActiveKeyspace()
241
242
243
244
245
246
247
248
249
250
{
    return currentKeyspace;
}

/**
 * @details
 * This function tries to query meta information for a given
 * column family (table) to check whether the column family
 * exists.
 */
251
bool ConnectionImpl::existsColumnFamily(std::string name)
252
253
254
{
    updateSchema();

255
    const CassKeyspaceMeta* keyspaceMeta = cass_schema_meta_keyspace_by_name(schema, currentKeyspace.c_str());
256
257
258
259
260
    if (keyspaceMeta == NULL) {
        /* It is a bit misleading to return false if the keyspace doesn't even exist... */
        return false;
    }

261
    const CassTableMeta* tableMeta = cass_keyspace_meta_table_by_name(keyspaceMeta, name.c_str());
262
263
264
265
266
267
268
269
270
271
272
273
    if (tableMeta == NULL) {
        return false;
    }

    return true;
}

/**
 * @details
 * This functions assembles the parameters into a CQL CREATE
 * TABLE query and submits the query to the server.
 */
274
void ConnectionImpl::createColumnFamily(std::string name, std::string fields, std::string primaryKey, std::string options)
275
276
277
278
279
{
    std::stringstream query;

    /* FIXME: Secure this and use proper types for fields, primaryKey, and options. */
    query << "CREATE TABLE " << name
280
281
282
283
284
285
286
        << " ( " << fields << ", PRIMARY KEY (" << primaryKey << "))";

    if (options != "") {
      query << " WITH " << options;
    }

    query << ";";
287
288
289
290
291

    executeSimpleQuery(query.str());
}

/*
292
 * Definitions of the public ConnectionImpl class functions
293
294
 */

295
/**
 * Writes the error message attached to @p future to stderr,
 * prefixed with "Cassandra Backend Error: ".
 */
void ConnectionImpl::printError(CassFuture* future)
{
  const char* text;
  size_t      textLen;
  cass_future_error_message(future, &text, &textLen);

  /* Build the string from pointer and length as reported by the driver. */
  const std::string description(text, textLen);
  std::cerr << "Cassandra Backend Error: " << description << std::endl;
}

Alessio Netti's avatar
Alessio Netti committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/** Stores the number of IO threads; the call has no effect while connected. */
void ConnectionImpl::setNumThreadsIo(uint32_t n)
{
    if (connected) {
        return;
    }
    numThreadsIo = n;
}

/** Stores the IO queue size; the call has no effect while connected. */
void ConnectionImpl::setQueueSizeIo(uint32_t s)
{
    if (connected) {
        return;
    }
    queueSizeIo = s;
}

/**
 * Stores backend specific parameters. Only the first array element is
 * read; it is used as the number of core connections per host. The call
 * has no effect while connected or when @p p is a null pointer.
 */
void ConnectionImpl::setBackendParams(uint32_t* p) {
    /* Guard against a null array instead of dereferencing it. */
    if (p == nullptr)
        return;

    if (!connected) {
        coreConnPerHost = p[0];
    }
}

320
void ConnectionImpl::setHostname(std::string hostname) {
321
322
323
324
  if (!connected)
    hostname_ = hostname;
}

325
std::string ConnectionImpl::getHostname() {
326
327
328
  return hostname_;
}

329
void ConnectionImpl::setPort(uint16_t port) {
330
331
332
333
  if (!connected)
    port_ = port;
}

334
uint16_t ConnectionImpl::getPort() {
335
336
337
  return port_;
}

Michael Ott's avatar
Michael Ott committed
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
/** Stores the user name used by connect(); the call has no effect while connected. */
void ConnectionImpl::setUsername(std::string username)
{
    if (connected) {
        return;
    }
    username_ = username;
}

/** Returns the stored user name. */
std::string ConnectionImpl::getUsername()
{
    return username_;
}

/** Stores the password used by connect(); the call has no effect while connected. */
void ConnectionImpl::setPassword(std::string password)
{
    if (connected) {
        return;
    }
    password_ = password;
}

/** Returns the stored password. */
std::string ConnectionImpl::getPassword()
{
    return password_;
}

356
357
358
359
360
/**
 * @details
 * This function connects to the selected Cassandra
 * front end node using the CQL API.
 */
361
bool ConnectionImpl::connect() {
362
363
364
365
366
367
  if (connected)
    return false;

  /* Set hostname and port */
  cass_cluster_set_contact_points(cluster, hostname_.c_str());
  cass_cluster_set_port(cluster, port_);
Michael Ott's avatar
Michael Ott committed
368
369
370
  if (username_.size() && password_.size()) {
    cass_cluster_set_credentials(cluster, username_.c_str(), password_.c_str());
  }
371

Alessio Netti's avatar
Alessio Netti committed
372
373
374
375
  cass_cluster_set_num_threads_io(cluster, numThreadsIo);
  cass_cluster_set_queue_size_io(cluster, queueSizeIo);
  cass_cluster_set_core_connections_per_host(cluster, coreConnPerHost);

376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
  /* Force protcol version to 1 */
  cass_cluster_set_protocol_version(cluster, 1);

  /* Connect to the server */
  CassError rc = CASS_OK;
  CassFuture* future = cass_session_connect(session, cluster);

  /* Wait for successful connection */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
      printError(future);
      cass_future_free(future);
      return false;
  }

  cass_future_free(future);
  connected = true;
  return true;
}

398
void ConnectionImpl::disconnect() {
399
400
401
402
403
404
405
406
407
408
  if (!connected)
    return;

  CassFuture* future;
  future = cass_session_close(session);
  cass_future_wait(future);
  cass_future_free(future);
  connected = false;
}

409
/** Returns the session pointer held by this object. */
CassSession* ConnectionImpl::getSessionHandle()
{
    return session;
}

413
/**
 * Executes @p query as a statement without bound parameters and waits
 * for it to finish. A failure is printed via printError().
 *
 * @return The error code of the executed query (CASS_OK on success).
 */
CassError ConnectionImpl::executeSimpleQuery(std::string query)
{
    /* The second argument is the number of bound parameters: none. */
    CassStatement* stmt = cass_statement_new(query.c_str(), 0);

    CassFuture* pending = cass_session_execute(session, stmt);
    cass_future_wait(pending);

    const CassError result = cass_future_error_code(pending);
    if (result != CASS_OK) {
        printError(pending);
    }

    cass_future_free(pending);
    cass_statement_free(stmt);

    return result;
}

433
bool ConnectionImpl::initSchema() {
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452

  if (!connected)
    return false;

  /* Keyspace and column family for published sensors */
  /* FIXME: We should have a good way to determine the number of replicas here */
  if (!existsKeyspace(CONFIG_KEYSPACE_NAME)) {
      std::cout << "Creating Keyspace " << CONFIG_KEYSPACE_NAME << "...\n";
      createKeyspace(CONFIG_KEYSPACE_NAME, 1);
  }

  selectKeyspace(CONFIG_KEYSPACE_NAME);

  if (!(getActiveKeyspace().compare(CONFIG_KEYSPACE_NAME) == 0)) {
      std::cout << "Cannot select keyspace " << CONFIG_KEYSPACE_NAME << "\n";
      return false;
  }

  if (!existsColumnFamily(CF_PUBLISHEDSENSORS)) {
453
      std::cout << "Creating Column Family " CF_PUBLISHEDSENSORS "...\n";
454
      createColumnFamily(CF_PUBLISHEDSENSORS,
455
456
457
458
459
          "name varchar, "              /* Public name */
          "virtual boolean, "           /* Whether it is a published physical sensor or a virtual sensor */
          "pattern varchar, "           /* In case of physical sensors: pattern for MQTT topics that this sensor matches against */
          "scaling_factor double, "     /* Unused */
          "unit varchar, "              /* Unit of the sensor (e.g. W for Watts) */
460
461
462
          "sensor_mask bigint, "        /* Bit mask that specifies sensor properties. Currently defined ones are:
                                           Integrable: indicates whether the sensor is integrable over time;
                                           Monotonic : indicates whether the collected sensor data is monotonic. */
463
          "operations varchar, "        /* Operations for the sensor (e.g., avg, stdev,...). */
464
          "expression varchar, "        /* For virtual sensors: arithmetic expression to derive the virtual sensor's value */
465
          "vsensorid varchar, "         /* For virtual sensors: Unique sensorId for the sensor in the virtualsensors table */
466
          "tzero bigint, "              /* For virtual sensors: time of the first reading */
467
468
          "interval bigint,"             /* Interval in nanoseconds at which this virtual sensor provides readings */
          "ttl bigint",                 /* Time to live in nanoseconds for readings of this sensor */
469
470

          "name",                       /* Make the "name" column the primary key */
Michael Ott's avatar
Michael Ott committed
471
          "COMPACT STORAGE AND CACHING = {'keys' : 'all'} "); /* Enable compact storage and maximum caching */
472
473
  }

474
  /* Keyspace and column family for raw and virtual sensor data */
475
476
477
478
479
480
481
482
483
484
485
486
487
  if (!existsKeyspace(KEYSPACE_NAME)) {
      std::cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
      createKeyspace(KEYSPACE_NAME, 1);
  }

  selectKeyspace(KEYSPACE_NAME);

  if (!(getActiveKeyspace().compare(KEYSPACE_NAME) == 0)) {
      std::cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
      return false;
  }

  if (!existsColumnFamily(CF_SENSORDATA)) {
488
      std::cout << "Creating Column Family " CF_SENSORDATA "...\n";
489
      createColumnFamily(CF_SENSORDATA,
490
491
          "sid varchar, ws smallint, ts bigint, value bigint",
          "sid, ws, ts",
492
493
          "COMPACT STORAGE AND gc_grace_seconds = " SENSORDATA_GC_GRACE_SECONDS 
          " AND compaction = " SENSORDATA_COMPACTION);
494
495
  }

496
  if (!existsColumnFamily(CF_VIRTUALSENSORS)) {
497
      std::cout << "Creating Column Family " CF_VIRTUALSENSORS "...\n";
498
      createColumnFamily(CF_VIRTUALSENSORS,
499
500
          "sid varchar, ws smallint, ts bigint, value bigint",
          "sid, ws, ts",
501
502
          "COMPACT STORAGE AND gc_grace_seconds = " SENSORDATA_GC_GRACE_SECONDS
          " AND compaction = " SENSORDATA_COMPACTION);
503
504
  }

505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
  /* Keyspace and column family for Caliper Event data */
  if (!existsKeyspace(CED_KEYSPACE_NAME)) {
      std::cout << "Creating Keyspace " << CED_KEYSPACE_NAME << "...\n";
      createKeyspace(CED_KEYSPACE_NAME, 1);
  }

  selectKeyspace(CED_KEYSPACE_NAME);

  if (!(getActiveKeyspace().compare(CED_KEYSPACE_NAME) == 0)) {
      std::cout << "Cannot select keyspace " << CED_KEYSPACE_NAME << "\n";
      return false;
  }

  if (!existsColumnFamily(CF_CALIEVTDATA)) {
      std::cout << "Creating Column Family " CF_CALIEVTDATA "...\n";
      createColumnFamily(CF_CALIEVTDATA,
          "name varchar, "  /* Public name */
          "ts bigint, "     /* Timestamp of a Caliper Event */
          "value varchar ", /* String representation of the Event */

          "name, ts",       /* Make the "name" and "ts" column together the primary key */
          "COMPACT STORAGE" /* Enable compact storage */
          );                /* No further options required */
  }

530
531
532
533
534
535
536
537
538
539
540
541
542
543
  /* Keyspace and column family for job data */
  if (!existsKeyspace(JD_KEYSPACE_NAME)) {
      std::cout << "Creating Keyspace " << JD_KEYSPACE_NAME << "...\n";
      createKeyspace(JD_KEYSPACE_NAME, 1);
  }

  selectKeyspace(JD_KEYSPACE_NAME);

  if (!(getActiveKeyspace().compare(JD_KEYSPACE_NAME) == 0)) {
      std::cout << "Cannot select keyspace " << JD_KEYSPACE_NAME << "\n";
      return false;
  }

  if (!existsColumnFamily(CF_JOBDATA)) {
544
      std::cout << "Creating Column Family " CF_JOBDATA "...\n";
545
546
547
548
549
550
551
      createColumnFamily(CF_JOBDATA,
          "jid bigint, "        /* Job Id */
          "uid bigint, "        /* User Id */
          "start_ts bigint, "   /* Start timestamp of the job */
          "end_ts bigint, "     /* End timestamp of the job */
          "nodes set<varchar>", /* Set of nodes used by the job */

552
553
          "jid, start_ts"       /* Make jid + start_ts columns the primary key*/
                                /* Together they should be unique */
554
          );                    /* No further options required */
555
556
  }

557
558
559
  return true;
}

560
/**
 * Initializes all settings with their defaults (localhost:9042, one IO
 * thread, IO queue size 4096, one core connection per host), lowers the
 * driver's log level and allocates the cluster and session objects.
 */
ConnectionImpl::ConnectionImpl()
{
    /* No schema snapshot has been fetched yet and we are not connected. */
    schema = nullptr;
    connected = false;

    /* Default contact point */
    hostname_ = "localhost";
    port_ = 9042;

    /* Default backend tuning parameters */
    numThreadsIo = 1;
    queueSizeIo = 4096;
    coreConnPerHost = 1;

    /* Set loglevel to errors since our token() queries will result in unnecessary warnings by the driver */
    cass_log_set_level(CASS_LOG_ERROR);

    /* Create the cluster and session objects */
    cluster = cass_cluster_new();
    session = cass_session_new();
}

584
/**
 * Closes the connection if it is still open and frees the schema
 * snapshot, the session and the cluster object, in that order.
 */
ConnectionImpl::~ConnectionImpl()
{
    disconnect();

    if (schema != nullptr) {
        cass_schema_meta_free(schema);
    }
    if (session != nullptr) {
        cass_session_free(session);
    }
    if (cluster != nullptr) {
        cass_cluster_free(cluster);
    }
}