/**
 * TraCINg-Server - Gathering and visualizing cyber incidents on the world
 *
 * Copyright 2013 Matthias Gazzari, Annemarie Mattmann, André Wolski
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * This module provides methods for receiving POST data (newline-delimited
 * JSON, one incident per line) and turning it into incident records.
 * Recent incidents are sent via socket.io to every client connected to the
 * server. Storing the incidents in the database is currently disabled (see
 * the note in respondAndForward).
 */

// load node modules
var City = require("geoip").City;	// requires 'npm install geoip' and on ubuntu 'aptitude install libgeoip-dev'
var city = new City("GeoLiteCity.dat");
var stream = require("stream");
var Parser = require("newline-json").Parser;

// load internal modules
//var db = require("./db/db.js");
var db = require("./db/database.js");
var praMitigation = require("./proberesponseMitigation.js");
//praMitigation.setActive(false);
praMitigation.setActive(true);

// Incidents are only forwarded live when their date lies within this many
// milliseconds (one hour) of the current server time, in either direction.
var FORWARD_WINDOW_MS = 60 * 60 * 1000;

// Optional fields that are copied verbatim from a parsed record to the
// incident when the record carries them (in this order).
var OPTIONAL_FIELDS = ["device", "bssid", "ssid", "internal_attack", "external_ip"];

var debugInstance = 0;

/**
 * Collects debug messages for one request. Every message is prefixed with a
 * per-instance number, printed to the console and appended to a buffer that
 * can be sent back to the client.
 */
var Debug = function () {
    var output = "";
    var instance = debugInstance++;

    // returns all messages added so far, one per line
    this.get = function () {
        return output;
    };

    // logs the message and remembers it
    this.add = function (text) {
        text = "PostHandler[" + instance + "]: " + text;
        console.log(text);
        output += text + "\n";
    };
};

// Cache of successful GeoIP lookups keyed by IP string. The object has no
// prototype so that untrusted keys such as "__proto__" or "constructor"
// cannot resolve to inherited members.
var cacheIPLocations = Object.create(null);

/**
 * Resolves an IP address to its GeoIP record, using the cache when possible.
 *
 * @param {string} ip - address to look up
 * @returns {Object|null|undefined} whatever city.lookupSync yields for the
 *     address; null when ip is not a non-empty string
 */
function lookUpIp(ip) {
    if (typeof ip !== "string" || ip === "") {
        // nothing that could be an address: do not hand it to the GeoIP binding
        return null;
    }
    var location = cacheIPLocations[ip];
    if (!location) {
        location = city.lookupSync(ip);
        if (location) {
            cacheIPLocations[ip] = location;
        }
    }
    return location;
}

// true when obj has key as an own property; safe even if the (untrusted)
// object carries its own "hasOwnProperty" key
function hasOwn(obj, key) {
    return Object.prototype.hasOwnProperty.call(obj, key);
}

// returns the port of an endpoint ({ip, port}) if it is within 0..65535, else 0
function sanitizePort(endpoint) {
    var port = endpoint && endpoint.port;
    return (port && (port >= 0) && (port <= 65535) && port) || 0;
}

// builds the src/dst part of an incident with empty location defaults
function makeEndpoint(endpoint) {
    return {
        ip: endpoint && endpoint.ip || null,
        port: sanitizePort(endpoint),
        ll: [0, 0],
        country: "",
        cc: 0,
        city: ""
    };
}

// copies the location fields of a GeoIP record into an incident endpoint
function applyLocation(target, location) {
    target["ll"] = [location.longitude, location.latitude];
    target["country"] = location.country_name;
    target["cc"] = location.country_code;
    target["city"] = location.city || "";
}

/**
 * Converts one parsed JSON record into an incident.
 *
 * @param {*} parsedData - one parsed line of the POST body
 * @param {Object} sensor - sensor identity of the sender (name, type)
 * @param {boolean} authorized - whether the sender was authorized
 * @param {string} request_ip - address the request came from; used to locate
 *     the destination when the record has no usable destination address or
 *     is flagged as an internal attack
 * @returns {Object|null} the incident, or null when the record is not an object
 */
function buildIncident(parsedData, sensor, authorized, request_ip) {
    if (parsedData === null || typeof parsedData !== "object") {
        return null;
    }

    // get geodata from ip
    var ipa = lookUpIp(parsedData.src && parsedData.src.ip);
    var ipd;
    if (parsedData.dst) {
        ipd = lookUpIp(parsedData.dst.ip);
    }

    var hasDate = parsedData.date || parsedData.date === 0;

    // generate data
    var data = {
        sensorname: sensor.name || parsedData.sensor && parsedData.sensor.name || "Unknown",
        sensortype: sensor.type || parsedData.sensor && parsedData.sensor.type || "Unknown",
        src: makeEndpoint(parsedData.src),
        dst: makeEndpoint(parsedData.dst),
        type: parsedData.type || 0,
        log: parsedData.log || null,
        md5sum: parsedData.md5sum || null,
        // the record's date is multiplied by 1000 before use; without one, use now
        date: hasDate ? new Date(parsedData.date * 1000) : new Date(),
        authorized: authorized
    };

    if (!data.dst.ip || parsedData.internal_attack || !ipd) {
        ipd = lookUpIp(request_ip);
    }
    if (ipa) {
        applyLocation(data.src, ipa);
    }
    if (ipd) {
        applyLocation(data.dst, ipd);
    }

    if (hasOwn(parsedData, "sync_id")) {
        data["sync_id"] = parseInt(parsedData.sync_id, 10);
    }
    for (var i = 0; i < OPTIONAL_FIELDS.length; i++) {
        var field = OPTIONAL_FIELDS[i];
        if (hasOwn(parsedData, field)) {
            data[field] = parsedData[field];
        }
    }

    if (!ipa) {
        // TODO: remove debug for simulation
        //debug.add("An invalid source IP: " + parsedData.src.ip + " (need to be a valid IP that could be resolved to a location via GeoIP)");
    }

    return data;
}

