/* * ZTee Copyright 2014 Regents of the University of Michigan * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy * of the License at http://www.apache.org/licenses/LICENSE-2.0 */ #include #include #include #include #include #include #include #include #include #include "../lib/lockfd.h" #include "../lib/logger.h" #include "../lib/queue.h" #include "../lib/util.h" #include "../lib/xalloc.h" #include "../lib/csv.h" #include "topt.h" typedef enum file_format { FORMAT_CSV, FORMAT_JSON, FORMAT_RAW } format_t; static const char *format_names[] = { "csv", "json", "raw" }; typedef struct ztee_conf { // Files char *output_filename; char *status_updates_filename; char *log_file_name; FILE *output_file; FILE *status_updates_file; FILE *log_file; // Log level int log_level; // Input formats format_t in_format; format_t out_format; // Output config int success_only; // Monitor config int monitor; // Field indicies size_t ip_field; size_t success_field; } ztee_conf_t; static ztee_conf_t tconf; static int print_from_csv(char *line); static format_t test_input_format(char *line, size_t len) { // Check for empty input, remember line contains '\n' if (len < 2) { return FORMAT_RAW; } if (len >= 3) { // If the input is JSON, the line should look like // {.......}\n if (line[0] == '{' && line[len - 2] == '}') { return FORMAT_JSON; } } if (strchr(line, ',') != NULL) { return FORMAT_CSV; } return FORMAT_RAW; } int done = 0; int process_done = 0; int total_read_in = 0; int read_in_last_sec = 0; int total_written = 0; double start_time; pthread_t threads[3]; //one thread reads in //one thread writes out and parses //pops next element and determines what to do //if zqueue_t is empty and read_in is finished, then //it exits void *process_queue (void* my_q); //uses fgets to read from stdin and add it to the zqueue_t void *read_in (void* my_q); //does the same as find UP but finds only successful IPs, determined by the //is_successful field and flag void find_successful_IP (char* my_string); //finds IP in the string of csv and sends it to stdout for zgrab //you need to know what position is the csv string the ip field is in //zero indexed void find_IP (char* my_string); //writes a csv string out to csv file //fprintf(stderr, "Is empty inside if %i\n", is_empty(queue)); void write_out_to_file (char* data); //figure out how many fields are present if it is a csv void figure_out_fields (char* data); //check that the output file is either in a csv form or json form //throws error is it is not either //NOTE: JSON OUTPUT NOT IMPLEMENTED void output_file_is_csv(); void print_thread_error(); //monitor code for ztee //executes every second void *monitor_ztee(void *my_q); #define SET_IF_GIVEN(DST,ARG) \ { if (args.ARG##_given) { (DST) = args.ARG##_arg; }; } #define SET_BOOL(DST,ARG) \ { if (args.ARG##_given) { (DST) = 1; }; } int main(int argc, char *argv[]) { struct gengetopt_args_info args; struct cmdline_parser_params *params; params = cmdline_parser_params_create(); assert(params); params->initialize = 1; params->override = 0; params->check_required = 0; if (cmdline_parser_ext(argc, argv, &args, params) != 0) { exit(EXIT_SUCCESS); } signal(SIGPIPE, SIG_IGN); // Handle help text and version if (args.help_given) { cmdline_parser_print_help(); exit(EXIT_SUCCESS); } if (args.version_given) { cmdline_parser_print_version(); exit(EXIT_SUCCESS); } // Try opening the log file tconf.log_level = ZLOG_WARN; if (args.log_file_given) { tconf.log_file = fopen(args.log_file_arg, "w"); } else { tconf.log_file = stderr; } // Check for an error opening the log file if (tconf.log_file == NULL) { log_init(stderr, tconf.log_level, 0, "ztee"); log_fatal("ztee", "Could not open log file"); } // Actually init the logging infrastructure log_init(tconf.log_file, tconf.log_level, 0, "ztee"); // Check for an output file if (args.inputs_num < 1) { log_fatal("ztee", "No output file specified"); } if (args.inputs_num > 1) { log_fatal("ztee", "Extra positional arguments starting with %s", args.inputs[1]); } tconf.output_filename = args.inputs[0]; tconf.output_file = fopen(tconf.output_filename, "w"); if (!tconf.output_file) { log_fatal("ztee", "Could not open output file %s, %s", tconf.output_filename, strerror(errno)); } // Read actual options int raw = 0; SET_BOOL(tconf.success_only, success_only); SET_BOOL(tconf.monitor, monitor); SET_BOOL(raw, raw); // Open the status update file if necessary if (args.status_updates_file_given) { // Try to open the status output file char *filename = args.status_updates_file_arg; FILE *file = fopen(filename, "w"); if (!file) { char *err = strerror(errno); log_fatal("ztee", "unable to open status updates file %s (%s)", filename, err); } // Set the variables in state tconf.status_updates_filename = filename; tconf.status_updates_file = file; } // Read the first line of the input file size_t first_line_len = 1024; char *first_line = xmalloc(first_line_len); if (getline(&first_line, &first_line_len, stdin) < 0) { log_fatal("ztee", "reading input to test format failed"); } // Detect the input format if (!raw) { format_t format = test_input_format(first_line, first_line_len); log_info("ztee", "detected input format %s", format_names[format]); tconf.in_format = format; } else { tconf.in_format = FORMAT_RAW; log_info("ztee", "raw input"); } if (tconf.in_format == FORMAT_JSON) { log_fatal("ztee", "json input not implemented"); } // Find fields if needed char *header = strdup(first_line); int found_success = 0; int found_ip = 0; if (tconf.in_format == FORMAT_CSV) { static const char *success_names[] = { "success" }; static const char *ip_names[] = { "saddr", "ip" }; int success_idx = csv_find_index(header, success_names, 1); if (success_idx >= 0) { found_success = 1; tconf.success_field = (size_t) success_idx; } int ip_idx = csv_find_index(header, ip_names, 2); if (found_ip >= 0) { found_ip = 1; tconf.ip_field = (size_t) ip_idx; } if (!found_ip) { log_fatal("ztee", "Unable to find IP/SADDR field"); } } if (tconf.success_only) { if (tconf.in_format != FORMAT_CSV) { log_fatal("ztee", "success filter requires csv input"); } if (!found_success) { log_fatal("ztee", "Could not find success field"); } } // Make the queue zqueue_t* queue = queue_init(); assert(queue); // Add the first line to the queue if needed push_back(first_line, queue); // Start the regular read thread pthread_t read_thread; if (pthread_create(&read_thread, NULL, read_in, queue)) { log_fatal("ztee", "unable to start read thread"); } // Record the start time start_time = now(); // Start the process thread pthread_t process_thread; if (pthread_create(&process_thread, NULL, process_queue, queue)) { log_fatal("ztee", "unable to start process thread"); } // Start the monitor thread if necessary, and join to it if (tconf.monitor || tconf.status_updates_file) { pthread_t monitor_thread; if (pthread_create(&monitor_thread, NULL, monitor_ztee, queue)) { log_fatal("ztee", "unable to create monitor thread"); } pthread_join(monitor_thread, NULL); } // Join to the remaining threads, pthread_join(read_thread, NULL); pthread_join(process_thread, NULL); return 0; } void *process_queue(void* arg) { zqueue_t *queue = arg; FILE *output_file = tconf.output_file; while (!process_done) { pthread_mutex_lock(&queue->lock); while (!done && is_empty(queue)) { pthread_cond_wait(&queue->empty, &queue->lock); } if (done && is_empty(queue)) { process_done = 1; pthread_mutex_unlock(&queue->lock); continue; } znode_t *node = pop_front_unsafe(queue); pthread_mutex_unlock(&queue->lock); // Write raw data to output file fprintf(output_file, "%s", node->data); fflush(output_file); if (ferror(output_file) != 0) { log_fatal("ztee", "%s", "Error writing to output file"); } // Dump to stdout int stdout_ret; switch (tconf.in_format) { case FORMAT_JSON: log_fatal("ztee", "JSON input format unimplemented"); break; case FORMAT_CSV: stdout_ret = print_from_csv(node->data); break; default: // Handle raw stdout_ret = fprintf(stdout, "%s", node->data); break; } // Check to see if write failed fflush(stdout); if (ferror(stdout) != 0 || stdout_ret <= 0) { log_fatal("ztee", "%s", "Error writing to stdout"); } // Record output lines total_written++; // Free the memory free(node->data); free(node); } process_done = 1; fflush(output_file); fclose(output_file); return NULL; } void *read_in(void* arg) { // Allocate buffers zqueue_t *queue = (zqueue_t*) arg; size_t length = 1000; char *input = xcalloc(sizeof(char), length);; // Read in from stdin and add to back of linked list while (getline(&input, &length, stdin) > 0) { push_back(input, queue); total_read_in++; read_in_last_sec++; } pthread_mutex_lock(&queue->lock); done = 1; pthread_cond_signal(&queue->empty); pthread_mutex_unlock(&queue->lock); return NULL; } int print_from_csv(char *line) { if (total_written == 0) { return 1; } if (tconf.success_only) { char *success_entry = csv_get_index(line, tconf.success_field); if (success_entry == NULL) { return 1; } int success = 0; if (atoi(success_entry)) { success = 1; } else if (strcasecmp(success_entry, "true") == 0) { success = 1; } if (!success) { return 1; } } // Find the ip char *ip = csv_get_index(line, tconf.ip_field); int ret = fprintf(stdout, "%s\n", ip); return ret; } void output_file_is_csv() { return; /* char *dot = strrchr(output_filename); if dot == NULL { return; } */ /* int length = strlen(output_filename); char *end_of_file = malloc(sizeof(char*) *4); strncpy(end_of_file, output_filename+(length - 3), 3); end_of_file[4] = '\0'; const char *csv = "csv\n"; const char *json = "jso\n"; if(!strncmp(end_of_file, csv, 3) && !strncmp(end_of_file, json, 3)){ log_fatal("ztee", "Invalid output format"); } if(!strncmp(end_of_file, csv, 3)) output_csv = 1; if(!strncmp(end_of_file, json, 3)) output_csv = 0; */ } void print_thread_error(char* string) { fprintf(stderr, "Could not create thread %s\n", string); return; } #define TIME_STR_LEN 20 typedef struct ztee_stats { // Read stats uint32_t total_read; uint32_t read_per_sec_avg; uint32_t read_last_sec; // Buffer stats uint32_t buffer_cur_size; uint32_t buffer_avg_size; uint64_t _buffer_size_sum; // Duration double _last_age; uint32_t time_past; char time_past_str[TIME_STR_LEN]; } stats_t; void update_stats(stats_t *stats, zqueue_t *queue) { double age = now() - start_time; double delta = age - stats->_last_age; stats->_last_age = age; stats->time_past = age; time_string((int)age, 0, stats->time_past_str, TIME_STR_LEN); uint32_t total_read = total_read_in; stats->read_last_sec = (total_read - stats->total_read) / delta; stats->total_read = total_read; stats->read_per_sec_avg = stats->total_read / age; stats->buffer_cur_size = get_size(queue); stats->_buffer_size_sum += stats->buffer_cur_size; stats->buffer_avg_size = stats->_buffer_size_sum / age; } void *monitor_ztee(void* arg) { zqueue_t *queue = (zqueue_t *) arg; stats_t *stats = xmalloc(sizeof(stats_t)); if (tconf.status_updates_file) { fprintf(tconf.status_updates_file, "time_past,total_read_in,read_in_last_sec,read_per_sec_avg,buffer_current_size,buffer_avg_size\n"); fflush(tconf.status_updates_file); } while (!process_done) { sleep(1); update_stats(stats, queue); if (tconf.monitor) { lock_file(stderr); fprintf(stderr, "%5s read_rate: %u rows/s (avg %u rows/s), buffer_size: %u (avg %u)\n", stats->time_past_str, stats->read_last_sec, stats->read_per_sec_avg, stats->buffer_cur_size, stats->buffer_avg_size); fflush(stderr); unlock_file(stderr); } if (tconf.status_updates_file) { fprintf(tconf.status_updates_file, "%u,%u,%u,%u,%u,%u\n", stats->time_past, stats->total_read, stats->read_last_sec, stats->read_per_sec_avg, stats->buffer_cur_size, stats->buffer_avg_size); fflush(tconf.status_updates_file); } } if (tconf.monitor) { lock_file(stderr); fflush(stderr); unlock_file(stderr); } if (tconf.status_updates_file) { fflush(tconf.status_updates_file); fclose(tconf.status_updates_file); } return NULL; }