ztee.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. /*
  2. * ZTee Copyright 2014 Regents of the University of Michigan
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy
  6. * of the License at http://www.apache.org/licenses/LICENSE-2.0
  7. */
  8. #include <stdio.h>
  9. #include <stdlib.h>
  10. #include <string.h>
  11. #include <assert.h>
  12. #include <errno.h>
  13. #include <getopt.h>
  14. #include <pthread.h>
  15. #include <unistd.h>
  16. #include <signal.h>
  17. #include "../lib/lockfd.h"
  18. #include "../lib/logger.h"
  19. #include "../lib/queue.h"
  20. #include "../lib/util.h"
  21. #include "../lib/xalloc.h"
  22. #include "../lib/csv.h"
  23. #include "topt.h"
  // Input/output formats known to ztee; each value indexes format_names.
typedef enum file_format {
	FORMAT_CSV,
	FORMAT_JSON,
	FORMAT_RAW
} format_t;
static const char *format_names[] = {
	[FORMAT_CSV] = "csv",
	[FORMAT_JSON] = "json",
	[FORMAT_RAW] = "raw",
};
// Process-wide configuration, populated by main() before any worker thread
// starts.
typedef struct ztee_conf {
	// File names (output and status-updates names come from the command line)
	char *output_filename;
	char *status_updates_filename;
	char *log_file_name;
	// Open streams; log_file is stderr when no log file was requested
	FILE *output_file;
	FILE *status_updates_file;
	FILE *log_file;
	// Level handed to log_init()
	int log_level;
	// Detected (or forced raw) input format, and the output format
	format_t in_format;
	format_t out_format;
	// Non-zero: forward only rows whose success column is true (CSV only)
	int success_only;
	// Non-zero: print throughput statistics to stderr
	int monitor;
	// Zero-based column indices of the IP and success fields in CSV input
	size_t ip_field;
	size_t success_field;
} ztee_conf_t;
// Static storage, so every member starts zeroed / NULL.
static ztee_conf_t tconf;
// Defined further down; process_queue uses it for CSV input.
static int print_from_csv(char *line);
  49. static format_t test_input_format(char *line, size_t len) {
  50. // Check for empty input, remember line contains '\n'
  51. if (len < 2) {
  52. return FORMAT_RAW;
  53. }
  54. if (len >= 3) {
  55. // If the input is JSON, the line should look like
  56. // {.......}\n
  57. if (line[0] == '{' && line[len - 2] == '}') {
  58. return FORMAT_JSON;
  59. }
  60. }
  61. if (strchr(line, ',') != NULL) {
  62. return FORMAT_CSV;
  63. }
  64. return FORMAT_RAW;
  65. }
  // ---- State shared by the reader, processor and monitor threads ----
// Set to 1 by read_in, while holding queue->lock, once stdin is exhausted.
int done = 0;
// Set to 1 by process_queue once `done` is set and the queue has drained;
// monitor_ztee polls it to decide when to stop.
// NOTE(review): process_done, total_read_in and read_in_last_sec are written
// by one thread and read by another without a lock or atomics, which is a
// data race under the C11 memory model; consider _Atomic if the build
// targets C11.
int process_done = 0;
// Lines pushed by read_in (the first line, pushed by main, is not counted).
int total_read_in = 0;
// Incremented by read_in; not read anywhere in this file.
int read_in_last_sec = 0;
// Lines handled by process_queue, including a CSV header line.
int total_written = 0;
// Set by main via now() right after the read thread is started.
double start_time;
// NOTE(review): not referenced anywhere in this file.
pthread_t threads[3];

// Consumer thread: pops lines from the queue, writes them to the output file
// and echoes them (or their IP column) to stdout; exits once the reader is
// done and the queue is empty.
void *process_queue(void *my_q);
// Producer thread: reads stdin with getline() and pushes each line onto the
// queue.
void *read_in(void *my_q);
// The next four are declared but not defined in this file.
void find_successful_IP(char *my_string);
void find_IP(char *my_string);
void write_out_to_file(char *data);
void figure_out_fields(char *data);
// Placeholder for an output-file-extension check; currently a no-op.
void output_file_is_csv(void);
// Prints "Could not create thread <string>" to stderr.
void print_thread_error(char *string);
// Monitor thread: roughly once a second, reports read rate and queue size
// to stderr and/or the status-updates file.
void *monitor_ztee(void *my_q);
  // Copy gengetopt option ARG's argument into DST when the option was given.
