123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- /*
- * ZTee Copyright 2014 Regents of the University of Michigan
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
- * of the License at http://www.apache.org/licenses/LICENSE-2.0
- */
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <assert.h>
- #include <errno.h>
- #include <getopt.h>
- #include <pthread.h>
- #include <unistd.h>
- #include <signal.h>
- #include "../lib/lockfd.h"
- #include "../lib/logger.h"
- #include "../lib/queue.h"
- #include "../lib/util.h"
- #include "../lib/xalloc.h"
- #include "../lib/csv.h"
- #include "topt.h"
/* Record formats ztee can recognise on its input. */
typedef enum file_format {
	FORMAT_CSV,
	FORMAT_JSON,
	FORMAT_RAW
} format_t;

/* Printable name of each format_t value, indexed by the enumerator. */
static const char *format_names[] = {
	[FORMAT_CSV] = "csv",
	[FORMAT_JSON] = "json",
	[FORMAT_RAW] = "raw",
};
- typedef struct ztee_conf {
- // Files
- char *output_filename;
- char *status_updates_filename;
- char *log_file_name;
- FILE *output_file;
- FILE *status_updates_file;
- FILE *log_file;
- // Log level
- int log_level;
- // Input formats
- format_t in_format;
- format_t out_format;
- // Output config
- int success_only;
- // Monitor config
- int monitor;
- // Field indicies
- size_t ip_field;
- size_t success_field;
- } ztee_conf_t;
- static ztee_conf_t tconf;
- static int print_from_csv(char *line);
- static format_t test_input_format(char *line, size_t len) {
- // Check for empty input, remember line contains '\n'
- if (len < 2) {
- return FORMAT_RAW;
- }
- if (len >= 3) {
- // If the input is JSON, the line should look like
- // {.......}\n
- if (line[0] == '{' && line[len - 2] == '}') {
- return FORMAT_JSON;
- }
- }
- if (strchr(line, ',') != NULL) {
- return FORMAT_CSV;
- }
- return FORMAT_RAW;
- }
/*
 * State shared between the reader, processor and monitor threads.
 */

/* Set to 1 by read_in() (under the queue lock) once stdin is exhausted. */
int done = 0;

/* Set to 1 by process_queue() once `done` is set and the queue is empty;
 * monitor_ztee() polls it to know when to stop. */
int process_done = 0;

/* Line counters: read_in() bumps the first two, process_queue() the last. */
int total_read_in = 0;
int read_in_last_sec = 0;
int total_written = 0;

/* now() at the moment main() starts the worker threads. */
double start_time;

/* NOTE(review): not referenced anywhere in this file. */
pthread_t threads[3];
/*
 * Thread layout: one thread reads stdin into the queue, one thread drains
 * the queue (writing to the output file and stdout), and an optional
 * thread prints statistics. Each thread body receives the shared
 * zqueue_t * as its argument.
 */

/* Consumer: pops lines off the queue, writes each to the output file and
 * echoes it to stdout. Returns once the reader is finished and the queue
 * is empty. */
void *process_queue(void *my_q);

/* Producer: reads stdin line by line with getline() and appends each line
 * to the queue. */
void *read_in(void *my_q);

/* NOTE(review): the next four functions are declared but neither defined
 * nor called in the visible source; candidates for removal. */
/* Like find_IP(), but only for rows whose success field is set. */
void find_successful_IP(char *my_string);
/* Extract the (zero-indexed) IP column of a CSV string and send it to stdout. */
void find_IP(char *my_string);
/* Write a CSV string out to the output file. */
void write_out_to_file(char *data);
/* Count the fields of a CSV line. */
void figure_out_fields(char *data);

/* Placeholder for validating the output file format (CSV vs JSON);
 * JSON output is not implemented. */
void output_file_is_csv(void);

/* Report a failed thread creation on stderr. */
void print_thread_error(char *string);