/**
 * Broadcasts every incident whose date is not older than an hour and not
 * more than an hour in the future, unless the mitigation module reports
 * (by a truthy return value) that the incident is to be held back.
 */
function forwardRecentItems(items, io) {
    var now = new Date().getTime();
    var minDate = now - FORWARD_WINDOW_MS;
    var maxDate = now + FORWARD_WINDOW_MS;

    for (var i = 0; i < items.length; i++) {
        var item = items[i];
        var date = item.date.getTime();
        if (date < minDate || date >= maxDate) {
            continue;
        }
        if (!praMitigation.applyMitigation(item.src.ip, item.dst.ip, item.dst.port, item.sensorname)) {
            //debug.add("emitting for attacked monitor: "+ item.dst.ip);
            io.sockets.emit("markIncident", item);	// TODO send as one packet / one array
        }
    }
}

/**
 * Processes incoming data: responds with a status code and broadcasts the
 * received incidents to the connected clients.
 *
 * Note: within this module the name `process` refers to this function, not
 * to Node's global process object.
 *
 * @param {Object} response - HTTP response to answer on (text/plain)
 * @param {string} postData - POST body, one JSON object per line
 * @param {boolean} authorized - whether the sender was authorized
 * @param {Object} sensor - sensor identity of the sender (name, type)
 * @param {Object} io - socket.io server used for broadcasting
 * @param {string} request_ip - address the request came from
 */
function process (response, postData, authorized, sensor, io, request_ip) {
    var debug = new Debug();
    var responded = false;

    // Sends the one and only answer for this request. Later calls are
    // ignored: the parser may report an error and still end afterwards, and
    // writing to an already finished response is an error.
    function reply(statusCode, body) {
        if (responded) {
            return;
        }
        responded = true;
        response.writeHead(statusCode, {"Content-Type": "text/plain"});
        if (body) {
            response.write(body);
        }
        response.end();
    }

    // Answers once all lines are parsed and forwards the recent incidents.
    function respondAndForward(items) {
        if (responded) {
            // an error was already reported for this request
            return;
        }
        if (items.length === 0) {
            debug.add("no items left to insert to the database!");
            reply(400, debug.get());
            return;
        }

        // for simulation: don't store to database to avoid overhead, after all MongoDB seems to be too slow
        forwardRecentItems(items, io);
        reply(200);

        /*
        Disabled database path, kept for reference:

        // function(..) gets called on succesfull insert into database
        db.insert(items, function(err, items) {
            //console.log("dbInsertCallback", arguments);
            if(err){
                debug.add("dbInsert error: " + err);
                response.writeHead(400, {"Content-Type": "text/plain"});
            }
            else if(!items || items.length == 0){
                debug.add("no items inserted to the database");
                response.writeHead(400, {"Content-Type": "text/plain"});
                err = true;
            }
            else{
                // debug.add("successfully inserted " + items.length + " item(s) to the database");
                response.writeHead(200, {"Content-Type": "text/plain"});
            }

            if(!err){
                var forwarded = 0;
                var now = new Date().getTime();
                var minDate = now - 60*60*1000;
                var maxDate = now + 60*60*1000;
                //debug.add("after db insert: emitting");
                for(var i = 0; i < items.length; i++){
                    var date = items[i].date.getTime();
                    // only send items that are not older than an hour and not more than an hour in the future
                    if(date >= minDate && date < maxDate){
                        if (!praMitigation.applyMitigation(items[i].src.ip)) {
                            //debug.add("emitting for attacked monitor: "+ items[i].dst.ip);
                            io.sockets.emit("markIncident", items[i]);	// TODO send as one packet / one array
                        }
                        forwarded++;
                    }
                }
                //debug.add("forwarded " + forwarded + " item(s) live to the webclients");
            }

            response.write(debug.get());
            response.end();
        });
        */
    }

    //debug.add("Processing " + (authorized?"authorized":"unauthorized") + " incoming data"); // "from '" + response.connection.remoteAddress + "'"

    if (!postData || postData.length === 0) {
        debug.add("Received no POST data.");
        reply(400, debug.get());
        return;
    }

    try {
        var items = [];
        var dataStr = postData.trim() + "\n";
        //console.log(dataStr);

        // readable stream that yields the whole body once and then ends
        var nlj = new stream.Readable();
        var parser = new Parser();
        var read = false;
        nlj._read = function _read() {
            if (!read) {
                nlj.push(dataStr);
                read = true;
            }
            else {
                nlj.push(null);
            }
        };

        parser.on('data', function (parsedData) {
            var data = buildIncident(parsedData, sensor, authorized, request_ip);
            if (data) {
                items.push(data);
            }
        });

        parser.on('end', function () {
            respondAndForward(items);
        });

        parser.on('error', function (e) {
            console.log(arguments);
            debug.add("Received invalid POST data. ");
            reply(400, debug.get());
        });

        nlj.pipe(parser);
    } catch (e) {
        debug.add("Received invalid POST data. " + e);
        debug.add(e.stack);
        reply(400, debug.get());
    }
}

exports.process = process;