Просмотр исходного кода

Improve speed of communication interval retrieval

Move finding the interval with the most abstract communication from
the input CSV/XML to C++. Additionally, use multithreading to find
the interval with as many concurrent threads as make sense, given
the number of logical cores the system provides.

Note: be sure to also move the new botnet library to code/ID2T/,
in case the build script fails to do this (just as before
with libpcapreader).
dustin.born 7 лет назад
Родитель
Сommit
ab7fd200ad

+ 1 - 0
build.sh

@@ -20,6 +20,7 @@ fi
 
 if [ $? -eq 0 ]; then
     cp libpcapreader.so ../../../code/ID2TLib/
+    cp libbotnet.so ../../../code/ID2TLib/
 else
     echo "Error: 'make' did not finish successfully."
     exit

+ 12 - 69
code/ID2TLib/CommunicationProcessor.py

@@ -1,6 +1,7 @@
 from lea import Lea
 from Attack.MembersMgmtCommAttack import MessageType
 from Attack.MembersMgmtCommAttack import Message
+import ID2TLib.libbotnet as bcp
 
 # needed because of machine inprecision. E.g A time difference of 0.1s is stored as >0.1s
 EPS_TOLERANCE = 1e-13  # works for a difference of 0.1, no less
@@ -39,75 +40,17 @@ class CommunicationProcessor():
         self.local_init_ids = set(mapped_ids)
 
     def find_interval_most_comm(self, number_ids: int, max_int_time: float):