/* Monitor: once per second, reports read rate and queue size. */
void *monitor_ztee(void *my_q);
/*
 * Option helpers for gengetopt results. Both expect a variable named
 * `args` in scope and use ARG as the option's gengetopt name, i.e. they
 * read args.ARG_given (and args.ARG_arg).
 *
 * SET_IF_GIVEN copies the option's value into DST when it was supplied;
 * SET_BOOL sets DST to 1 when the flag was supplied. Both are wrapped in
 * do { } while (0) so that each use is exactly one statement, which keeps
 * them safe inside an unbraced if/else.
 */
#define SET_IF_GIVEN(DST, ARG) \
	do { if (args.ARG##_given) { (DST) = args.ARG##_arg; } } while (0)
#define SET_BOOL(DST, ARG) \
	do { if (args.ARG##_given) { (DST) = 1; } } while (0)
- int main(int argc, char *argv[])
- {
- struct gengetopt_args_info args;
- struct cmdline_parser_params *params;
- params = cmdline_parser_params_create();
- assert(params);
- params->initialize = 1;
- params->override = 0;
- params->check_required = 0;
- if (cmdline_parser_ext(argc, argv, &args, params) != 0) {
- exit(EXIT_SUCCESS);
- }
- signal(SIGPIPE, SIG_IGN);
- // Handle help text and version
- if (args.help_given) {
- cmdline_parser_print_help();
- exit(EXIT_SUCCESS);
- }
- if (args.version_given) {
- cmdline_parser_print_version();
- exit(EXIT_SUCCESS);
- }
- // Try opening the log file
- tconf.log_level = ZLOG_WARN;
- if (args.log_file_given) {
- tconf.log_file = fopen(args.log_file_arg, "w");
- } else {
- tconf.log_file = stderr;
- }
- // Check for an error opening the log file
- if (tconf.log_file == NULL) {
- log_init(stderr, tconf.log_level, 0, "ztee");
- log_fatal("ztee", "Could not open log file");
- }
- // Actually init the logging infrastructure
- log_init(tconf.log_file, tconf.log_level, 0, "ztee");
- // Check for an output file
- if (args.inputs_num < 1) {
- log_fatal("ztee", "No output file specified");
- }
- if (args.inputs_num > 1) {
- log_fatal("ztee", "Extra positional arguments starting with %s",
- args.inputs[1]);
- }
- tconf.output_filename = args.inputs[0];
- tconf.output_file = fopen(tconf.output_filename, "w");
- if (!tconf.output_file) {
- log_fatal("ztee", "Could not open output file %s, %s",
- tconf.output_filename, strerror(errno));
- }
- // Read actual options
- int raw = 0;
- SET_BOOL(tconf.success_only, success_only);
- SET_BOOL(tconf.monitor, monitor);
- SET_BOOL(raw, raw);
- // Open the status update file if necessary
- if (args.status_updates_file_given) {
- // Try to open the status output file
- char *filename = args.status_updates_file_arg;
- FILE *file = fopen(filename, "w");
- if (!file) {
- char *err = strerror(errno);
- log_fatal("ztee", "unable to open status updates file %s (%s)",
- filename, err);
- }
- // Set the variables in state
- tconf.status_updates_filename = filename;
- tconf.status_updates_file = file;
- }
- // Read the first line of the input file
- size_t first_line_len = 1024;
- char *first_line = xmalloc(first_line_len);
- if (getline(&first_line, &first_line_len, stdin) < 0) {
- log_fatal("ztee", "reading input to test format failed");
- }
- // Detect the input format
- if (!raw) {
- format_t format = test_input_format(first_line, first_line_len);
- log_info("ztee", "detected input format %s", format_names[format]);
- tconf.in_format = format;
- } else {
- tconf.in_format = FORMAT_RAW;
- log_info("ztee", "raw input");
- }
- if (tconf.in_format == FORMAT_JSON) {
- log_fatal("ztee", "json input not implemented");
- }
- // Find fields if needed
- char *header = strdup(first_line);
- int found_success = 0;
- int found_ip = 0;
- if (tconf.in_format == FORMAT_CSV) {
- static const char *success_names[] = { "success" };
- static const char *ip_names[] = { "saddr", "ip" };
- int success_idx = csv_find_index(header, success_names, 1);
- if (success_idx >= 0) {
- found_success = 1;
- tconf.success_field = (size_t) success_idx;
- }
- int ip_idx = csv_find_index(header, ip_names, 2);
- if (found_ip >= 0) {
- found_ip = 1;
- tconf.ip_field = (size_t) ip_idx;
- }
- if (!found_ip) {
- log_fatal("ztee", "Unable to find IP/SADDR field");
- }
- }
- if (tconf.success_only) {
- if (tconf.in_format != FORMAT_CSV) {
- log_fatal("ztee", "success filter requires csv input");
- }
- if (!found_success) {
- log_fatal("ztee", "Could not find success field");
- }
- }
- // Make the queue
- zqueue_t* queue = queue_init();
- assert(queue);
- // Add the first line to the queue if needed
- push_back(first_line, queue);
- // Start the regular read thread
- pthread_t read_thread;
- if (pthread_create(&read_thread, NULL, read_in, queue)) {
- log_fatal("ztee", "unable to start read thread");
- }
- // Record the start time
- start_time = now();
- // Start the process thread
- pthread_t process_thread;
- if (pthread_create(&process_thread, NULL, process_queue, queue)) {
- log_fatal("ztee", "unable to start process thread");
- }
- // Start the monitor thread if necessary, and join to it
- if (tconf.monitor || tconf.status_updates_file) {
- pthread_t monitor_thread;
- if (pthread_create(&monitor_thread, NULL, monitor_ztee, queue)) {
- log_fatal("ztee", "unable to create monitor thread");
- }
- pthread_join(monitor_thread, NULL);
- }
- // Join to the remaining threads,
- pthread_join(read_thread, NULL);
- pthread_join(process_thread, NULL);
- return 0;
- }
- void *process_queue(void* arg)
- {
- zqueue_t *queue = arg;
- FILE *output_file = tconf.output_file;
- while (!process_done) {
- pthread_mutex_lock(&queue->lock);
- while (!done && is_empty(queue)) {
- pthread_cond_wait(&queue->empty, &queue->lock);
- }
- if (done && is_empty(queue)) {
- process_done = 1;
- pthread_mutex_unlock(&queue->lock);
- continue;
- }
- znode_t *node = pop_front_unsafe(queue);
- pthread_mutex_unlock(&queue->lock);
- // Write raw data to output file
- fprintf(output_file, "%s", node->data);
- fflush(output_file);
- if (ferror(output_file) != 0) {
- log_fatal("ztee", "%s", "Error writing to output file");
- }
- // Dump to stdout
- int stdout_ret;
- switch (tconf.in_format) {
- case FORMAT_JSON:
- log_fatal("ztee", "JSON input format unimplemented");
- break;
- case FORMAT_CSV:
- stdout_ret = print_from_csv(node->data);
- break;
- default:
- // Handle raw
- stdout_ret = fprintf(stdout, "%s", node->data);
- break;
- }
- // Check to see if write failed
- fflush(stdout);
- if (ferror(stdout) != 0 || stdout_ret <= 0) {
- log_fatal("ztee", "%s", "Error writing to stdout");
- }
- // Record output lines
- total_written++;
- // Free the memory
- free(node->data);
- free(node);
- }
- process_done = 1;
- fflush(output_file);
- fclose(output_file);
- return NULL;
- }
- void *read_in(void* arg)
- {
- // Allocate buffers
- zqueue_t *queue = (zqueue_t*) arg;
- size_t length = 1000;
- char *input = xcalloc(sizeof(char), length);;
- // Read in from stdin and add to back of linked list
- while (getline(&input, &length, stdin) > 0) {
- push_back(input, queue);
- total_read_in++;
- read_in_last_sec++;
- }
- pthread_mutex_lock(&queue->lock);
- done = 1;
- pthread_cond_signal(&queue->empty);
- pthread_mutex_unlock(&queue->lock);
- return NULL;
- }
- int print_from_csv(char *line)
- {
- if (total_written == 0) {
- return 1;
- }
- if (tconf.success_only) {
- char *success_entry = csv_get_index(line, tconf.success_field);
- if (success_entry == NULL) {
- return 1;
- }
- int success = 0;
- if (atoi(success_entry)) {
- success = 1;
- } else if (strcasecmp(success_entry, "true") == 0) {
- success = 1;
- }
- if (!success) {
- return 1;
- }
- }
- // Find the ip
- char *ip = csv_get_index(line, tconf.ip_field);
- int ret = fprintf(stdout, "%s\n", ip);
- return ret;
- }
/*
 * Placeholder for validating that the output file is CSV or JSON.
 * Not implemented: it performs no checks and has no effect.
 * NOTE(review): not called anywhere in this file.
 */
void output_file_is_csv(void)
{
}
/*
 * Write "Could not create thread <string>" to stderr.
 * `string` is a caller-supplied label for the thread.
 * NOTE(review): no caller in this file; main() reports pthread_create()
 * failures through log_fatal() instead.
 */
void print_thread_error(char *string)
{
	(void) fprintf(stderr, "Could not create thread %s\n", string);
}
/* Size of the buffer that time_string() fills with the elapsed time. */
#define TIME_STR_LEN 20

/*
 * Monitor statistics, refreshed by update_stats(). Fields whose names
 * start with '_' are running state carried between samples rather than
 * values for display.
 */
typedef struct ztee_stats stats_t;

struct ztee_stats {
	/* Lines read: running total, average per second, last interval's rate. */
	uint32_t total_read;
	uint32_t read_per_sec_avg;
	uint32_t read_last_sec;

	/* Queue depth: current, averaged, and the running sum behind the average. */
	uint32_t buffer_cur_size;
	uint32_t buffer_avg_size;
	uint64_t _buffer_size_sum;

	/* Elapsed time: previous sample (seconds), whole seconds, formatted text. */
	double _last_age;
	uint32_t time_past;
	char time_past_str[TIME_STR_LEN];
};
- void update_stats(stats_t *stats, zqueue_t *queue)
- {
- double age = now() - start_time;
- double delta = age - stats->_last_age;
- stats->_last_age = age;
- stats->time_past = age;
- time_string((int)age, 0, stats->time_past_str, TIME_STR_LEN);
- uint32_t total_read = total_read_in;
- stats->read_last_sec = (total_read - stats->total_read) / delta;
- stats->total_read = total_read;
- stats->read_per_sec_avg = stats->total_read / age;
- stats->buffer_cur_size = get_size(queue);
- stats->_buffer_size_sum += stats->buffer_cur_size;
- stats->buffer_avg_size = stats->_buffer_size_sum / age;
- }
- void *monitor_ztee(void* arg)
- {
- zqueue_t *queue = (zqueue_t *) arg;
- stats_t *stats = xmalloc(sizeof(stats_t));
- if (tconf.status_updates_file) {
- fprintf(tconf.status_updates_file, "time_past,total_read_in,read_in_last_sec,read_per_sec_avg,buffer_current_size,buffer_avg_size\n");
- fflush(tconf.status_updates_file);
- }
- while (!process_done) {
- sleep(1);
- update_stats(stats, queue);
- if (tconf.monitor) {
- lock_file(stderr);
- fprintf(stderr, "%5s read_rate: %u rows/s (avg %u rows/s), buffer_size: %u (avg %u)\n",
- stats->time_past_str,
- stats->read_last_sec,
- stats->read_per_sec_avg,
- stats->buffer_cur_size,
- stats->buffer_avg_size);
- fflush(stderr);
- unlock_file(stderr);
- }
- if (tconf.status_updates_file) {
- fprintf(tconf.status_updates_file, "%u,%u,%u,%u,%u,%u\n",
- stats->time_past,
- stats->total_read,
- stats->read_last_sec,
- stats->read_per_sec_avg,
- stats->buffer_cur_size,
- stats->buffer_avg_size);
- fflush(tconf.status_updates_file);
- }
- }
- if (tconf.monitor) {
- lock_file(stderr);
- fflush(stderr);
- unlock_file(stderr);
- }
- if (tconf.status_updates_file) {
- fflush(tconf.status_updates_file);
- fclose(tconf.status_updates_file);
- }
- return NULL;
- }
|