123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- #!/usr/bin/env python3
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Unit test runner, providing new features on top of unittest module:
- - colourized output
- - parallel run (UNIX only)
- - print failures/tracebacks on CTRL+C
- - re-run failed tests only (make test-failed).
- Invocation examples:
- - make test
- - make test-failed
- Parallel:
- - make test-parallel
- - make test-process ARGS=--parallel
- """
- from __future__ import print_function
- import atexit
- import optparse
- import os
- import sys
- import textwrap
- import time
- import unittest
- try:
- import ctypes
- except ImportError:
- ctypes = None
- try:
- import concurrencytest # pip install concurrencytest
- except ImportError:
- concurrencytest = None
- import psutil
- from psutil._common import hilite
- from psutil._common import print_color
- from psutil._common import term_supports_colors
- from psutil._compat import super
- from psutil.tests import CI_TESTING
- from psutil.tests import import_module_by_path
- from psutil.tests import print_sysinfo
- from psutil.tests import reap_children
- from psutil.tests import safe_rmpath
- VERBOSITY = 2
- FAILED_TESTS_FNAME = '.failed-tests.txt'
- NWORKERS = psutil.cpu_count() or 1
- USE_COLORS = not CI_TESTING and term_supports_colors()
- HERE = os.path.abspath(os.path.dirname(__file__))
- loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase # noqa
- def cprint(msg, color, bold=False, file=None):
- if file is None:
- file = sys.stderr if color == 'red' else sys.stdout
- if USE_COLORS:
- print_color(msg, color, bold=bold, file=file)
- else:
- print(msg, file=file)
- class TestLoader:
- testdir = HERE
- skip_files = ['test_memleaks.py']
- if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
- skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])
- def _get_testmods(self):
- return [os.path.join(self.testdir, x)
- for x in os.listdir(self.testdir)
- if x.startswith('test_') and x.endswith('.py') and
- x not in self.skip_files]
- def _iter_testmod_classes(self):
- """Iterate over all test files in this directory and return
- all TestCase classes in them.
- """
- for path in self._get_testmods():
- mod = import_module_by_path(path)
- for name in dir(mod):
- obj = getattr(mod, name)
- if isinstance(obj, type) and \
- issubclass(obj, unittest.TestCase):
- yield obj
- def all(self):
- suite = unittest.TestSuite()
- for obj in self._iter_testmod_classes():
- test = loadTestsFromTestCase(obj)
- suite.addTest(test)
- return suite
- def last_failed(self):
- # ...from previously failed test run
- suite = unittest.TestSuite()
- if not os.path.isfile(FAILED_TESTS_FNAME):
- return suite
- with open(FAILED_TESTS_FNAME) as f:
- names = f.read().split()
- for n in names:
- test = unittest.defaultTestLoader.loadTestsFromName(n)
- suite.addTest(test)
- return suite
- def from_name(self, name):
- if name.endswith('.py'):
- name = os.path.splitext(os.path.basename(name))[0]
- return unittest.defaultTestLoader.loadTestsFromName(name)
- class ColouredResult(unittest.TextTestResult):
- def addSuccess(self, test):
- unittest.TestResult.addSuccess(self, test)
- cprint("OK", "green")
- def addError(self, test, err):
- unittest.TestResult.addError(self, test, err)
- cprint("ERROR", "red", bold=True)
- def addFailure(self, test, err):
- unittest.TestResult.addFailure(self, test, err)
- cprint("FAIL", "red")
- def addSkip(self, test, reason):
- unittest.TestResult.addSkip(self, test, reason)
- cprint("skipped: %s" % reason.strip(), "brown")
- def printErrorList(self, flavour, errors):
- flavour = hilite(flavour, "red", bold=flavour == 'ERROR')
- super().printErrorList(flavour, errors)
- class ColouredTextRunner(unittest.TextTestRunner):
- """A coloured text runner which also prints failed tests on
- KeyboardInterrupt and save failed tests in a file so that they can
- be re-run.
- """
- resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.failed_tnames = set()
- def _makeResult(self):
- # Store result instance so that it can be accessed on
- # KeyboardInterrupt.
- self.result = super()._makeResult()
- return self.result
- def _write_last_failed(self):
- if self.failed_tnames:
- with open(FAILED_TESTS_FNAME, "w") as f:
- for tname in self.failed_tnames:
- f.write(tname + '\n')
- def _save_result(self, result):
- if not result.wasSuccessful():
- for t in result.errors + result.failures:
- tname = t[0].id()
- self.failed_tnames.add(tname)
- def _run(self, suite):
- try:
- result = super().run(suite)
- except (KeyboardInterrupt, SystemExit):
- result = self.runner.result
- result.printErrors()
- raise sys.exit(1)
- else:
- self._save_result(result)
- return result
- def _exit(self, success):
- if success:
- cprint("SUCCESS", "green", bold=True)
- safe_rmpath(FAILED_TESTS_FNAME)
- sys.exit(0)
- else:
- cprint("FAILED", "red", bold=True)
- self._write_last_failed()
- sys.exit(1)
- def run(self, suite):
- result = self._run(suite)
- self._exit(result.wasSuccessful())
- class ParallelRunner(ColouredTextRunner):
- @staticmethod
- def _parallelize(suite):
- def fdopen(fd, mode, *kwds):
- stream = orig_fdopen(fd, mode)
- atexit.register(stream.close)
- return stream
- # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
- # https://github.com/cgoldberg/concurrencytest/issues/11
- orig_fdopen = os.fdopen
- concurrencytest.os.fdopen = fdopen
- forker = concurrencytest.fork_for_tests(NWORKERS)
- return concurrencytest.ConcurrentTestSuite(suite, forker)
- @staticmethod
- def _split_suite(suite):
- serial = unittest.TestSuite()
- parallel = unittest.TestSuite()
- for test in suite:
- if test.countTestCases() == 0:
- continue
- elif isinstance(test, unittest.TestSuite):
- test_class = test._tests[0].__class__
- elif isinstance(test, unittest.TestCase):
- test_class = test
- else:
- raise TypeError("can't recognize type %r" % test)
- if getattr(test_class, '_serialrun', False):
- serial.addTest(test)
- else:
- parallel.addTest(test)
- return (serial, parallel)
- def run(self, suite):
- ser_suite, par_suite = self._split_suite(suite)
- par_suite = self._parallelize(par_suite)
- # run parallel
- cprint("starting parallel tests using %s workers" % NWORKERS,
- "green", bold=True)
- t = time.time()
- par = self._run(par_suite)
- par_elapsed = time.time() - t
- # At this point we should have N zombies (the workers), which
- # will disappear with wait().
- orphans = psutil.Process().children()
- gone, alive = psutil.wait_procs(orphans, timeout=1)
- if alive:
- cprint("alive processes %s" % alive, "red")
- reap_children()
- # run serial
- t = time.time()
- ser = self._run(ser_suite)
- ser_elapsed = time.time() - t
- # print
- if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
- par.printErrors() # print them again at the bottom
- par_fails, par_errs, par_skips = map(len, (par.failures,
- par.errors,
- par.skipped))
- ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
- ser.errors,
- ser.skipped))
- print(textwrap.dedent("""
- +----------+----------+----------+----------+----------+----------+
- | | total | failures | errors | skipped | time |
- +----------+----------+----------+----------+----------+----------+
- | parallel | %3s | %3s | %3s | %3s | %.2fs |
- +----------+----------+----------+----------+----------+----------+
- | serial | %3s | %3s | %3s | %3s | %.2fs |
- +----------+----------+----------+----------+----------+----------+
- """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
- ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
- print("Ran %s tests in %.3fs using %s workers" % (
- par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
- ok = par.wasSuccessful() and ser.wasSuccessful()
- self._exit(ok)
- def get_runner(parallel=False):
- def warn(msg):
- cprint(msg + " Running serial tests instead.", "red")
- if parallel:
- if psutil.WINDOWS:
- warn("Can't run parallel tests on Windows.")
- elif concurrencytest is None:
- warn("concurrencytest module is not installed.")
- elif NWORKERS == 1:
- warn("Only 1 CPU available.")
- else:
- return ParallelRunner(verbosity=VERBOSITY)
- return ColouredTextRunner(verbosity=VERBOSITY)
- # Used by test_*,py modules.
- def run_from_name(name):
- if CI_TESTING:
- print_sysinfo()
- suite = TestLoader().from_name(name)
- runner = get_runner()
- runner.run(suite)
- def setup():
- psutil._set_debug(True)
- def main():
- setup()
- usage = "python3 -m psutil.tests [opts] [test-name]"
- parser = optparse.OptionParser(usage=usage, description="run unit tests")
- parser.add_option("--last-failed",
- action="store_true", default=False,
- help="only run last failed tests")
- parser.add_option("--parallel",
- action="store_true", default=False,
- help="run tests in parallel")
- opts, args = parser.parse_args()
- if not opts.last_failed:
- safe_rmpath(FAILED_TESTS_FNAME)
- # loader
- loader = TestLoader()
- if args:
- if len(args) > 1:
- parser.print_usage()
- return sys.exit(1)
- else:
- suite = loader.from_name(args[0])
- elif opts.last_failed:
- suite = loader.last_failed()
- else:
- suite = loader.all()
- if CI_TESTING:
- print_sysinfo()
- runner = get_runner(opts.parallel)
- runner.run(suite)
- if __name__ == '__main__':
- main()
|