-        """
-        Finds the time interval(s) of the given seconds with the most overall communication (i.e. requests and responses)
-        that has at least number_ids communication initiators in it. 
-        :param number_ids: The number of initiator IDs that have to exist in the interval(s)
-        :param max_int_time: The maximum time period of the interval
-        :return: A list of triples, where each triple contains the initiator IDs, the start index and end index
-                 of the respective interval in that order. The indices are with respect to self.packets
-        """
-
-        # setup initial variables
-        packets = self.packets
-        mtypes = self.mtypes
-        idx_low, idx_high = 0, 0  # the indices spanning the interval
-        comm_sum = 0  # the communication sum of the current interval
-        cur_highest_sum = 0  # the highest communication sum seen so far
-        init_ids = []  # the initiator IDs seen in the current interval in order of appearance
-        possible_intervals = []  # all intervals that have cur_highest_sum of communication and contain enough IDs
-
-        # Iterate over all packets from start to finish and process the info of each packet.
-        # Similar to a Sliding Window approach.
-        while True:
-            if idx_high < len(packets):
-                cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
-     
-            # if current interval time exceeds maximum time period, process information of the current interval
-            if greater_than(cur_int_time, max_int_time) or idx_high >= len(packets):
-                interval_ids = set(init_ids)
-                # if the interval contains enough initiator IDs, add it to possible_intervals
-                if len(interval_ids) >= number_ids:
-                    interval = {"IDs": sorted(interval_ids), "Start": idx_low, "End": idx_high-1}
-                    # reset possible intervals if new maximum of communication is found
-                    if comm_sum > cur_highest_sum:
-                        possible_intervals = [interval]
-                        cur_highest_sum = comm_sum
-                    # append otherwise
-                    elif comm_sum == cur_highest_sum:
-                        possible_intervals.append(interval)
-
-                # stop if all packets have been processed
-                if idx_high >= len(packets):
-                    break
-
-            # let idx_low "catch up" so that the current interval time fits into the maximum time period again
-            while greater_than(cur_int_time, max_int_time):
-                cur_packet = packets[idx_low]
-                # if message was no timeout, delete the first appearance of the initiator ID 
-                # of this packet from the initiator list and update comm_sum
-                if mtypes[int(cur_packet["Type"])] != MessageType.TIMEOUT:
-                    comm_sum -= 1
-                    del init_ids[0]
-
-                idx_low += 1
-                cur_int_time = float(packets[idx_high]["Time"]) - float(packets[idx_low]["Time"])
-
-            # consume the new packet at idx_high and process its information
-            cur_packet = packets[idx_high]
-            cur_mtype = mtypes[int(cur_packet["Type"])]
-            # if message is request, add src to initiator list
-            if MessageType.is_request(cur_mtype):
-                init_ids.append(cur_packet["Src"])
-                comm_sum += 1
-            # if message is response, add dst to initiator list
-            elif MessageType.is_response(cur_mtype):
-                init_ids.append(cur_packet["Dst"])
-                comm_sum += 1
-
-            idx_high += 1
-
-        return possible_intervals
+        botproc = bcp.botnet_comm_processor(self.packets)
+        cpp_intervals = botproc.find_interval(number_ids, max_int_time)
+        intervals = []
+        for cpp_interval in cpp_intervals:
+            ids = []
+            for id_ in cpp_interval[0]:
+                ids.append(str(id_))
+            interval = {"IDs": ids, "Start": cpp_interval[1], "End": cpp_interval[2]}
+            intervals.append(interval)
+
+        return intervals
 
     def det_id_roles_and_msgs(self):
         """

+ 6 - 0
code_boost/src/CMakeLists.txt

@@ -25,6 +25,9 @@ SET(CMAKE_CXX_STANDARD_REQUIRED ON)
 # Add the library source files
 SET(SOURCE_FILES cxx/pcap_processor.cpp cxx/pcap_processor.h cxx/statistics.cpp cxx/statistics.h cxx/statistics_db.cpp cxx/statistics_db.h cxx/utilities.h cxx/utilities.cpp)
 
+# Add botnet communication processor source files
+SET(BOT_COMM_PROC_SOURCE cxx/botnet_comm_processor.h cxx/botnet_comm_processor.cpp)
+
 # Include SQLiteCpp library and build it
 option(SQLITECPP_RUN_CPPLINT OFF)
 include_directories(SQLiteCpp/include)
@@ -77,6 +80,9 @@ ADD_LIBRARY(pcapreader SHARED ${SOURCE_FILES})
 # Libs pthread and dl are prerequisites of SQLiteCpp
 TARGET_LINK_LIBRARIES(pcapreader ${Boost_LIBRARIES} "${TINS_LIBRARY}" ${PYTHON_LIBRARIES} SQLiteCpp sqlite3 pthread dl)
 
+ADD_LIBRARY(botnet SHARED ${BOT_COMM_PROC_SOURCE})
+TARGET_LINK_LIBRARIES(botnet ${Boost_LIBRARIES} ${PYTHON_LIBRARIES})
+
 # comment this out to build executable (for development)
 #ADD_EXECUTABLE(cpp-pcapreader ${SOURCE_FILES})
 #TARGET_LINK_LIBRARIES(cpp-pcapreader ${Boost_LIBRARIES} "${TINS_LIBRARY}" SQLiteCpp sqlite3 pthread dl)

+ 229 - 0
code_boost/src/cxx/botnet_comm_processor.cpp

@@ -0,0 +1,229 @@
+#include "botnet_comm_processor.h"
+
+// Use references instead of values to save time?
+
+/**
+ * Creates a new botnet_comm_processor object. 
+ * The abstract python messages are converted to easier-to-handle C++ data structures.
+ * @param messages_pyboost The abstract communication messages 
+ *    represented as (python) list containing (python) dicts.
+ */
+botnet_comm_processor::botnet_comm_processor(py::list messages_pyboost){
+    for (int i = 0; i < len(messages_pyboost); i++){
+        py::dict msg_pyboost = py::extract<py::dict>(messages_pyboost[i]);
+        unsigned int src_id = std::stoi(py::extract<std::string>(msg_pyboost["Src"]));
+        unsigned int dst_id = std::stoi(py::extract<std::string>(msg_pyboost["Dst"]));
+        unsigned short type = (unsigned short) std::stoi(py::extract<std::string>(msg_pyboost["Type"]));
+        double time = std::stod(py::extract<std::string>(msg_pyboost["Time"]));
+        abstract_msg msg = {src_id, dst_id, type, time};
+        messages.push_back(msg);
+    }
+}
+
+/**
+ * Finds the time interval(s) of the given seconds with the most overall communication
+ * (i.e. requests and responses) that has at least number_ids communicating initiators in it. 
+ * @param number_ids The number of initiator IDs that have to exist in the interval(s).
+ * @param max_int_time The maximum time period of the interval.
+ * @return A (python) list of (python) tuple, where each tuple represents an interval with a set of the initiator IDs, 
+ * a start index and an end index in that order. The indices are with respect to the first abstract message.
+ */
+py::list botnet_comm_processor::find_interval(int number_ids, double max_int_time){
+    unsigned int logical_thread_count = std::thread::hardware_concurrency();
+    std::vector<std::thread> threads;
+    std::vector<std::future<std::vector<comm_interval> > > futures;
+
+    // create as many threads as can run concurrently and assign them respective sections
+    for (int i = 0; i < logical_thread_count; i++){
+        unsigned int start_idx = (i * messages.size() / logical_thread_count);
+        unsigned int end_idx = (i + 1) * messages.size() / logical_thread_count;
+        std::promise<std::vector<comm_interval> > p;  // use promises to retrieve return values
+        futures.push_back(p.get_future());
+        threads.push_back(std::thread(&botnet_comm_processor::find_interval_helper, this, std::move(p), number_ids, max_int_time, start_idx, end_idx));
+    }
+
+    // synchronize all threads
+    for (auto &t : threads){
+        t.join();
+    }
+
+    // accumulate results
+    std::vector<std::vector<comm_interval> > acc_possible_intervals;
+    for (auto &f : futures){
+        acc_possible_intervals.push_back(f.get());
+    }
+
+    // find overall most communicative interval
+    std::vector<comm_interval> possible_intervals;
+    unsigned int cur_highest_sum = 0;
+    for (const auto &single_poss_interval : acc_possible_intervals){
+        if (single_poss_interval.size() > 0 && single_poss_interval[0].comm_sum >= cur_highest_sum){
+            // if there is more than one interval, all of them have the same comm_sum
+            if (single_poss_interval[0].comm_sum > cur_highest_sum){
+                cur_highest_sum = single_poss_interval[0].comm_sum;
+                possible_intervals.clear();
+            }
+
+            for (const auto &interval : single_poss_interval){
+                possible_intervals.push_back(std::move(interval));
+            }
+        }
+    }
+
+    // return the result converted into python data structures
+    return convert_to_py_repr(possible_intervals);
+}
+
+/**
+ * Finds the time interval(s) of the given seconds within the given start and end index having the most 
+ * overall communication (i.e. requests and responses) as well as at least number_ids communicating initiators in it. 
+ * @param p An rvalue to a promise to return the found intervals.
+ * @param number_ids The number of initiator IDs that have to exist in the interval(s).
+ * @param max_int_time The maximum time period of the interval.
+ * @param start_idx The index of the first message to process with respect to the class member 'messages'.
+ * @param end_idx The upper index boundary where the search is stopped at (i.e. idx_low does not cross this boundary).
+ */
+void botnet_comm_processor::find_interval_helper(std::promise<std::vector<comm_interval> > && p, int number_ids, double max_int_time, int start_idx, int end_idx){
+    // setup initial variables
+    unsigned int idx_low = start_idx, idx_high = start_idx;  // the indices spanning the interval
+    unsigned int comm_sum = 0;  // the communication sum of the current interval
+    unsigned int cur_highest_sum = 0;  // the highest communication sum seen so far
+    double cur_int_time = 0;  // the time of the current interval
+    std::deque<unsigned int> init_ids;  // the initiator IDs seen in the current interval in order of appearance
+    std::vector<comm_interval> possible_intervals;  // all intervals that have cur_highest_sum of communication and contain enough IDs
+
+    // Iterate over all messages from start to finish and process the info of each message.
+    // Similar to a Sliding Window approach.
+    while (1){
+        if (idx_high < messages.size())
+            cur_int_time = messages[idx_high].time - messages[idx_low].time;
+ 
+        // if current interval time exceeds maximum time period, process information of the current interval
+        if (greater_than(cur_int_time, max_int_time) || idx_high >= messages.size()){
+            std::set<unsigned int> interval_ids;
+
+            for (int i = 0; i < init_ids.size(); i++) 
+                interval_ids.insert(init_ids[i]);
+
+            // if the interval contains enough initiator IDs, add it to possible_intervals
+            if (interval_ids.size() >= number_ids){
+                comm_interval interval = {interval_ids, comm_sum, idx_low, idx_high - 1};
+                // reset possible intervals if new maximum of communication is found
+                if (comm_sum > cur_highest_sum){
+                    possible_intervals.clear();
+                    possible_intervals.push_back(std::move(interval));
+                    cur_highest_sum = comm_sum;
+                }
+                // append otherwise
+                else if (comm_sum == cur_highest_sum)
+                    possible_intervals.push_back(std::move(interval));
+            }
+
+            // stop if all messages have been processed
+            if (idx_high >= messages.size())
+                break;
+        }
+
+        // let idx_low "catch up" so that the current interval time fits into the maximum time period again
+        while (greater_than(cur_int_time, max_int_time)){
+            if (idx_low >= end_idx)
+                goto end; 
+
+            abstract_msg &cur_msg = messages[idx_low];
+            // if message was not a timeout, delete the first appearance of the initiator ID 
+            // of this message from the initiator list and update comm_sum
+            if (cur_msg.type != TIMEOUT){
+                comm_sum -= 1;
+                init_ids.pop_front();
+            }
+
+            idx_low++;
+            cur_int_time = messages[idx_high].time - messages[idx_low].time;
+        }
+
+        // consume the new message at idx_high and process its information
+        abstract_msg &cur_msg = messages[idx_high];
+        // if message is request, add src to initiator list
+        if (msgtype_is_request(cur_msg.type)){
+            init_ids.push_back(cur_msg.src);
+            comm_sum += 1;
+        }
+        // if message is response, add dst to initiator list
+        else if (msgtype_is_response(cur_msg.type)){
+            init_ids.push_back(cur_msg.dst);
+            comm_sum += 1;
+        }
+
+        idx_high += 1;
+    }
+
+    end: p.set_value(possible_intervals);
+}
+
+/**
+ * Checks whether the given message type corresponds to a request.
+ * @param mtype The message type to check.
+ * @return true(1) if the message type is a request, false(0) otherwise.
+ */
+int botnet_comm_processor::msgtype_is_request(unsigned short mtype){
+    return mtype == SALITY_HELLO || mtype == SALITY_NL_REQUEST;
+}
+
+/**
+ * Checks whether the given message type corresponds to a response.
+ * @param mtype The message type to check.
+ * @return true(1) if the message type is a response, false(0) otherwise.
+ */
+int botnet_comm_processor::msgtype_is_response(unsigned short mtype){
+    return mtype == SALITY_HELLO_REPLY || mtype == SALITY_NL_REPLY;
+}
+
+// py::list botnet_comm_processor::std_vector_to_py_list(const std::vector<comm_interval> &intervals){
+//     py::object get_iter = py::iterator<std::vector<comm_interval> >();
+//     py::object iter = get_iter(intervals);
+//     py::list l(iter);
+//     return l;
+// }
+
+// py::list botnet_comm_processor::st_unorderedmap_to_py_dict(const std::vector<comm_interval> &intervals){
+//     py::object get_iter = py::iterator<std::vector<comm_interval> >();
+//     py::object iter = get_iter(intervals);
+//     py::list l(iter);
+//     return l;
+// }
+
+/**
+ * Converts the given vector of communication intervals to a python representation 
+ * using (python) lists and (python) tuples.
+ * @param intervals The communication intervals to convert.
+ * @return A boost::python::list containing the same information using boost::python::tuples for each interval.
+ */
+py::list botnet_comm_processor::convert_to_py_repr(const std::vector<comm_interval> &intervals){
+    py::list py_intervals;
+    for (const auto &interval : intervals){
+        py::list py_ids;
+        for (const auto &id : interval.ids){
+            py_ids.append(id);
+        }
+        py::tuple py_interval = py::make_tuple(py_ids, interval.start_idx, interval.end_idx);
+        py_intervals.append(py_interval);
+    }
+    return py_intervals;
+}
+
+// void botnet_comm_processor::print_message(abstract_msg message){
+//     std::cout << "Src: " << message.src << "   Dst: " << message.dst << "   Type: " << message.type << "   Time: " << message.time << std::endl;
+// }
+
+
+/*
+ * Comment out if executable should be build & run
+ * Comment in if library should be build
+ */
+
+using namespace boost::python;
+
+BOOST_PYTHON_MODULE (libbotnet) {
+    class_<botnet_comm_processor>("botnet_comm_processor", init<list>())
+            .def("find_interval", &botnet_comm_processor::find_interval);
+}

+ 110 - 0
code_boost/src/cxx/botnet_comm_processor.h

@@ -0,0 +1,110 @@
+/*
+ * Class for processing messages containing abstract Membership Management Communication.
+ * A message has to consist of (namely): Src, Dst, Type, Time.
+ */
+
+#ifndef BOTNET_COMM_PROCESSOR_H
+#define BOTNET_COMM_PROCESSOR_H
+
+#include <iostream>
+#include <boost/python.hpp>
+#include <vector>
+#include <chrono>
+#include <thread>
+#include <deque>
+#include <set>
+#include <future>
+
+/*
+ * Botnet communication types (equal to the ones contained in the MessageType class in MembersMgmtCommAttack.py)
+ */
+#define TIMEOUT 3
+#define SALITY_NL_REQUEST 101
+#define SALITY_NL_REPLY 102
+#define SALITY_HELLO 103
+#define SALITY_HELLO_REPLY 104
+
+/*
+ * Needed because of machine inprecision. E.g a time difference of 0.1s is stored as >0.1s
+ */
+#define EPS_TOLERANCE 1e-12  // works for a difference of 0.1
+
+/*
+ * For quick usage
+ */
+namespace py = boost::python;
+
+/*
+ * Definition of structs
+ */
+
+/*
+ * Struct used as data structure to represent an abstract communication message:
+ * - Source ID
+ * - Destination ID
+ * - Message type
+ * - Time of message
+ */
+struct abstract_msg {
+    unsigned int src;
+    unsigned int dst;
+    unsigned short type; 
+    double time;
+};
+
+/*
+ * Struct used as data structure to represent an interval of communication:
+ * - A set of all initiator IDs contained in the interval
+ * - The number of messages sent in the interval (excluding timeouts)
+ * - The start index of the interval with respect to the member variable 'packets'
+ * - The end index of the interval with respect to the member variable 'packets'
+ */
+struct comm_interval {
+    std::set<unsigned int> ids;
+    unsigned int comm_sum;
+    unsigned int start_idx;
+    unsigned int end_idx; 
+};
+
+/*
+ * A greater than operator desgined to handle slight machine inprecision up to EPS_TOLERANCE.
+ * @param a The first number
+ * @param b The second number
+ * @return true (1) if a > b, otherwise false(0)
+*/
+int greater_than(double a, double b){
+    return b - a < -1 * EPS_TOLERANCE;
+}
+
+class botnet_comm_processor {
+
+public:
+    /*
+    * Class constructor
+    */
+    botnet_comm_processor(py::list packets);
+    
+    /*
+     * Methods
+     */
+    py::list find_interval(int number_ids, double max_int_time);
+
+private:
+    /*
+     * Methods
+     */
+    void print_message(abstract_msg packet);
+    int msgtype_is_request(unsigned short mtype);
+    int msgtype_is_response(unsigned short mtype);
+    // py::list std_vector_to_py_list(std::vector<comm_interval> intervals)
+    py::list convert_to_py_repr(const std::vector<comm_interval>& intervals);
+    void find_interval_helper(std::promise<std::vector<comm_interval> > && p, int number_ids, double max_int_time, int start_idx, int end_idx);
+    
+    /*
+     * Attributes
+     */
+    std::vector<abstract_msg> messages;
+}; 
+
+
+#endif //BOTNET_COMM_PROCESSOR_H