FileSpewer.cpp 4.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
//============================================================================
// Name        : FileSpewer.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
// Description : Minimalistic tool to push file contents over MQTT
//============================================================================

#include <cstdio>
#include <cstdlib>
11
#include <csignal>
12

13
#include <unistd.h>
14
#include <sys/stat.h>
15
#include <linux/limits.h>
16

17
#include "Settings.h"
18
#include "mosquitto.h"
19
20

/* Program-wide settings object: allocated in main() from the command line
 * and read by spew() for the file name, target host and send interval. */
Settings* mySettings;
21
22
23
24
25
/* Main-loop flag: non-zero while spew() should keep sending.
 * It is written from a signal handler, so it must be a
 * volatile sig_atomic_t to be safely accessible there. */
volatile std::sig_atomic_t keepRunning;

/* Signal handler: asks the main loop in spew() to terminate.
 * The signal number is not needed, hence the unnamed parameter. */
void sigHandler(int /* sig */) {
  keepRunning = 0;
}
26

27
28
void spew() {
  int mosqMajor, mosqMinor, mosqRevision;
29
  FILE *fp;
30
31
32
33
34
35

  /* Print mosquitto version and initialize library */
  mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
  printf("Initializing Mosquitto Library Version %d.%d.%d\n", mosqMajor, mosqMinor, mosqRevision);
  mosquitto_lib_init();

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
  /* Build filename as concatenation of hostname and absolute path and ensure that the file exists and is not a directory */
  char tmpHostname[256];
  if (gethostname(tmpHostname, 255) != 0) {
      fprintf(stderr, "Cannot get hostname.\n");
      exit(EXIT_FAILURE);
  }
  tmpHostname[255] = '\0';

  char tmpFilePath[PATH_MAX];
  if (realpath(mySettings->getFilename().c_str(), tmpFilePath) == NULL) {
      perror(mySettings->getFilename().c_str());
      exit(EXIT_FAILURE);
  }
  tmpFilePath[PATH_MAX-1] = '\0';

  struct stat sbuf;
  stat(tmpFilePath, &sbuf);
  if (!(sbuf.st_mode & (S_IFREG|S_IFCHR))) {
      fprintf(stderr, "%s is not a regular file!\n", tmpFilePath);
      exit(EXIT_FAILURE);
  }

  fp = fopen(tmpFilePath, "r");
  if(!fp) {
      perror(tmpFilePath);
      exit(EXIT_FAILURE);
  }

  std::string clientId;
  clientId = tmpHostname;
  clientId.append("/");
  clientId.append(tmpFilePath);

  printf("Client ID: %s\n", clientId.c_str());

  /* Init mosquitto struct */
  struct mosquitto* mosq;
  mosq = mosquitto_new(clientId.c_str(), false, NULL);
  if (!mosq) {
      perror(NULL);
      exit(EXIT_FAILURE);
  }

  /* Connect to the broker */
  printf("Connecting to broker...");
  fflush(stdout);
  if (mosquitto_connect(mosq, mySettings->getTargetHost().c_str(), 1883, 1000) != MOSQ_ERR_SUCCESS) {
      perror("\nCould not connect to host");
      exit(EXIT_FAILURE);
  }
  printf(" Done.\n");

  /* Catch SIGINT signals */
  signal(SIGINT, sigHandler);


  /* Here comes the main loop */
  keepRunning = 1;
  while(keepRunning) {

      /* Determine file size */
      off_t len;
      if(fseek(fp, 0, SEEK_END)!=0) {
          fprintf(stderr, "Warning: cannot determine file size.\n");
      }
      len = ftello(fp);
      fseek(fp, 0, SEEK_SET);

      /* Alloc send buffer (FIXME: shouldn't be doing this on every iteration) */
      void *data = malloc(len);
      if(!data) {
          fprintf(stderr, "Ouch, malloc failed.\n");
      }

      /* Read data into buffer */
      fread(data, len, 1, fp);

      /* Send the message */
      if(mosquitto_publish(mosq, NULL, tmpFilePath, len, data, 0, false) != MOSQ_ERR_SUCCESS) {
          fprintf(stderr, "Warning: cannot send message.\n");
      }
      else {
118
          printf("Sent %ld bytes...\n", len);
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
      }

      /* Free buffer */
      free(data);

      /* Wait for next iteration */
      usleep(mySettings->getInterval() * 1000);
  }


  /* Disconnect from broker */
  printf("\nDisconnecting from broker...");
  if (mosquitto_disconnect(mosq) != MOSQ_ERR_SUCCESS) {
      fprintf(stderr, "\nError while disconnecting!\n");
      exit(EXIT_FAILURE);
  }

  printf(" Done.\n");
137

138
139
  /* Close the file */
  fclose(fp);
140
141

  /* Cleanup mosquitto library */
142
  mosquitto_destroy(mosq);
143
144
145
  mosquitto_lib_cleanup();
}

146
147
148
149
150
151
152
/* Write the command line synopsis and a short description to stdout. */
void usage() {
  static const char helpText[] =
      "Usage: FileSpewer [-i <interval>] [-h <host>] <filename>\n\n"
      "FileSpewer will send the contents of <filename> every <interval> milliseconds\n"
      "to the broker <host>. The default value for <interval> is 1000 milliseconds and\n"
      "the default <host> is localhost.\n";

  /* The text contains no conversion specifiers, so it is emitted verbatim. */
  fputs(helpText, stdout);
}

153
int main(int argc, char * const argv[]) {
154
155
156
157
158
159
160
161

  /* Check command line */
  mySettings = new Settings(argc, argv);

  switch(mySettings->getMode()) {
  case USAGE:
    usage();
    break;
162
163
  case SPEW:
    spew();
164
    break;
165
166
167
  case ABORT:
  default:
    exit(EXIT_FAILURE);
168
169
170
171
172
173
174
  }

  /* Clean up */
  delete mySettings;

  return EXIT_SUCCESS;
}