/** * TraCINg-Server - Gathering and visualizing cyber incidents on the world * * Copyright 2013 Matthias Gazzari, Annemarie Mattmann, André Wolski * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * This module provides methods of receiving POST data and storing them in a * database. Additionally it is send via socket.io to every client connected to * the server. */ // load node modules var City = require("geoip").City; // requires 'npm install geoip' and on ubuntu 'aptitude install libgeoip-dev' var city = new City("GeoLiteCity.dat"); var stream = require("stream"); var Parser = require("newline-json").Parser; var Chance = require("chance"); // load internal modules //var db = require("./db/db.js"); var db = require("./db/database.js"); var debugInstance = 0; var Debug = function(){ var output = ""; var instance = debugInstance++; this.get = function(){ return output; }; this.add = function(text){ text = "PostHandler["+instance+"]: " + text; console.log(text); output += text + "\n"; }; }; var cacheIPLocations = {}; function lookUpIp(ip){ cacheIPLocations[ip] = cacheIPLocations[ip] || city.lookupSync(ip); return cacheIPLocations[ip]; } /* Countermeasure logic */ var TIMESLOT_SECONDS = 10; //var WINDOW_MAX_TIMESLOTS = 60*60; var WINDOW_MAX_TIMESLOTS = 60; var monitor_count = {}; var window_list = []; var ALARM_RATIO_THRESHOLD = 1.89; function init_stats_window(initial_value) { var debug = new Debug(); debug.add("initiating window with ratio " + initial_value); for(var x=0; x < WINDOW_MAX_TIMESLOTS; ++x){ window_list[x] = initial_value; } } function update_slot(monitor_id) { if (monitor_id in monitor_count) { monitor_count[monitor_id] += 1; } else { monitor_count[monitor_id] = 1; } } function update_window() { var debug = new Debug(); var timeslot = Math.round(new Date().getTime() / (1000 * TIMESLOT_SECONDS)) % WINDOW_MAX_TIMESLOTS; var amount_monitors = Object.keys(monitor_count).length; var amount_attacks = 0; for (var monitor_id in monitor_count) { amount_attacks += monitor_count[monitor_id]; } if (amount_monitors == 0 || amount_attacks == 0) { debug.add("No monitors counted, setting to last value"); if (timeslot - 1 > 0) { window_list[timeslot] = window_list[timeslot - 1]; } else { window_list[timeslot] = window_list[WINDOW_MAX_TIMESLOTS - 1]; } } else { window_list[timeslot] = amount_attacks / amount_monitors; debug.add("Timeslot / attacks / monitors / ratio: "+ timeslot +" / "+ amount_attacks +" / "+ amount_monitors + " / "+ window_list[timeslot]); } // reset monitor counts monitor_count = {}; var sum_ratio = 0; for (var key in window_list) { sum_ratio += window_list[key]; } var ratio = sum_ratio / WINDOW_MAX_TIMESLOTS; debug.add("SUM(ratio) / ( SUM(ratio)/cnt ): " + sum_ratio + " / " + ratio); if (ratio < ALARM_RATIO_THRESHOLD) { debug.add("!!!!!!!!! ALAAAARM!!!! Ratio is too low! we get attacked!!!"); } } var interval_update_window = setInterval(update_window, TIMESLOT_SECONDS * 1000); var data_empty = { sensorname: "", src: { ip : "", port : "" }, dst: { ip : "", port : "" }, type: "" }; function addEntry(data_new) { var debug = new Debug(); if (data_new === undefined) { debug.add("NO data given: " + data_new); data_new = data_empty; } if (typeof io == 'undefined') { //debug.add("no io, no emit -> I quit"); return; } var chance = new Chance(); // Add a random entry to tracing. This will affect source and destination port, // all other values wil remain constant. ip_src = data_new.src.ip || chance.ip(); ip_dst = data_new.dst.ip || chance.ip(); var data = { sensorname: data_new.sensorname || "RandomMonitor", sensortype: "RandomType", src: { ip: ip_src, port: data_new.src.port || chance.integer({min: 0, max: 65535}), ll: [0,0], country: "", cc: 0, city: "" }, dst: { ip: ip_dst, port: data_new.dst.port || chance.integer({min: 0, max: 65535}), ll: [0,0], country: "", cc: 0, city: "" }, type: data_new.type || 11, log: null, md5sum: "0123456789", date: new Date(), authorized: false }; debug.add("adding, name/ip/port: " + data.sensorname + " / " + data.src.ip + " / " + data.dst.ip + " / " + data.src.port + " / " + data.dst.port); ipa = lookUpIp(ip_src); if(ipa){ data.src["ll"] = [ipa.longitude, ipa.latitude]; data.src["country"] = ipa.country_name; data.src["cc"] = ipa.country_code; data.src["city"] = ipa.city || ""; } ipd = lookUpIp(ip_dst); if(ipd){ data.dst["ll"] = [ipd.longitude, ipd.latitude]; data.dst["country"] = ipd.country_name; data.dst["cc"] = ipd.country_code; data.dst["city"] = ipd.city || ""; } //debug.add("> " + chance.integer({min: 0, max: 65535})); //debug.add("> " + chance.ip()); //debug.add("Sending random entry: " + data["src"]["ip"]); io.sockets.emit("markIncident", data); update_slot(data.sensorname); } // Add random entries function add_entry_random() { addEntry(); } var data_attacker = []; var monitors = [10, 10, 10, 11, 11]; for (var key in monitors) { data_attacker.push( { sensorname: "monitor_" + key, src: { ip : "1.2.3." + key, port : 12345, }, dst: { ip : "1.2.3.4", port : 33, }, type: monitors[key] } ); } var chance_addentry = new Chance(); // Add Entries from a set of known attackers function add_entry_random_known() { var debug = new Debug(); var pos = chance_addentry.integer({min: 0, max: monitors.length - 1}); //debug.add("using: " + pos + " / " + data_attacker[pos]+ " / " + data_attacker[1].sensorname); addEntry(data_attacker[pos]); } init_stats_window(2); //var interval_countermeasure = setInterval(add_entry_random, 1000); var interval_countermeasure = setInterval(add_entry_random_known, 1000); // process incomding data, respond with a status code, broadcast the received // data and store the data in a database function process (response, postData, authorized, sensor, io, request_ip) { var debug = new Debug(); //debug.add("Processing " + (authorized?"authorized":"unauthorized") + " incoming data"); // "from '" + response.connection.remoteAddress + "'" if(postData.length == 0) { debug.add("Received no POST data."); response.writeHead(400, {"Content-Type": "text/plain"}); response.write(debug.get()); response.end(); } else { try { var items = []; var dataStr = postData.trim() + "\n"; //console.log(dataStr); var nlj = new stream.Readable(); var parser = new Parser(); var read = false; nlj._read = function _read(){ if(!read) { nlj.push(dataStr); read = true; } else nlj.push(null); }; parser.on('data', function(parsedData){ // get geodata from ip var ipa, ipd; ipa = lookUpIp(parsedData.src.ip); if(parsedData.dst) { ipd = lookUpIp(parsedData.dst.ip); } // generate data var data = { sensorname: sensor.name || parsedData.sensor && parsedData.sensor.name || "Unknown", sensortype: sensor.type || parsedData.sensor && parsedData.sensor.type || "Unknown", src: { ip: parsedData.src && parsedData.src.ip || null, port: parsedData.src && parsedData.src.port && (parsedData.src.port >= 0) && (parsedData.src.port <= 65535) && parsedData.src.port || 0, ll: [0,0], country: "", cc: 0, city: "" }, dst: { ip: parsedData.dst && parsedData.dst.ip || null, port: parsedData.dst && parsedData.dst.port && (parsedData.dst.port >= 0) && (parsedData.dst.port <= 65535) && parsedData.dst.port || 0, ll: [0,0], country: "", cc: 0, city: "" }, type: parsedData.type || 0, log: parsedData.log || null, md5sum: parsedData.md5sum || null, date: (parsedData.date || parsedData.date === 0) && new Date(parsedData.date*1000) || new Date(), // now authorized: authorized }; if(!data.dst.ip || parsedData.internal_attack || !ipd){ ipd = lookUpIp(request_ip); } if(ipa){ data.src["ll"] = [ipa.longitude, ipa.latitude]; data.src["country"] = ipa.country_name; data.src["cc"] = ipa.country_code; data.src["city"] = ipa.city || ""; } if(ipd){ data.dst["ll"] = [ipd.longitude, ipd.latitude]; data.dst["country"] = ipd.country_name; data.dst["cc"] = ipd.country_code; data.dst["city"] = ipd.city || ""; } if(parsedData.hasOwnProperty("sync_id")){ data["sync_id"] = parseInt(parsedData.sync_id, 10); } if(parsedData.hasOwnProperty("device")){ data["device"] = parsedData.device; } if(parsedData.hasOwnProperty("bssid")){ data["bssid"] = parsedData.bssid; } if(parsedData.hasOwnProperty("ssid")){ data["ssid"] = parsedData.ssid; } if(parsedData.hasOwnProperty("ssid")){ data["internal_attack"] = parsedData.internal_attack; } if(parsedData.hasOwnProperty("external_ip")){ data["external_ip"] = parsedData.external_ip; } items.push(data); update_slot( data["sensorname"] ); if(!ipa) { debug.add("An invalid source IP: " + parsedData.src.ip + " (need to be a valid IP that could be resolved to a location via GeoIP)"); } }); parser.on('end', function(){ if(items.length > 0){ //debug.add("try to insert " + items.length + " item(s) to the database"); // function(..) gets called on succesfull insert into database db.insert(items, function(err, items) { //console.log("dbInsertCallback", arguments); if(err){ debug.add("dbInsert error: " + err); response.writeHead(400, {"Content-Type": "text/plain"}); } else if(!items || items.length == 0){ debug.add("no items inserted to the database"); response.writeHead(400, {"Content-Type": "text/plain"}); err = true; } else{ // debug.add("successfully inserted " + items.length + " item(s) to the database"); response.writeHead(200, {"Content-Type": "text/plain"}); } if(!err){ var forwarded = 0; var now = new Date().getTime(); var minDate = now - 60*60*1000; var maxDate = now + 60*60*1000; for(var i = 0; i < items.length; i++){ var date = items[i].date.getTime(); // only send items that are not older than an hour and not more than an hour in the future if(date >= minDate && date < maxDate){ io.sockets.emit("markIncident", items[i]); // TODO send as one packet / one array forwarded++; } } //debug.add("forwarded " + forwarded + " item(s) live to the webclients"); } response.write(debug.get()); response.end(); }); } else{ debug.add("no items left to insert to the database!"); response.writeHead(400, {"Content-Type": "text/plain"}); response.write(debug.get()); response.end(); } }); parser.on('error', function(e){ console.log(arguments); debug.add("Received invalid POST data. "); response.writeHead(400, {"Content-Type": "text/plain"}); response.write(debug.get()); response.end(); }); nlj.pipe(parser); } catch (e) { debug.add("Received invalid POST data. " + e); debug.add(e.stack); response.writeHead(400, {"Content-Type": "text/plain"}); response.write(debug.get()); response.end(); } } } exports.process = process;