#!/usr/bin/python3 ## New implementation of TrustMiner using python and mongodb ## Nikos import sys import os from pymongo import MongoClient #mongodb assumes database at default path import logging, sys import configparser import json import urllib.request import datetime import debian_advisory logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) #load config file as library config = configparser.ConfigParser() config.read('config_test') if config.sections == []: print('configuration file not found\n') sys.exit(1) #global variables secperday = 60*60*24 now = datetime.datetime.now() verbosity = 1 ############################################################################### ## logging # 1 fatal errors # 2 errors # 3 note # 4 trace # 5 debug def msg(lvl,msg): if lvl <= int(config['LOG']['loglevel']): print(msg) def debug(msg): msg(5, msg) # Need to see if this is necessary ## load state, different from DBs in that we always need it def load_state(): cache = config['DIR']['cache_dir'] + 'state' err = 0 state = dict() try: with open(cache) as json_data: state = json.load(json_data) except FileNotFoundError: # Load default state - start from the beginning state['cache_dir'] = cache state['next_adv'] = 0 state['next_fsa'] = 0 state['Packages'] = '' state['Sources'] = '' state['Sha1Sums'] = '' err += 1 return (state, err) ############################################################################### ## save state, different from DBs in that we always need it def save_state(state): cache = config['DIR']['cache_dir'] + 'state' try: with open(cache, 'w') as fp: json.dump(state, fp) except IOError: print('write cache failed!! Fatal error') sys.exit(1) ############################################################################### ## load sha lists :TODO later def load_sha1lists(): cache = config['DIR']['cache_dir'] + 'state' ############################################################################### ## save sha lists :TODO later def save_sha1lists(): pass ############################################################################### ## load from files :TODO later def load_DBs(): pass ############################################################################### ## save to files :TODO later def save_DBs(): pass ############################################################################### ## Fetch current Packages, Sources and sha1sums files ## These are needed to find CVE stats by sha1sums/pkg-names ## Only Sha1Sums is custom generated, others are from Debian. ## FIXME: Server might do on-the-fly gzip (but should not for bzip2) ## Return: 1 on success, to signal that new parsing is needed. def fetchMeta(filename): urlbase = config['URL']['pkg_base_url'] mydir = config['DIR']['cache_dir'] bzFile = filename + '.bz2' url = urlbase + bzFile logging.info('Checking meta file from ' + url + '\n') # Download file urllib.request.urlretrieve(url, mydir + bzfile) # TODO catch exceptions like file not found # TODO check if file has changed, if it is new unpack ############################################################################### # Sources and Packages are not completely consistent, esp for debian-multimedia # He we store manual mappings for these.. def addOrphanPkgs(pkg2src): pkg2src['liblame-dev'] = "lame"; pkg2src['lame-extras'] = "lame"; pkg2src['moonlight'] = "moon"; pkg2src['libmoon0'] = "moon"; pkg2src['xmms-mp4'] = "xmms2"; pkg2src['xmms-mp4'] = "xmms2"; pkg2src['lazarus-src-0.9.30'] = "lazarus"; pkg2src['lazarus-ide-0.9.30'] = "lazarus"; pkg2src['lcl-qt4-0.9.30'] = "lazarus"; pkg2src['lazarus-ide-qt4-0.9.30'] = "lazarus"; pkg2src['lcl-gtk2-0.9.30'] = "lazarus"; pkg2src['lazarus-ide-gtk2-0.9.30'] = "lazarus"; pkg2src['lcl-units-0.9.30'] = "lazarus"; pkg2src['lazarus-0.9.30'] = "lazarus"; pkg2src['lazarus-doc-0.9.30'] = "lazarus"; pkg2src['lcl-0.9.30'] = "lazarus"; pkg2src['lcl-utils-0.9.30'] = "lazarus"; pkg2src['lcl-nogui-0.9.30'] = "lazarus"; pkg2src['libx264-65'] = "x264"; pkg2src['libx264-114'] = "x264"; pkg2src['libx264-60'] = "x264"; # pkg2src['libmlt3'] # pkg2src['libgmerlin-avdec0'] # pkg2src['libxul-dev'] # pkg2src['libmyth-0.23.1-0'] # pkg2src['libmpeg3hv'] # pkg2src['libquicktimehv'] # pkg2src['libxul0d'] # pkg2src['acroread-fonts-kor'] ############################################################################### ## Parse dpkg Packages file, create map deb-name->pkg-name def parsePackages(pkgfile): mydir = cache = config['DIR']['cache_dir'] deb2pkg = dict() pkg2virt = dict() virt2pkg = () logging.info('Parsing Packages file...\n') pkgfile = mydir + pkgfile #TODO open and parse pkg file ############################################################################### ## Parse dpkg Sources file, create map pkg-name->src-name def parseSources(srcfile): mydir = cache = config['DIR']['cache_dir'] checklinecont = 0 pkg2src = dict() logging.info('Parsing Sources file...\n') srcfile = mydir + srcfile #TODO open and parse sources file ############################################################################### def getSHA1(myhash, collection): return collection.find({"hash": myhash}) ############################################################################### def addSHA1(myhash, deb, src): dic = getSHA1(myhash) thash = dic["hash"] tdeb = dic["deb"] tsrc = dic["src"] #TODO insert SHA to database ############################################################################### ## Parse Sha1Sums file. Format: "sha1sum::deb-name::unix-file-path" ## Create 2 maps: sha1sum->file, file->deb-name def parseSha1Sums(sha1file): pass ############################################################################### ## Parse local dpkg status, return list of debs def parseStatus(stsfile): pass ############################################################################### ## Parse Advisory (only Debian supported atm def parseAdvisory(adv): if state['vendor'] == 'debian': return parseDSAhtml(adv) else: print('Unsupported distribution. We only support Debian at the moment') system.exit(1) ############################################################################### ## Manually fix problems with Advisory entries def fixAdvisoryQuirks(arg, state): if state['vendor'] == 'debian': return fixDSAquirks(arg) else: print('Unsupported distribution. We only support Debian at the moment') system.exit(1) ############################################################################### ## Extract CVE ids from new advisories and print URL for mirror script def printCVEs(myid,adv): logging.info('Looking for CVEs in advisory...\n') dsastats = parseAdvisory(adv) if dsastats == []: return ## fix DSAs that don't contain correct CVE refs dsastats = fixAdvisoryQuirks(myid, dsastats); #TODO Fix this part ##for cve_id in dsastats ############################################################################### ## Update internal vuln. DB with new Advisory info ## Creates CVEtable for MTBF computation: ## ( cve-id => (date, delay, score1, score2, score3)) def updateCVETables(myid, dsatable, state): logging.info('Updating vulnerability database with advisory ' + state['vendor'] + myid + ' \n') adv = dsatable[myid] dsastats = parseAdvisory(adv) if dsastats == []: return dsastats = fixAdvisoryQuirks(myid, dsastats) for srcpkg in dsastats[0]: src2dsa[srcpkg] = myid dsa2cve[myid] = dsastats[2] for cve_id in dsastats[2]: # No fetch CVE We use mongodb and cve-search cve = fetchCVE(cve_id) cvestats = parseCVE(cve_id, cve) if vestats[0] > dsastats[1] or cvestats[0] == 0: cvestats[0] = dsastats[1] cvedata = (cvestats[0], dsastats[1]-cvestats[0], cvestats[1], cvestats[2], cvestats[3]) cvetable[cve_id] = cvedata return cvetable ############################################################################### ## Check for updates on Package information def aptsec_update(state, config): args = sys.argv if not('--offline' in args): fetchMeta('Packages') fetchMeta('Sources') fetchMeta('Sha1Sums') if not('--cves' in args): parsePackages('Packages') parseSources('Sources') if not('--nosha1' in args): parseSha1sums('Sha1Sums') if state['vendor'] == 'debian': newAdv = checkDSAs(state, config) else: print('Unsupported distribution. We only support Debian at the moment') system.exit(1) for myid in newAdv: if myid in dsatable: logging.info(state['vendor'] + ' advisory ' + myid + ' already known.\n') elif '--cves' in args: ## scan for CVE urls only? printCVEs(myid, newAdv[myid]) else: ## store advisory and parse it dsatable[myid] = newAdv[myid] updateCVETables(myid) # recompute all pkg statistics for srcpkg in src2dsa: processCVEs(srcpkg) ############################################################################### ## find list of src pkgs from bin pkgs based on pkg2src def resolvePkg2Src(pkglist, pkg2src): srclist = [] for pkg in pkglist: if pkg in pkg2src: srcpkg = pkg2src[pkg] srclist.append(srcpkg) else: logging.info('Could not find source package for: ' + pkg + ' .\n') return srclist ############################################################################### ## compute and store MTBF, MTBR and Scores of each src pkg ## output: %src2mtbf: ## (srcpkg=> (begin, num, delaysum, scoresum, maximpact, MTTF, MTTFl)) def processCVEs(pkg, now, src2dsa, cvetable, config): stats = [now, 0, 0, 0, 0, 0, 0] mylambda = config['TRUST']['lambda'] logging.info('Processing package: ' + pkg + ' .\n') ## @cvestats = (date base-score impact-score exploit-score) for dsa_id in src2dsa[pkg]: for cve_id in dsa2cve[dsa_id]: cvestats[cvetable[cve_id][0]] += 1 stats[1] += 1 stats[2] += cvetable[cve_id][1] stats[3] += cvetable[cve_id][2] if stats[4] < cvetable[cve_id][3]: stats[4] = cvetable[cve_id][3] # Ignore pkgs with less than one incident, should not happen.. if stats[1] < 1: return prev_date = 0 weight = 0 dates = sorted(cvestats, key = cvestats.get) stats[0] = dates[0] for date in dates: pass ## Need to do compute value ##TODO Code to compute trust goes here ############################################################################### ## print some meta-info on internal data def aptsec_about(dsatable, cvetable, pkg2src, src2dsa): num_dsa = len(dsatable) num_cve = len(cvetable) num_pkg = len(pkg2src) num_src = len(src2dsa) print('\nThe current database records %d binary packages and %d DSAs.\n', num_pkg, num_src) print('%d CVEs are assiciated with %d source packages.\n', num_cve, num_src) ############################################################################### ## use scores to suggest alternative packages def aptsec_alternatives(pkg): pass ############################################################################### ## print overview for pkg high scores def aptsec_hitlist(): pass ############################################################################### ## evaluation helper ## compute stats until date given in $2, then compute stats ## for the next year to check accuracy of the prediction. ## @cvestats = (date base-score impact-score exploit-score) def simulate_stats(pkg, year): pass ############################################################################### ##TODO Printing functions ############################################################################### ## show info on a single src pkg, resolv to src if needed def aptsec_show(pkg, state, pkg2src, src2dsa, src2mtbf, cvetable): if state['vendor'] == 'debian': ADV = 'DSA-' else: print('Unsupported distribution. We only support Debian at the moment') system.exit(1) if (not(pkg in src2dsa)) and (pkg in pkg2src): print('\nResolving ' + pkg + ' to ' + pkg2src[pkg] + '\n') pkg = pkg2src[pkg] print('\nThe following binary packages are created from ' + pkg + ' :\n\n') lines = 0 for i in pkg2src: if pkg2src[i] == pkg: print(i + '\n') lines += 1 if lines < 1: print('-\n') if not (pkg in src2dsa and pkg in src2mtbf): print('\nNo vulnerabilities recorded for source package ' + pkg + '.\n') return print('\nAdvisories on package ' + pkg + ':\n\n') for dsa_id in sorted(src2dsa[pkg], key = src2dsa[pkg].get): print(ADV + dsa_id + '\n') for cve_id in dsa2cve[dsa_id]: (sec, minut, hrs, day, mon, yr) = gmtime(cvetable[cve_id][0]) print('%s: Base Score: %04.1f, %02d.%02d.%04d\n', cve_id, cvetable[cve_id][2], day, mon+1, yr+1900) stats = src2mtbf[pkg] (sec, minut, hrs, day, mon, yr) = gmtime(stats[0]) print('Now we print various iformation \n') ############################################################################### ## print help text def aptsec_help(): print('See manual for correct usage\n') ############################################################################### ## Print system status report from component(files) measurements (sha1sums) ## Expected input format is Linux IMA. We assume input was validated. ## ## Note: aptsec_status(), considers *reportedly installed* packages, while this ## one looks at *actually loaded* software that influenced the CPU since bootup. def aptsec_attest(sha1file): pass ## Main Program starts here!! try: action = sys.argv[1] except IndexError: # print('No argument given') # aptsec_help() # sys.exit(0) action = '' client = MongoClient() cve_db = client.cvedb (state, err) = load_state() #detect_distribution() #d = state['cache_dir'] #if not os.path.exists(d): # os.makedirs(d) if action == 'update': load_DBs() # loadsha1lists() aptsec_update() # save_sha1lists() save_DBs() save_state() elif action == 'status': load_DBs or exit(1) #handle errors more gracefully aptsec_status(sys.argv[2]) elif action == 'show': load_DBs or exit(1) #handle errors more gracefully aptsec_show(sys.argv[2]) else: aptsec_help() print(state) save_state(state) #client = MongoClient() #cve_db = client.cvedb #collection = db.cves #testcvss = collection.find_one({"cvss": 9.3}) #print(testcvss)