module_mongodb.c 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. /*
  2. * ZMap Copyright 2013 Regents of the University of Michigan
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy
  6. * of the License at http://www.apache.org/licenses/LICENSE-2.0
  7. */
  8. #include <stdlib.h>
  9. #include <stdio.h>
  10. #include <string.h>
  11. #include "../../lib/logger.h"
  12. #include "../../lib/xalloc.h"
  13. #include "mongoc.h"
  14. #include "bson.h"
  15. #include "output_modules.h"
  16. #define UNUSED __attribute__((unused))
  17. #define BUFFER_SIZE 50
  18. static int buffer_fill = 0;
  19. static mongoc_client_t *client = NULL;
  20. static mongoc_collection_t *collection = NULL;
  21. static mongoc_bulk_operation_t *bulk = NULL;
  22. void mongodb_module_log(mongoc_log_level_t log_level, const char *log_domain, const char *msg, UNUSED void *user_data)
  23. {
  24. if (log_level == MONGOC_LOG_LEVEL_ERROR) {
  25. log_fatal("mongodb-module", "%s: %s", log_domain, msg);
  26. }
  27. else if (log_level == MONGOC_LOG_LEVEL_CRITICAL){
  28. log_error("mongodb-module", "%s: %s", log_domain, msg);
  29. }
  30. else if (log_level == MONGOC_LOG_LEVEL_WARNING){
  31. log_warn("mongodb-module", "%s: %s", log_domain, msg);
  32. }
  33. else if (log_level == MONGOC_LOG_LEVEL_INFO || log_level == MONGOC_LOG_LEVEL_MESSAGE){
  34. log_info("mongodb-module", "%s: %s", log_domain, msg);
  35. }
  36. else if (log_level == MONGOC_LOG_LEVEL_DEBUG){
  37. log_debug("mongodb-module", "%s: %s", log_domain, msg);
  38. }
  39. else {
  40. log_trace("mongodb-module", "%s: %s", log_domain, msg);
  41. }
  42. }
  43. int mongodb_module_init(struct state_conf *conf, UNUSED char **fields, UNUSED int fieldlens)
  44. {
  45. char *uri_str = NULL;
  46. buffer_fill = 0;
  47. const char *db;
  48. if (conf->output_args) {
  49. uri_str = conf->output_args;
  50. }
  51. mongoc_init();
  52. mongoc_log_set_handler(mongodb_module_log, NULL);
  53. mongoc_uri_t *uri = mongoc_uri_new(uri_str);
  54. if (uri == NULL) {
  55. log_fatal("mongodb-module", "URI %s not valid!", uri_str);
  56. }
  57. client = mongoc_client_new_from_uri(uri);
  58. db = mongoc_uri_get_database(uri);
  59. collection = mongoc_client_get_collection(client, db ? db : strdup("zmap_output"), conf->output_filename ? conf->output_filename : strdup("zmap_output"));
  60. bulk = mongoc_collection_create_bulk_operation(collection,false,NULL);
  61. return EXIT_SUCCESS;
  62. }
  63. static int mongodb_module_flush(void)
  64. {
  65. int ret;
  66. uint32_t bulk_ret;
  67. bson_error_t error;
  68. bson_t reply;
  69. mongoc_bulk_operation_t *old_bulk;
  70. if (buffer_fill == 0){
  71. mongoc_bulk_operation_destroy(bulk);
  72. return EXIT_SUCCESS;
  73. }
  74. bulk_ret = mongoc_bulk_operation_execute(bulk, &reply, &error);
  75. old_bulk = bulk;
  76. if (bulk_ret == 0) {
  77. mongoc_log(MONGOC_LOG_LEVEL_ERROR, "zmap", "Error executing bulk insert: %s", error.message);
  78. ret = EXIT_FAILURE;
  79. } else {
  80. bulk = mongoc_collection_create_bulk_operation(collection,false,NULL);
  81. ret = EXIT_SUCCESS;
  82. }
  83. bson_destroy(&reply);
  84. mongoc_bulk_operation_destroy(old_bulk);
  85. return ret;
  86. }
  87. int mongodb_module_process(fieldset_t *fs)
  88. {
  89. bson_t *doc;
  90. if (!bulk) {
  91. return EXIT_FAILURE;
  92. }
  93. if (buffer_fill == BUFFER_SIZE) {
  94. if (mongodb_module_flush()) {
  95. return EXIT_FAILURE;
  96. }
  97. }
  98. doc = bson_new();
  99. for (int i=0; i < fs->len; i++) {
  100. field_t *f = &(fs->fields[i]);
  101. if (f->type == FS_STRING) {
  102. BSON_APPEND_UTF8(doc,f->name,f->value.ptr);
  103. } else if (f->type == FS_UINT64) {
  104. BSON_APPEND_INT64(doc,f->name,(uint64_t) f->value.num);
  105. } else if (f->type == FS_BINARY) {
  106. BSON_APPEND_BINARY(doc,f->name, BSON_SUBTYPE_BINARY,f->value.ptr, f->len);
  107. } else if (f->type == FS_NULL) {
  108. // do nothing
  109. } else {
  110. log_fatal("mongodb", "received unknown output type");
  111. }
  112. }
  113. mongoc_bulk_operation_insert(bulk,doc);
  114. buffer_fill++;
  115. return EXIT_SUCCESS;
  116. }
  117. int mongodb_module_close(UNUSED struct state_conf* c,
  118. UNUSED struct state_send* s,
  119. UNUSED struct state_recv* r)
  120. {
  121. if (mongodb_module_flush()) {
  122. return EXIT_FAILURE;
  123. }
  124. mongoc_collection_destroy(collection);
  125. mongoc_client_destroy(client);
  126. mongoc_cleanup();
  127. return EXIT_SUCCESS;
  128. }
  129. output_module_t module_mongodb = {
  130. .name = "mongodb",
  131. .init = &mongodb_module_init,
  132. .start = NULL,
  133. .update = NULL,
  134. .update_interval = 0,
  135. .close = &mongodb_module_close,
  136. .process_ip = &mongodb_module_process,
  137. .helptext = "Write output to MongoDB. Defaults to mongodb://localhost:27017/zmap_output. Specify a custom connection URI in output module args."
  138. };