123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638 |
- #!/usr/bin/python3
- ## Based on the perl code of Trustminer by CASED
- ## Nikos
- import sys
- import os
- from pymongo import MongoClient
- #mongodb assumes database at default path
- import logging, sys
- import configparser
- import json
- import urllib.request
- import datetime
- import debian_advisory as da
- import cveparse as cv
- import matplotlib.pyplot as plt
- import numpy as np
- from dateutil import parser
- logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
- ## Increase the recursion limit by much to allow bs to parse large files
- ## This is not good practise
- sys.setrecursionlimit(6000)
- #load config file as library
- config = configparser.ConfigParser()
- config.read('config_test')
- if config.sections == []:
- print('configuration file not found\n')
- sys.exit(1)
- #global variables
- secperday = 60*60*24
- now = datetime.datetime.now()
- verbosity = 1
- ###############################################################################
- ## logging
- # 1 fatal errors
- # 2 errors
- # 3 note
- # 4 trace
- # 5 debug
- def msg(lvl,msg):
- if lvl <= int(config['LOG']['loglevel']):
- print(msg)
- def debug(msg):
- msg(5, msg)
- # Need to see if this is necessary
- ## load state, different from DBs in that we always need it
- def load_state():
- cache = config['DIR']['cache_dir'] + 'state'
- err = 0
- state = dict()
- try:
- with open(cache) as json_data:
- state = json.load(json_data)
- except FileNotFoundError:
- # Load default state - start from the beginning
- state['cache_dir'] = cache
- state['next_adv'] = 0
- state['next_fsa'] = 0
- state['Packages'] = ''
- state['Sources'] = ''
- state['Sha1Sums'] = ''
- err += 1
- return (state, err)
- ###############################################################################
- ## save state, different from DBs in that we always need it
- def save_state(state):
- cache = config['DIR']['cache_dir'] + 'state'
-
- try:
- with open(cache, 'w') as fp:
- json.dump(state, fp)
- except IOError:
- print('write cache state failed!! Fatal error')
- sys.exit(1)
- ###############################################################################
- ## load sha lists :TODO later
- def load_sha1lists():
- cache = config['DIR']['cache_dir'] + 'state'
- ###############################################################################
- ## save sha lists :TODO later
- def save_sha1lists():
- pass
- ###############################################################################
- ## load from files
- def load_DBs():
-
- dsatable = dict()
- src2dsa = dict()
- dsa2cve = dict()
- cvetable = dict()
-
- cache = config['DIR']['cache_dir']
-
- cache_dsatable = cache + 'dsatable'
- try:
- with open(cache_dsatable) as fp:
- dsatable = json.load(fp)
- except (IOError, ValueError):
- print('read cache dsatable failed!! Maybe first run of the system?')
- cache_src2dsa = cache + 'src2dsa'
- try:
- with open(cache_src2dsa) as fp:
- src2dsa = json.load(fp)
- except (IOError, ValueError):
- print('read cache src2dsa failed!! Maybe first run of the system?')
- cache_dsa2cve = cache + 'dsa2cve'
- try:
- with open(cache_dsa2cve) as fp:
- dsa2cve = json.load(fp)
- except (IOError, ValueError):
- print('read cache dsa2cve failed!! Maybe first run of the system?')
- cache_cvetable = cache + 'cvetable'
- try:
- with open(cache_cvetable) as fp:
- cvetable = json.load(fp)
- except (IOError, ValueError):
- print('read cache cvetable failed!! Maybe first run of the system?')
- return(dsatable, src2dsa, dsa2cve, cvetable)
- ###############################################################################
- ## help for save_DBs
- def myconverter(o):
- if isinstance(o, datetime.datetime) or isinstance(o, datetime.timedelta):
- return str(o)
- ###############################################################################
- ## save to files
- def save_DBs(dsatable, src2dsa, dsa2cve, cvetable):
- cache = config['DIR']['cache_dir']
-
- cache_dsatable = cache + 'dsatable'
- try:
- with open(cache_dsatable, 'w') as fp:
- json.dump(dsatable, fp, default = myconverter)
- except IOError:
- print('write cache dsatable failed!! Fatal error')
- sys.exit(1)
- cache_src2dsa = cache + 'src2dsa'
- try:
- with open(cache_src2dsa, 'w') as fp:
- json.dump(src2dsa, fp)
- except IOError:
- print('write cache src2dsa failed!! Fatal error')
- sys.exit(1)
- cache_dsa2cve = cache + 'dsa2cve'
- try:
- with open(cache_dsa2cve, 'w') as fp:
- json.dump(dsa2cve, fp)
- except IOError:
- print('write cache dsa2cve failed!! Fatal error')
- sys.exit(1)
- cache_cvetable = cache + 'cvetable'
- try:
- with open(cache_cvetable, 'w') as fp:
- json.dump(cvetable, fp, default = myconverter)
- except IOError:
- print('write cache cvetable failed!! Fatal error')
- sys.exit(1)
- ###############################################################################
- ## Fetch current Packages, Sources and sha1sums files
- ## These are needed to find CVE stats by sha1sums/pkg-names
- ## Only Sha1Sums is custom generated, others are from Debian.
- ## FIXME: Server might do on-the-fly gzip (but should not for bzip2)
- ## Return: 1 on success, to signal that new parsing is needed.
- def fetchMeta(filename):
- urlbase = config['URL']['pkg_base_url']
- mydir = config['DIR']['cache_dir']
- bzFile = filename + '.bz2'
- url = urlbase + bzFile
- logging.info('Checking meta file from ' + url + '\n')
- # Download file
- urllib.request.urlretrieve(url, mydir + bzfile)
- # TODO catch exceptions like file not found
- # TODO check if file has changed, if it is new unpack
- ###############################################################################
- # Sources and Packages are not completely consistent, esp for debian-multimedia
- # He we store manual mappings for these..
- def addOrphanPkgs(pkg2src):
- pkg2src['liblame-dev'] = "lame";
- pkg2src['lame-extras'] = "lame";
- pkg2src['moonlight'] = "moon";
- pkg2src['libmoon0'] = "moon";
- pkg2src['xmms-mp4'] = "xmms2";
- pkg2src['xmms-mp4'] = "xmms2";
- pkg2src['lazarus-src-0.9.30'] = "lazarus";
- pkg2src['lazarus-ide-0.9.30'] = "lazarus";
- pkg2src['lcl-qt4-0.9.30'] = "lazarus";
- pkg2src['lazarus-ide-qt4-0.9.30'] = "lazarus";
- pkg2src['lcl-gtk2-0.9.30'] = "lazarus";
- pkg2src['lazarus-ide-gtk2-0.9.30'] = "lazarus";
- pkg2src['lcl-units-0.9.30'] = "lazarus";
- pkg2src['lazarus-0.9.30'] = "lazarus";
- pkg2src['lazarus-doc-0.9.30'] = "lazarus";
- pkg2src['lcl-0.9.30'] = "lazarus";
- pkg2src['lcl-utils-0.9.30'] = "lazarus";
- pkg2src['lcl-nogui-0.9.30'] = "lazarus";
- pkg2src['libx264-65'] = "x264";
- pkg2src['libx264-114'] = "x264";
- pkg2src['libx264-60'] = "x264";
- # pkg2src['libmlt3']
- # pkg2src['libgmerlin-avdec0']
- # pkg2src['libxul-dev']
- # pkg2src['libmyth-0.23.1-0']
- # pkg2src['libmpeg3hv']
- # pkg2src['libquicktimehv']
- # pkg2src['libxul0d']
- # pkg2src['acroread-fonts-kor']
- ###############################################################################
- ## Parse dpkg Packages file, create map deb-name->pkg-name
- def parsePackages(pkgfile):
- mydir = cache = config['DIR']['cache_dir']
- deb2pkg = dict()
- pkg2virt = dict()
- virt2pkg = ()
- logging.info('Parsing Packages file...\n')
- pkgfile = mydir + pkgfile
- #TODO open and parse pkg file
- ###############################################################################
- ## Parse dpkg Sources file, create map pkg-name->src-name
- def parseSources(srcfile):
- mydir = cache = config['DIR']['cache_dir']
- checklinecont = 0
- pkg2src = dict()
- logging.info('Parsing Sources file...\n')
- srcfile = mydir + srcfile
-
- #TODO open and parse sources file
- ###############################################################################
- def getSHA1(myhash, collection):
- return collection.find({"hash": myhash})
- ###############################################################################
- def addSHA1(myhash, deb, src):
- dic = getSHA1(myhash)
- thash = dic["hash"]
- tdeb = dic["deb"]
- tsrc = dic["src"]
- #TODO insert SHA to database
- ###############################################################################
- ## Parse Sha1Sums file. Format: "sha1sum::deb-name::unix-file-path"
- ## Create 2 maps: sha1sum->file, file->deb-name
- def parseSha1Sums(sha1file):
- pass
- ###############################################################################
- ## Parse local dpkg status, return list of debs
- def parseStatus(stsfile):
- pass
- ###############################################################################
- ## Parse Advisory (only Debian supported atm
- def parseAdvisory(adv):
- if state['vendor'] == 'debian':
- return da.parseDSAhtml(adv)
- else:
- print('Unsupported distribution. We only support Debian at the moment')
- system.exit(1)
- ###############################################################################
- ## Manually fix problems with Advisory entries
- def fixAdvisoryQuirks(arg, state, dsastats):
- if state['vendor'] == 'debian':
- return da.fixDSAquirks(arg, dsastats)
- else:
- print('Unsupported distribution. We only support Debian at the moment')
- system.exit(1)
- ###############################################################################
- ## Extract CVE ids from new advisories and print URL for mirror script
- def printCVEs(myid,adv, state):
- logging.info('Looking for CVEs in advisory...\n')
- dsastats = parseAdvisory(adv)
- if dsastats == []:
- return
- ## fix DSAs that don't contain correct CVE refs
- dsastats = fixAdvisoryQuirks(myid, state, dsastats);
- #TODO Fix this part
- ##for cve_id in dsastats
- ###############################################################################
- ## Update internal vuln. DB with new Advisory info
- ## Creates CVEtable for MTBF computation:
- ## ( cve-id => (date, delay, score1, score2, score3))
- def updateCVETables(myid, dsatable, state, src2dsa, dsa2cve, cvetable, client):
- logging.info('Updating vulnerability database with advisory ' + state['vendor'] + str(myid) + ' \n')
-
- adv = dsatable[myid]
- dsastats = parseAdvisory(adv)
- if dsastats == []:
- return
- dsastats = fixAdvisoryQuirks(myid, state, dsastats)
- for srcpkg in dsastats[0]:
- if srcpkg in src2dsa:
- src2dsa[srcpkg].append(myid)
- else:
- src2dsa[srcpkg] = []
- src2dsa[srcpkg].append(myid)
- dsa2cve[str(myid)] = dsastats[2]
- for cve_id in dsastats[2]:
- # No fetch CVE We use mongodb and cve-search
- cve = cv.fetchCVE(cve_id, client)
- cvestats = cv.parseCVE(cve_id, cve)
- # print(cvestats)
- # print(dsastats)
- finaldate = cvestats[0]
- if cvestats[0] > dsastats[1] or cvestats[0] == 0:
- finaldate = dsastats[1]
- cvedata = (finaldate, dsastats[1]-finaldate, cvestats[1], cvestats[2], cvestats[3])
- ## print(cvedata)
- cvetable[cve_id] = cvedata
- return cvetable
- ###############################################################################
- ## Check for updates on Package information
- def aptsec_update(state, config, dsatable, client, src2dsa, dsa2cve, cvetable):
- args = sys.argv
- # if not('--offline' in args):
- # fetchMeta('Packages')
- # fetchMeta('Sources')
- # fetchMeta('Sha1Sums')
- now = datetime.datetime.now()
-
- if not('--cves' in args):
- parsePackages('Packages')
- parseSources('Sources')
-
- # if not('--nosha1' in args):
- # parseSha1sums('Sha1Sums')
- if state['vendor'] == 'debian':
- newAdv = da.checkDSAs(state, config)
- else:
- print('Unsupported distribution. We only support Debian at the moment')
- system.exit(1)
- for myid in newAdv:
- if myid in dsatable:
- logging.info(state['vendor'] + ' advisory ' + myid + ' already known.\n')
- elif '--cves' in args:
- ## scan for CVE urls only?
- printCVEs(myid, newAdv[myid])
- else:
- ## store advisory and parse it
- dsatable[myid] = newAdv[myid]
- updateCVETables(myid, dsatable, state, src2dsa, dsa2cve, cvetable, client)
-
- # recompute all pkg statistics
- for srcpkg in src2dsa:
- processCVEs(srcpkg, now, src2dsa, dsa2cve, cvetable, config)
-
- return 0
- ###############################################################################
- ## find list of src pkgs from bin pkgs based on pkg2src
- def resolvePkg2Src(pkglist, pkg2src):
- srclist = []
- for pkg in pkglist:
- if pkg in pkg2src:
- srcpkg = pkg2src[pkg]
- srclist.append(srcpkg)
- else:
- logging.info('Could not find source package for: ' + pkg + ' .\n')
- return srclist
- ###############################################################################
- ## compute and store MTBF, MTBR and Scores of each src pkg
- ## output: %src2mtbf:
- ## (srcpkg=> ())
- def processCVEs(pkg, now, src2dsa, dsa2cve, cvetable, config):
- stats = [now, 0, 0, 0, 0, 0, 0]
- mylambda = config['TRUST']['lambda']
- cvestats = dict()
- logging.info('Processing package: ' + pkg + '.\n')
- # print(dsa2cve)
- ## @cvestats = (date base-score impact-score exploit-score)
- for dsa_id in src2dsa[pkg]:
- try:
- for cve_id in dsa2cve[dsa_id]:
- tt = cvetable[cve_id][0]
- if tt in cvestats:
- cvestats[cvetable[cve_id][0]] += 1
- else:
- cvestats[cvetable[cve_id][0]] = 1
- stats[1] += 1
- except KeyError:
- for cve_id in dsa2cve[str(dsa_id)]:
- tt = cvetable[cve_id][0]
- if tt in cvestats:
- cvestats[cvetable[cve_id][0]] += 1
- else:
- cvestats[cvetable[cve_id][0]] = 1
- stats[1] += 1
- # Ignore pkgs with less than one incident, should not happen..
- if stats[1] < 1:
- return
- prev_date = 0
- weight = 0
- dates = sorted(cvestats, key = cvestats.get)
- stats[0] = dates[0]
- count = sum(cvestats.values())
- print(pkg + ' ' + str(count))
- if pkg == 'linux-2.6':
- # print(src2dsa[pkg])
- pkg_plot(pkg, cvestats)
- for date in dates:
- pass
- ## Need to do compute value
- ##TODO Code to compute trust goes here
- ###############################################################################
- ## plot vulnerability time distribution for a single package
- def pkg_plot(pkg, cvestats):
-
- colors = list("rgbcmyk")
- items = list(cvestats.items())
- print(items)
- items.sort(key=lambda tup: tup[0])
- x = []
- y = []
- for data_dict in items:
- x.append(parser.parse(data_dict[0]))
- y.append(data_dict[1])
-
- plt.plot_date(x, y)
-
- # plt.legend(data_dict.keys())
- plt.show()
- return 0
- ###############################################################################
- ## print some meta-info on internal data
- def aptsec_about(dsatable, cvetable, pkg2src, src2dsa):
- num_dsa = len(dsatable)
- num_cve = len(cvetable)
- num_pkg = len(pkg2src)
- num_src = len(src2dsa)
- print('\nThe current database records %d binary packages and %d DSAs.\n', num_pkg, num_src)
- print('%d CVEs are assiciated with %d source packages.\n', num_cve, num_src)
- ###############################################################################
- ## use scores to suggest alternative packages
- def aptsec_alternatives(pkg):
- pass
- ###############################################################################
- ## print overview for pkg high scores
- def aptsec_hitlist():
- pass
- ###############################################################################
- ## evaluation helper
- ## compute stats until date given in $2, then compute stats
- ## for the next year to check accuracy of the prediction.
- ## @cvestats = (date base-score impact-score exploit-score)
- def simulate_stats(pkg, year):
- pass
- ###############################################################################
- ##TODO Printing functions
- ###############################################################################
- ## show info on a single src pkg, resolv to src if needed
- def aptsec_show(pkg, state, pkg2src, src2dsa, src2mtbf, cvetable):
- if state['vendor'] == 'debian':
- ADV = 'DSA-'
- else:
- print('Unsupported distribution. We only support Debian at the moment')
- system.exit(1)
- if (not(pkg in src2dsa)) and (pkg in pkg2src):
- print('\nResolving ' + pkg + ' to ' + pkg2src[pkg] + '\n')
- pkg = pkg2src[pkg]
- print('\nThe following binary packages are created from ' + pkg + ' :\n\n')
- lines = 0
-
- for i in pkg2src:
- if pkg2src[i] == pkg:
- print(i + '\n')
- lines += 1
- if lines < 1:
- print('-\n')
- if not (pkg in src2dsa and pkg in src2mtbf):
- print('\nNo vulnerabilities recorded for source package ' + pkg + '.\n')
- return
- print('\nAdvisories on package ' + pkg + ':\n\n')
- for dsa_id in sorted(src2dsa[pkg], key = src2dsa[pkg].get):
- print(ADV + dsa_id + '\n')
- for cve_id in dsa2cve[dsa_id]:
- (sec, minut, hrs, day, mon, yr) = gmtime(cvetable[cve_id][0])
- print('%s: Base Score: %04.1f, %02d.%02d.%04d\n', cve_id, cvetable[cve_id][2], day, mon+1, yr+1900)
- stats = src2mtbf[pkg]
- (sec, minut, hrs, day, mon, yr) = gmtime(stats[0])
- print('Now we print various iformation \n')
- ###############################################################################
- ## print help text
- def aptsec_help():
- print('See manual for correct usage\n')
- ###############################################################################
- ## Print system status report from component(files) measurements (sha1sums)
- ## Expected input format is Linux IMA. We assume input was validated.
- ##
- ## Note: aptsec_status(), considers *reportedly installed* packages, while this
- ## one looks at *actually loaded* software that influenced the CPU since bootup.
- def aptsec_attest(sha1file):
- pass
- ## Main Program starts here!!
- try:
- action = sys.argv[1]
- except IndexError:
- # print('No argument given')
- # aptsec_help()
- # sys.exit(0)
- action = ''
- client = MongoClient()
- dsatable = dict()
- cve_db = client.cvedb
- src2dsa = dict()
- dsa2cve = dict()
- cvetable = dict()
- (state, err) = load_state()
- state['vendor'] = 'debian'
- #detect_distribution()
- #d = state['cache_dir']
- #if not os.path.exists(d):
- # os.makedirs(d)
- if action == 'update':
- (dsatable, src2dsa, dsa2cve, cvetable) = load_DBs()
- # loadsha1lists()
- aptsec_update(state,config, dsatable, client, src2dsa, dsa2cve, cvetable)
- # save_sha1lists()
- save_DBs(dsatable, src2dsa, dsa2cve, cvetable)
- save_state(state)
- elif action == 'status':
- load_DBs or exit(1)
- #handle errors more gracefully
- aptsec_status(sys.argv[2])
- elif action == 'show':
- load_DBs or exit(1)
- #handle errors more gracefully
- aptsec_show(sys.argv[2])
- else:
- aptsec_help()
- #print(state)
- save_state(state)
- #cve_db = client.cvedb
- #collection = db.cves
- #testcvss = collection.find_one({"cvss": 9.3})
- #print(testcvssi
|