|
- #!/usr/bin/env python
- #
- # Smoke test tool for the INET Framework: checks that simulations don't crash
- # or stop with a runtime error when run for a reasonable (CPU) time.
- #
- # Accepts one or more CSV files with (at least) two columns: working directory,
- # and options to opp_run. More than two columns are accepted so that the
- # tool can share the same input files with the fingerprint tester.
- # The program runs the INET simulations in the CSV files, and reports crashes
- # and runtime errors as FAILs.
- #
- # Implementation is based on Python's unit testing library, so it can be
- # integrated into larger test suites with minimal effort.
- #
- # Shares a fair amount of code with the fingerprint tester; those parts
- # should sometime be factored out as a Python module.
- #
- # Author: Andras Varga
- #
- import argparse
- import copy
- import csv
- import glob
- import multiprocessing
- import os
- import re
- import subprocess
- import sys
- import threading
- import time
- import unittest
- from StringIO import StringIO
- inetRoot = os.path.abspath("../..")
- sep = ";" if sys.platform == 'win32' else ':'
- nedPath = inetRoot + "/src" + sep + inetRoot + "/examples" + sep + inetRoot + "/showcases" + sep + inetRoot + "/tutorials"
- inetLib = inetRoot + "/src/INET"
- opp_run = "opp_run"
- cpuTimeLimit = "6s"
- logFile = "test.out"
- memcheck = False
- memcheckFailedErrorCode = 66
- class SmokeTestCaseGenerator():
- fileToSimulationsMap = {}
- def generateFromCSV(self, csvFileList, filterRegexList, excludeFilterRegexList, repeat):
- testcases = []
- for csvFile in csvFileList:
- simulations = self.parseSimulationsTable(csvFile)
- self.fileToSimulationsMap[csvFile] = simulations
- testcases.extend(self.generateFromDictList(simulations, filterRegexList, excludeFilterRegexList, repeat))
- return testcases
- def generateFromDictList(self, simulations, filterRegexList, excludeFilterRegexList, repeat):
- class StoreFingerprintCallback:
- def __init__(self, simulation):
- self.simulation = simulation
- def __call__(self, fingerprint):
- self.simulation['computedFingerprint'] = fingerprint
- class StoreExitcodeCallback:
- def __init__(self, simulation):
- self.simulation = simulation
- def __call__(self, exitcode):
- self.simulation['exitcode'] = exitcode
- testcases = []
- for simulation in simulations:
- title = simulation['wd'] + " " + simulation['args']
- if not filterRegexList or ['x' for regex in filterRegexList if re.search(regex, title)]: # if any regex matches title
- if not excludeFilterRegexList or not ['x' for regex in excludeFilterRegexList if re.search(regex, title)]: # if NO exclude-regex matches title
- testcases.append(SmokeTestCase(title, simulation['file'], simulation['wd'], simulation['args']))
- return testcases
- def commentRemover(self, csvData):
- p = re.compile(' *#.*$')
- for line in csvData:
- yield p.sub('',line)
- # parse the CSV into a list of dicts
- def parseSimulationsTable(self, csvFile):
- simulations = []
- f = open(csvFile, 'rb')
- csvReader = csv.reader(self.commentRemover(f), delimiter=',', quotechar='"', skipinitialspace=True)
- for fields in csvReader:
- if len(fields) == 0:
- continue # empty line
- if len(fields) != 2:
- raise Exception("Line " + str(csvReader.line_num) + " must contain 2 items, but contains " + str(len(fields)) + ": " + '"' + '", "'.join(fields) + '"')
- simulations.append({'file': csvFile, 'line' : csvReader.line_num, 'wd': fields[0], 'args': fields[1]})
- f.close()
- return simulations
- class SimulationResult:
- def __init__(self, command, workingdir, exitcode, errorMsg=None, isFingerprintOK=None,
- computedFingerprint=None, simulatedTime=None, numEvents=None, elapsedTime=None, cpuTimeLimitReached=None):
- self.command = command
- self.workingdir = workingdir
- self.exitcode = exitcode
- self.errorMsg = errorMsg
- self.isFingerprintOK = isFingerprintOK
- self.computedFingerprint = computedFingerprint
- self.simulatedTime = simulatedTime
- self.numEvents = numEvents
- self.elapsedTime = elapsedTime
- self.cpuTimeLimitReached = cpuTimeLimitReached
- class SimulationTestCase(unittest.TestCase):
- def runSimulation(self, title, command, workingdir):
- global logFile
- # run the program and log the output
- t0 = time.time()
- (exitcode, out) = self.runProgram(command, workingdir)
- elapsedTime = time.time() - t0
- FILE = open(logFile, "a")
- FILE.write("------------------------------------------------------\n"
- + "Running: " + title + "\n\n"
- + "$ cd " + workingdir + "\n"
- + "$ " + command + "\n\n"
- + out.strip() + "\n\n"
- + "Exit code: " + str(exitcode) + "\n"
- + "Elapsed time: " + str(round(elapsedTime,2)) + "s\n\n")
- FILE.close()
- result = SimulationResult(command, workingdir, exitcode, elapsedTime=elapsedTime)
- # process error messages
- errorLines = re.findall("<!>.*", out, re.M)
- errorMsg = ""
- for err in errorLines:
- err = err.strip()
- if re.search("Fingerprint", err):
- if re.search("successfully", err):
- result.isFingerprintOK = True
- else:
- m = re.search("(computed|calculated): ([-a-zA-Z0-9]+)", err)
- if m:
- result.isFingerprintOK = False
- result.computedFingerprint = m.group(2)
- else:
- raise Exception("Cannot parse fingerprint-related error message: " + err)
- else:
- errorMsg += "\n" + err
- if re.search("CPU time limit reached", err):
- result.cpuTimeLimitReached = True
- m = re.search("at event #([0-9]+), t=([0-9]*(\\.[0-9]+)?)", err)
- if m:
- result.numEvents = int(m.group(1))
- result.simulatedTime = float(m.group(2))
- result.errormsg = errorMsg.strip()
- return result
- def runProgram(self, command, workingdir):
- process = subprocess.Popen(command, shell=True, cwd=workingdir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- out = process.communicate()[0]
- out = re.sub("\r", "", out)
- return (process.returncode, out)
- class SmokeTestCase(SimulationTestCase):
- def __init__(self, title, csvFile, wd, args):
- SimulationTestCase.__init__(self)
- self.title = title
- self.csvFile = csvFile
- self.wd = wd
- self.args = args
- def runTest(self):
- global inetRoot, opp_run, nedPath, cpuTimeLimit, memcheck
- # run the simulation
- workingdir = _iif(self.wd.startswith('/'), inetRoot + "/" + self.wd, self.wd)
- resultdir = os.path.abspath(".") + "/results/";
- if not os.path.exists(resultdir):
- os.makedirs(resultdir)
- memcheckPrefix = ""
- if memcheck:
- memcheckPrefix = "valgrind --leak-check=full --track-origins=yes --error-exitcode=" + str(memcheckFailedErrorCode) + " --suppressions=" + inetRoot + "/tests/smoke/valgrind.supp "
- command = memcheckPrefix + opp_run + " -n " + nedPath + " -l " + inetLib + " -u Cmdenv " + self.args + \
- " --cpu-time-limit=" + cpuTimeLimit + \
- " --vector-recording=false --scalar-recording=false" \
- " --result-dir=" + resultdir
- # print "COMMAND: " + command + '\n'
- result = self.runSimulation(self.title, command, workingdir)
- # process the result
- if result.exitcode == memcheckFailedErrorCode:
- assert False, "memcheck detected errors"
- elif result.exitcode != 0:
- assert False, result.errormsg
- elif result.simulatedTime == 0:
- assert False, "zero time simulated"
- else:
- pass
- def __str__(self):
- return self.title
- class ThreadSafeIter:
- """Takes an iterator/generator and makes it thread-safe by
- serializing call to the `next` method of given iterator/generator.
- """
- def __init__(self, it):
- self.it = it
- self.lock = threading.Lock()
- def __iter__(self):
- return self
- def next(self):
- with self.lock:
- return self.it.next()
- class ThreadedTestSuite(unittest.BaseTestSuite):
- """ runs toplevel tests in n threads
- """
- # How many test process at the time.
- thread_count = multiprocessing.cpu_count()
- def run(self, result):
- it = ThreadSafeIter(self.__iter__())
- result.buffered = True
- threads = []
- for i in range(self.thread_count):
- # Create self.thread_count number of threads that together will
- # cooperate removing every ip in the list. Each thread will do the
- # job as fast as it can.
- t = threading.Thread(target=self.runThread, args=(result, it))
- t.daemon = True
- t.start()
- threads.append(t)
- # Wait until all the threads are done. .join() is blocking.
- #for t in threads:
- # t.join()
- runApp = True
- while runApp and threading.active_count() > 1:
- try:
- time.sleep(0.1)
- except KeyboardInterrupt:
- runApp = False
- return result
- def runThread(self, result, it):
- tresult = result.startThread()
- for test in it:
- if result.shouldStop:
- break
- test(tresult)
- tresult.stopThread()
- class ThreadedTestResult(unittest.TestResult):
- """TestResult with threads
- """
- def __init__(self, stream=None, descriptions=None, verbosity=None):
- super(ThreadedTestResult, self).__init__()
- self.parent = None
- self.lock = threading.Lock()
- def startThread(self):
- ret = copy.copy(self)
- ret.parent = self
- return ret
- def stop():
- super(ThreadedTestResult, self).stop()
- if self.parent:
- self.parent.stop()
- def stopThread(self):
- if self.parent == None:
- return 0
- self.parent.testsRun += self.testsRun
- return 1
- def startTest(self, test):
- "Called when the given test is about to be run"
- super(ThreadedTestResult, self).startTest(test)
- self.oldstream = self.stream
- self.stream = StringIO()
- def stopTest(self, test):
- """Called when the given test has been run"""
- super(ThreadedTestResult, self).stopTest(test)
- out = self.stream.getvalue()
- with self.lock:
- self.stream = self.oldstream
- self.stream.write(out)
- #
- # Copy/paste of TextTestResult, with minor modifications in the output:
- # we want to print the error text after ERROR and FAIL, but we don't want
- # to print stack traces.
- #
- class SimulationTextTestResult(ThreadedTestResult):
- """A test result class that can print formatted text results to a stream.
- Used by TextTestRunner.
- """
- separator1 = '=' * 70
- separator2 = '-' * 70
- def __init__(self, stream, descriptions, verbosity):
- super(SimulationTextTestResult, self).__init__()
- self.stream = stream
- self.showAll = verbosity > 1
- self.dots = verbosity == 1
- self.descriptions = descriptions
- def getDescription(self, test):
- doc_first_line = test.shortDescription()
- if self.descriptions and doc_first_line:
- return '\n'.join((str(test), doc_first_line))
- else:
- return str(test)
- def startTest(self, test):
- super(SimulationTextTestResult, self).startTest(test)
- if self.showAll:
- self.stream.write(self.getDescription(test))
- self.stream.write(" ... ")
- self.stream.flush()
- def addSuccess(self, test):
- super(SimulationTextTestResult, self).addSuccess(test)
- if self.showAll:
- self.stream.write(": PASS\n")
- elif self.dots:
- self.stream.write('.')
- self.stream.flush()
- def addError(self, test, err):
- # modified
- super(SimulationTextTestResult, self).addError(test, err)
- self.errors[-1] = (test, err[1]) # super class method inserts stack trace; we don't need that, so overwrite it
- if self.showAll:
- (cause, detail) = self._splitMsg(err[1])
- self.stream.write(": ERROR (%s)\n" % cause)
- if detail:
- self.stream.write(detail)
- self.stream.write("\n")
- elif self.dots:
- self.stream.write('E')
- self.stream.flush()
- def addFailure(self, test, err):
- # modified
- super(SimulationTextTestResult, self).addFailure(test, err)
- self.failures[-1] = (test, err[1]) # super class method inserts stack trace; we don't need that, so overwrite it
- if self.showAll:
- (cause, detail) = self._splitMsg(err[1])
- self.stream.write(": FAIL (%s)\n" % cause)
- if detail:
- self.stream.write(detail)
- self.stream.write("\n")
- elif self.dots:
- self.stream.write('F')
- self.stream.flush()
- def addSkip(self, test, reason):
- super(SimulationTextTestResult, self).addSkip(test, reason)
- if self.showAll:
- self.stream.write(": skipped {0!r}".format(reason))
- self.stream.write("\n")
- elif self.dots:
- self.stream.write("s")
- self.stream.flush()
- def addExpectedFailure(self, test, err):
- super(SimulationTextTestResult, self).addExpectedFailure(test, err)
- if self.showAll:
- self.stream.write(": expected failure\n")
- elif self.dots:
- self.stream.write("x")
- self.stream.flush()
- def addUnexpectedSuccess(self, test):
- super(SimulationTextTestResult, self).addUnexpectedSuccess(test)
- if self.showAll:
- self.stream.write(": unexpected success\n")
- elif self.dots:
- self.stream.write("u")
- self.stream.flush()
- def printErrors(self):
- # modified
- if self.dots or self.showAll:
- self.stream.write("\n")
- self.printErrorList('Errors', self.errors)
- self.printErrorList('Failures', self.failures)
- def printErrorList(self, flavour, errors):
- # modified
- if errors:
- self.stream.writeln("%s:" % flavour)
- for test, err in errors:
- self.stream.write(" %s (%s)\n" % (self.getDescription(test), self._splitMsg(err)[0]))
- def _splitMsg(self, msg):
- cause = str(msg)
- detail = None
- if cause.count(': '):
- (cause, detail) = cause.split(': ',1)
- return (cause, detail)
- def _iif(cond,t,f):
- return t if cond else f
- def ensure_dir(f):
- if not os.path.exists(f):
- os.makedirs(f)
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Run the fingerprint tests specified in the input files.')
- parser.add_argument('testspecfiles', nargs='*', metavar='testspecfile', help='CSV files that contain the tests to run (default: *.csv). Expected CSV file columns: workingdir, args, simtimelimit, fingerprint')
- parser.add_argument('-m', '--match', action='append', metavar='regex', help='Line filter: a line (more precisely, workingdir+SPACE+args) must match any of the regular expressions in order for that test case to be run')
- parser.add_argument('-x', '--exclude', action='append', metavar='regex', help='Negative line filter: a line (more precisely, workingdir+SPACE+args) must NOT match any of the regular expressions in order for that test case to be run')
- parser.add_argument('-c', '--memcheck', default=False, dest='memcheck', action='store_true', help='run using valgrind memcheck (default: false)')
- parser.add_argument('-s', '--cpu-time-limit', default='6s', dest='cpuTimeLimit', help='cpu time limit (default: 6s)')
- parser.add_argument('-t', '--threads', type=int, default=multiprocessing.cpu_count(), help='number of parallel threads (default: number of CPUs, currently '+str(multiprocessing.cpu_count())+')')
- parser.add_argument('-l', '--logfile', default='test.out', dest='logFile', help='name of logfile (default: test.out)')
- args = parser.parse_args()
- memcheck = args.memcheck
- cpuTimeLimit = args.cpuTimeLimit
- logFile = args.logFile
- if os.path.isfile(logFile):
- FILE = open(logFile, "w")
- FILE.close()
- if not args.testspecfiles:
- args.testspecfiles = glob.glob('*.csv')
- generator = SmokeTestCaseGenerator()
- testcases = generator.generateFromCSV(args.testspecfiles, args.match, args.exclude, 1)
- testSuite = ThreadedTestSuite()
- testSuite.addTests(testcases)
- testSuite.thread_count = args.threads
- testRunner = unittest.TextTestRunner(stream=sys.stdout, verbosity=9, resultclass=SimulationTextTestResult)
- testRunner.run(testSuite)
- print
- print "Log has been saved to %s" % logFile
|