// Both macros read a variable named `args` from the caller's scope.
// Wrapped in do/while(0) so each use behaves as a single statement, which is
// safe inside an unbraced if/else.
#define SET_IF_GIVEN(DST, ARG) \
	do { if (args.ARG##_given) { (DST) = args.ARG##_arg; } } while (0)
// Set flag DST to 1 when the boolean option ARG was given.
#define SET_BOOL(DST, ARG) \
	do { if (args.ARG##_given) { (DST) = 1; } } while (0)
  105. int main(int argc, char *argv[])
  106. {
  107. struct gengetopt_args_info args;
  108. struct cmdline_parser_params *params;
  109. params = cmdline_parser_params_create();
  110. assert(params);
  111. params->initialize = 1;
  112. params->override = 0;
  113. params->check_required = 0;
  114. if (cmdline_parser_ext(argc, argv, &args, params) != 0) {
  115. exit(EXIT_SUCCESS);
  116. }
  117. signal(SIGPIPE, SIG_IGN);
  118. // Handle help text and version
  119. if (args.help_given) {
  120. cmdline_parser_print_help();
  121. exit(EXIT_SUCCESS);
  122. }
  123. if (args.version_given) {
  124. cmdline_parser_print_version();
  125. exit(EXIT_SUCCESS);
  126. }
  127. // Try opening the log file
  128. tconf.log_level = ZLOG_WARN;
  129. if (args.log_file_given) {
  130. tconf.log_file = fopen(args.log_file_arg, "w");
  131. } else {
  132. tconf.log_file = stderr;
  133. }
  134. // Check for an error opening the log file
  135. if (tconf.log_file == NULL) {
  136. log_init(stderr, tconf.log_level, 0, "ztee");
  137. log_fatal("ztee", "Could not open log file");
  138. }
  139. // Actually init the logging infrastructure
  140. log_init(tconf.log_file, tconf.log_level, 0, "ztee");
  141. // Check for an output file
  142. if (args.inputs_num < 1) {
  143. log_fatal("ztee", "No output file specified");
  144. }
  145. if (args.inputs_num > 1) {
  146. log_fatal("ztee", "Extra positional arguments starting with %s",
  147. args.inputs[1]);
  148. }
  149. tconf.output_filename = args.inputs[0];
  150. tconf.output_file = fopen(tconf.output_filename, "w");
  151. if (!tconf.output_file) {
  152. log_fatal("ztee", "Could not open output file %s, %s",
  153. tconf.output_filename, strerror(errno));
  154. }
  155. // Read actual options
  156. int raw = 0;
  157. SET_BOOL(tconf.success_only, success_only);
  158. SET_BOOL(tconf.monitor, monitor);
  159. SET_BOOL(raw, raw);
  160. // Open the status update file if necessary
  161. if (args.status_updates_file_given) {
  162. // Try to open the status output file
  163. char *filename = args.status_updates_file_arg;
  164. FILE *file = fopen(filename, "w");
  165. if (!file) {
  166. char *err = strerror(errno);
  167. log_fatal("ztee", "unable to open status updates file %s (%s)",
  168. filename, err);
  169. }
  170. // Set the variables in state
  171. tconf.status_updates_filename = filename;
  172. tconf.status_updates_file = file;
  173. }
  174. // Read the first line of the input file
  175. size_t first_line_len = 1024;
  176. char *first_line = xmalloc(first_line_len);
  177. if (getline(&first_line, &first_line_len, stdin) < 0) {
  178. log_fatal("ztee", "reading input to test format failed");
  179. }
  180. // Detect the input format
  181. if (!raw) {
  182. format_t format = test_input_format(first_line, first_line_len);
  183. log_info("ztee", "detected input format %s", format_names[format]);
  184. tconf.in_format = format;
  185. } else {
  186. tconf.in_format = FORMAT_RAW;
  187. log_info("ztee", "raw input");
  188. }
  189. if (tconf.in_format == FORMAT_JSON) {
  190. log_fatal("ztee", "json input not implemented");
  191. }
  192. // Find fields if needed
  193. char *header = strdup(first_line);
  194. int found_success = 0;
  195. int found_ip = 0;
  196. if (tconf.in_format == FORMAT_CSV) {
  197. static const char *success_names[] = { "success" };
  198. static const char *ip_names[] = { "saddr", "ip" };
  199. int success_idx = csv_find_index(header, success_names, 1);
  200. if (success_idx >= 0) {
  201. found_success = 1;
  202. tconf.success_field = (size_t) success_idx;
  203. }
  204. int ip_idx = csv_find_index(header, ip_names, 2);
  205. if (found_ip >= 0) {
  206. found_ip = 1;
  207. tconf.ip_field = (size_t) ip_idx;
  208. }
  209. if (!found_ip) {
  210. log_fatal("ztee", "Unable to find IP/SADDR field");
  211. }
  212. }
  213. if (tconf.success_only) {
  214. if (tconf.in_format != FORMAT_CSV) {
  215. log_fatal("ztee", "success filter requires csv input");
  216. }
  217. if (!found_success) {
  218. log_fatal("ztee", "Could not find success field");
  219. }
  220. }
  221. // Make the queue
  222. zqueue_t* queue = queue_init();
  223. assert(queue);
  224. // Add the first line to the queue if needed
  225. push_back(first_line, queue);
  226. // Start the regular read thread
  227. pthread_t read_thread;
  228. if (pthread_create(&read_thread, NULL, read_in, queue)) {
  229. log_fatal("ztee", "unable to start read thread");
  230. }
  231. // Record the start time
  232. start_time = now();
  233. // Start the process thread
  234. pthread_t process_thread;
  235. if (pthread_create(&process_thread, NULL, process_queue, queue)) {
  236. log_fatal("ztee", "unable to start process thread");
  237. }
  238. // Start the monitor thread if necessary, and join to it
  239. if (tconf.monitor || tconf.status_updates_file) {
  240. pthread_t monitor_thread;
  241. if (pthread_create(&monitor_thread, NULL, monitor_ztee, queue)) {
  242. log_fatal("ztee", "unable to create monitor thread");
  243. }
  244. pthread_join(monitor_thread, NULL);
  245. }
  246. // Join to the remaining threads,
  247. pthread_join(read_thread, NULL);
  248. pthread_join(process_thread, NULL);
  249. return 0;
  250. }
  251. void *process_queue(void* arg)
  252. {
  253. zqueue_t *queue = arg;
  254. FILE *output_file = tconf.output_file;
  255. while (!process_done) {
  256. pthread_mutex_lock(&queue->lock);
  257. while (!done && is_empty(queue)) {
  258. pthread_cond_wait(&queue->empty, &queue->lock);
  259. }
  260. if (done && is_empty(queue)) {
  261. process_done = 1;
  262. pthread_mutex_unlock(&queue->lock);
  263. continue;
  264. }
  265. znode_t *node = pop_front_unsafe(queue);
  266. pthread_mutex_unlock(&queue->lock);
  267. // Write raw data to output file
  268. fprintf(output_file, "%s", node->data);
  269. fflush(output_file);
  270. if (ferror(output_file) != 0) {
  271. log_fatal("ztee", "%s", "Error writing to output file");
  272. }
  273. // Dump to stdout
  274. int stdout_ret;
  275. switch (tconf.in_format) {
  276. case FORMAT_JSON:
  277. log_fatal("ztee", "JSON input format unimplemented");
  278. break;
  279. case FORMAT_CSV:
  280. stdout_ret = print_from_csv(node->data);
  281. break;
  282. default:
  283. // Handle raw
  284. stdout_ret = fprintf(stdout, "%s", node->data);
  285. break;
  286. }
  287. // Check to see if write failed
  288. fflush(stdout);
  289. if (ferror(stdout) != 0 || stdout_ret <= 0) {
  290. log_fatal("ztee", "%s", "Error writing to stdout");
  291. }
  292. // Record output lines
  293. total_written++;
  294. // Free the memory
  295. free(node->data);
  296. free(node);
  297. }
  298. process_done = 1;
  299. fflush(output_file);
  300. fclose(output_file);
  301. return NULL;
  302. }
  303. void *read_in(void* arg)
  304. {
  305. // Allocate buffers
  306. zqueue_t *queue = (zqueue_t*) arg;
  307. size_t length = 1000;
  308. char *input = xcalloc(sizeof(char), length);;
  309. // Read in from stdin and add to back of linked list
  310. while (getline(&input, &length, stdin) > 0) {
  311. push_back(input, queue);
  312. total_read_in++;
  313. read_in_last_sec++;
  314. }
  315. pthread_mutex_lock(&queue->lock);
  316. done = 1;
  317. pthread_cond_signal(&queue->empty);
  318. pthread_mutex_unlock(&queue->lock);
  319. return NULL;
  320. }
  321. int print_from_csv(char *line)
  322. {
  323. if (total_written == 0) {
  324. return 1;
  325. }
  326. if (tconf.success_only) {
  327. char *success_entry = csv_get_index(line, tconf.success_field);
  328. if (success_entry == NULL) {
  329. return 1;
  330. }
  331. int success = 0;
  332. if (atoi(success_entry)) {
  333. success = 1;
  334. } else if (strcasecmp(success_entry, "true") == 0) {
  335. success = 1;
  336. }
  337. if (!success) {
  338. return 1;
  339. }
  340. }
  341. // Find the ip
  342. char *ip = csv_get_index(line, tconf.ip_field);
  343. int ret = fprintf(stdout, "%s\n", ip);
  344. return ret;
  345. }
  // Placeholder for a check that the output file name ends in a CSV or JSON
// extension (JSON output is not implemented). It currently does nothing and
// is not called anywhere in this file.
void output_file_is_csv()
{
}
  // Writes "Could not create thread <string>" plus a newline to stderr;
// `string` presumably identifies the thread. Not called in this file.
void print_thread_error(char* string)
{
	(void) fprintf(stderr, "Could not create thread %s\n", string);
}
  // Capacity of stats_t.time_past_str, the formatted elapsed-time buffer.
enum { TIME_STR_LEN = 20 };
// Throughput figures maintained by update_stats and reported by monitor_ztee.
// Members prefixed with '_' are update_stats bookkeeping and are not printed.
typedef struct ztee_stats {
	/* input rows */
	uint32_t total_read;              // cumulative rows read
	uint32_t read_per_sec_avg;        // total_read / elapsed time
	uint32_t read_last_sec;           // rows per unit time over the last interval
	/* queue occupancy */
	uint32_t buffer_cur_size;         // current queue length
	uint32_t buffer_avg_size;         // _buffer_size_sum / elapsed time
	uint64_t _buffer_size_sum;        // running sum of buffer_cur_size samples
	/* elapsed time */
	double _last_age;                 // elapsed time at the previous update
	uint32_t time_past;               // elapsed time, truncated to an integer
	char time_past_str[TIME_STR_LEN]; // human-readable form of the elapsed time
} stats_t;
  389. void update_stats(stats_t *stats, zqueue_t *queue)
  390. {
  391. double age = now() - start_time;
  392. double delta = age - stats->_last_age;
  393. stats->_last_age = age;
  394. stats->time_past = age;
  395. time_string((int)age, 0, stats->time_past_str, TIME_STR_LEN);
  396. uint32_t total_read = total_read_in;
  397. stats->read_last_sec = (total_read - stats->total_read) / delta;
  398. stats->total_read = total_read;
  399. stats->read_per_sec_avg = stats->total_read / age;
  400. stats->buffer_cur_size = get_size(queue);
  401. stats->_buffer_size_sum += stats->buffer_cur_size;
  402. stats->buffer_avg_size = stats->_buffer_size_sum / age;
  403. }
  404. void *monitor_ztee(void* arg)
  405. {
  406. zqueue_t *queue = (zqueue_t *) arg;
  407. stats_t *stats = xmalloc(sizeof(stats_t));
  408. if (tconf.status_updates_file) {
  409. fprintf(tconf.status_updates_file, "time_past,total_read_in,read_in_last_sec,read_per_sec_avg,buffer_current_size,buffer_avg_size\n");
  410. fflush(tconf.status_updates_file);
  411. }
  412. while (!process_done) {
  413. sleep(1);
  414. update_stats(stats, queue);
  415. if (tconf.monitor) {
  416. lock_file(stderr);
  417. fprintf(stderr, "%5s read_rate: %u rows/s (avg %u rows/s), buffer_size: %u (avg %u)\n",
  418. stats->time_past_str,
  419. stats->read_last_sec,
  420. stats->read_per_sec_avg,
  421. stats->buffer_cur_size,
  422. stats->buffer_avg_size);
  423. fflush(stderr);
  424. unlock_file(stderr);
  425. }
  426. if (tconf.status_updates_file) {
  427. fprintf(tconf.status_updates_file, "%u,%u,%u,%u,%u,%u\n",
  428. stats->time_past,
  429. stats->total_read,
  430. stats->read_last_sec,
  431. stats->read_per_sec_avg,
  432. stats->buffer_cur_size,
  433. stats->buffer_avg_size);
  434. fflush(tconf.status_updates_file);
  435. }
  436. }
  437. if (tconf.monitor) {
  438. lock_file(stderr);
  439. fflush(stderr);
  440. unlock_file(stderr);
  441. }
  442. if (tconf.status_updates_file) {
  443. fflush(tconf.status_updates_file);
  444. fclose(tconf.status_updates_file);
  445. }
  446. return NULL;
  447. }