module_redis_csv.c 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. * ZMap Copyright 2013 Regents of the University of Michigan
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy
  6. * of the License at http://www.apache.org/licenses/LICENSE-2.0
  7. */
  8. #include <stdlib.h>
  9. #include <stdio.h>
  10. #include <string.h>
  11. #include <assert.h>
  12. #include <inttypes.h>
  13. #include <sys/socket.h>
  14. #include <netinet/in.h>
  15. #include <arpa/inet.h>
  16. #include "../../lib/logger.h"
  17. #include "../../lib/xalloc.h"
  18. #include "../../lib/redis.h"
  19. #include "output_modules.h"
  #define UNUSED __attribute__((unused))
  // Number of CSV records collected before they are pushed to Redis together.
  #define BUFFER_SIZE 1000

  // CSV records waiting to be pushed: an array of BUFFER_SIZE slots that
  // rediscsvmodule_init() allocates; each occupied slot holds a heap string.
  static char **buffer;
  // Number of leading slots of buffer that are currently occupied.
  static int buffer_fill;
  // Name of the Redis list that records are pushed to; chosen in init.
  static char *queue_name;
  25. int rediscsvmodule_init(struct state_conf *conf, UNUSED char **fields, UNUSED int fieldlens)
  26. {
  27. buffer = xcalloc(BUFFER_SIZE, sizeof(char*));
  28. buffer_fill = 0;
  29. if (conf->output_args) {
  30. redisconf_t *rconf = redis_parse_connstr(conf->output_args);
  31. if (rconf->type == T_TCP) {
  32. log_info("redis-module", "{type: TCP, server: %s, "
  33. "port: %u, list: %s}", rconf->server,
  34. rconf->port, rconf->list_name);
  35. } else {
  36. log_info("redis-module", "{type: LOCAL, path: %s, "
  37. "list: %s}", rconf->path, rconf->list_name);
  38. }
  39. queue_name = rconf->list_name;
  40. } else {
  41. queue_name = strdup("zmap");
  42. }
  43. // generate field names CSV list to be logged.
  44. char *fieldstring = xcalloc(1000, fieldlens);
  45. memset(fieldstring, 0, sizeof(fields));
  46. for (int i=0; i < fieldlens; i++) {
  47. if (i) {
  48. strcat(fieldstring, ", ");
  49. }
  50. strcat(fieldstring, fields[i]);
  51. }
  52. log_info("redis-csv", "the following fields will be output to redis: %s.",
  53. fieldstring);
  54. free(fields);
  55. return redis_init(conf->output_args);
  56. }
  57. static int rediscsvmodule_flush(void)
  58. {
  59. if (redis_lpush_strings((char*) queue_name, buffer, buffer_fill)) {
  60. return EXIT_FAILURE;
  61. }
  62. for (int i=0; i < buffer_fill; i++) {
  63. free(buffer[i]);
  64. }
  65. buffer_fill = 0;
  66. return EXIT_SUCCESS;
  67. }
  68. #define INT_STR_LEN 20 // len(9223372036854775807) == 19
  69. static size_t guess_csv_string_length(fieldset_t *fs)
  70. {
  71. size_t len = 0;
  72. for (int i=0; i < fs->len; i++) {
  73. field_t *f = &(fs->fields[i]);
  74. if (f->type == FS_STRING) {
  75. len += strlen(f->value.ptr);
  76. len += 2; // potential quotes
  77. } else if (f->type == FS_UINT64) {
  78. len += INT_STR_LEN;
  79. } else if (f->type == FS_BINARY) {
  80. len += 2*f->len;
  81. } else if (f->type == FS_NULL) {
  82. // do nothing
  83. } else {
  84. log_fatal("csv", "received unknown output type "
  85. "(not str, binary, null, or uint64_t)");
  86. }
  87. }
  88. // estimated length + number of commas
  89. return len + (size_t) len + 256;
  90. }
  /*
   * Write the lowercase hexadecimal form of readbuf[0..len) into f: two
   * characters per input byte followed by a NUL terminator, so f needs room
   * for 2*len + 1 bytes. When len is 0 nothing is written at all, not even
   * the terminator.
   */
  static void hex_encode_str(char *f, unsigned char* readbuf, size_t len)
  {
      static const char hexdigits[] = "0123456789abcdef";
      if (len == 0) {
          return;
      }
      for (size_t k = 0; k < len; k++) {
          f[2 * k] = hexdigits[readbuf[k] >> 4];
          f[2 * k + 1] = hexdigits[readbuf[k] & 0x0f];
      }
      f[2 * len] = '\0';
  }
  99. void make_csv_string(fieldset_t *fs, char *out, size_t len)
  100. {
  101. memset(out, 0, len);
  102. for (int i=0; i < fs->len; i++) {
  103. char *temp = out + (size_t) strlen(out);
  104. field_t *f = &(fs->fields[i]);
  105. char *dataloc = temp;
  106. if (i) { // only add comma if not first element
  107. sprintf(temp, ",");
  108. dataloc += (size_t) 1;
  109. }
  110. if (f->type == FS_STRING) {
  111. if (strlen(dataloc) + strlen((char*) f->value.ptr) >= len) {
  112. log_fatal("redis-csv", "out of memory---will overflow");
  113. }
  114. if (strchr((char*) f->value.ptr, ',')) {
  115. sprintf(dataloc, "\"%s\"", (char*) f->value.ptr);
  116. } else {
  117. sprintf(dataloc, "%s", (char*) f->value.ptr);
  118. }
  119. } else if (f->type == FS_UINT64) {
  120. if (strlen(dataloc) + INT_STR_LEN >= len) {
  121. log_fatal("redis-csv", "out of memory---will overflow");
  122. }
  123. sprintf(dataloc, "%" PRIu64, (uint64_t) f->value.num);
  124. } else if (f->type == FS_BINARY) {
  125. if (strlen(dataloc) + 2*f->len >= len) {
  126. log_fatal("redis-csv", "out of memory---will overflow");
  127. }
  128. hex_encode_str(out, (unsigned char*) f->value.ptr, f->len);
  129. } else if (f->type == FS_NULL) {
  130. // do nothing
  131. } else {
  132. log_fatal("redis-csv", "received unknown output type");
  133. }
  134. }
  135. }
  136. int rediscsvmodule_process(fieldset_t *fs)
  137. {
  138. size_t reqd_space = guess_csv_string_length(fs);
  139. char *x = xmalloc(reqd_space);
  140. make_csv_string(fs, x, reqd_space);
  141. buffer[buffer_fill] = x;
  142. // if full, flush all to redis
  143. if (++buffer_fill == BUFFER_SIZE) {
  144. if (rediscsvmodule_flush()) {
  145. return EXIT_FAILURE;
  146. }
  147. }
  148. return EXIT_SUCCESS;
  149. }
  150. int rediscsvmodule_close(UNUSED struct state_conf* c,
  151. UNUSED struct state_send *s,
  152. UNUSED struct state_recv *r)
  153. {
  154. if (rediscsvmodule_flush()) {
  155. return EXIT_FAILURE;
  156. }
  157. if (redis_close()) {
  158. return EXIT_FAILURE;
  159. }
  160. return EXIT_SUCCESS;
  161. }
  162. output_module_t module_redis_csv = {
  163. .name = "redis-csv",
  164. .init = &rediscsvmodule_init,
  165. .start = NULL,
  166. .update = NULL,
  167. .update_interval = 0,
  168. .close = &rediscsvmodule_close,
  169. .process_ip = &rediscsvmodule_process,
  170. .helptext = "Outputs one or more output fields in csv, and then flushes out to redis. \n"
  171. "By default, the probe module does not filter out duplicates or limit to successful fields, \n"
  172. "but rather includes all received packets. Fields can be controlled by \n"
  173. "setting --output-fileds. Filtering out failures and duplicate packets can \n"
  174. "be achieved by setting an --output-filter."
  175. };