runner.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit test runner, providing new features on top of unittest module:
  6. - colourized output
  7. - parallel run (UNIX only)
  8. - print failures/tracebacks on CTRL+C
  9. - re-run failed tests only (make test-failed).
  10. Invocation examples:
  11. - make test
  12. - make test-failed
  13. Parallel:
  14. - make test-parallel
  15. - make test-process ARGS=--parallel
  16. """
  17. from __future__ import print_function
  18. import atexit
  19. import optparse
  20. import os
  21. import sys
  22. import textwrap
  23. import time
  24. import unittest
  25. try:
  26. import ctypes
  27. except ImportError:
  28. ctypes = None
  29. try:
  30. import concurrencytest # pip install concurrencytest
  31. except ImportError:
  32. concurrencytest = None
  33. import psutil
  34. from psutil._common import hilite
  35. from psutil._common import print_color
  36. from psutil._common import term_supports_colors
  37. from psutil._compat import super
  38. from psutil.tests import CI_TESTING
  39. from psutil.tests import import_module_by_path
  40. from psutil.tests import print_sysinfo
  41. from psutil.tests import reap_children
  42. from psutil.tests import safe_rmpath
  43. VERBOSITY = 2
  44. FAILED_TESTS_FNAME = '.failed-tests.txt'
  45. NWORKERS = psutil.cpu_count() or 1
  46. USE_COLORS = not CI_TESTING and term_supports_colors()
  47. HERE = os.path.abspath(os.path.dirname(__file__))
  48. loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase # noqa
  49. def cprint(msg, color, bold=False, file=None):
  50. if file is None:
  51. file = sys.stderr if color == 'red' else sys.stdout
  52. if USE_COLORS:
  53. print_color(msg, color, bold=bold, file=file)
  54. else:
  55. print(msg, file=file)
  56. class TestLoader:
  57. testdir = HERE
  58. skip_files = ['test_memleaks.py']
  59. if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
  60. skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])
  61. def _get_testmods(self):
  62. return [os.path.join(self.testdir, x)
  63. for x in os.listdir(self.testdir)
  64. if x.startswith('test_') and x.endswith('.py') and
  65. x not in self.skip_files]
  66. def _iter_testmod_classes(self):
  67. """Iterate over all test files in this directory and return
  68. all TestCase classes in them.
  69. """
  70. for path in self._get_testmods():
  71. mod = import_module_by_path(path)
  72. for name in dir(mod):
  73. obj = getattr(mod, name)
  74. if isinstance(obj, type) and \
  75. issubclass(obj, unittest.TestCase):
  76. yield obj
  77. def all(self):
  78. suite = unittest.TestSuite()
  79. for obj in self._iter_testmod_classes():
  80. test = loadTestsFromTestCase(obj)
  81. suite.addTest(test)
  82. return suite
  83. def last_failed(self):
  84. # ...from previously failed test run
  85. suite = unittest.TestSuite()
  86. if not os.path.isfile(FAILED_TESTS_FNAME):
  87. return suite
  88. with open(FAILED_TESTS_FNAME) as f:
  89. names = f.read().split()
  90. for n in names:
  91. test = unittest.defaultTestLoader.loadTestsFromName(n)
  92. suite.addTest(test)
  93. return suite
  94. def from_name(self, name):
  95. if name.endswith('.py'):
  96. name = os.path.splitext(os.path.basename(name))[0]
  97. return unittest.defaultTestLoader.loadTestsFromName(name)
  98. class ColouredResult(unittest.TextTestResult):
  99. def addSuccess(self, test):
  100. unittest.TestResult.addSuccess(self, test)
  101. cprint("OK", "green")
  102. def addError(self, test, err):
  103. unittest.TestResult.addError(self, test, err)
  104. cprint("ERROR", "red", bold=True)
  105. def addFailure(self, test, err):
  106. unittest.TestResult.addFailure(self, test, err)
  107. cprint("FAIL", "red")
  108. def addSkip(self, test, reason):
  109. unittest.TestResult.addSkip(self, test, reason)
  110. cprint("skipped: %s" % reason.strip(), "brown")
  111. def printErrorList(self, flavour, errors):
  112. flavour = hilite(flavour, "red", bold=flavour == 'ERROR')
  113. super().printErrorList(flavour, errors)
  114. class ColouredTextRunner(unittest.TextTestRunner):
  115. """A coloured text runner which also prints failed tests on
  116. KeyboardInterrupt and save failed tests in a file so that they can
  117. be re-run.
  118. """
  119. resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult
  120. def __init__(self, *args, **kwargs):
  121. super().__init__(*args, **kwargs)
  122. self.failed_tnames = set()
  123. def _makeResult(self):
  124. # Store result instance so that it can be accessed on
  125. # KeyboardInterrupt.
  126. self.result = super()._makeResult()
  127. return self.result
  128. def _write_last_failed(self):
  129. if self.failed_tnames:
  130. with open(FAILED_TESTS_FNAME, "w") as f:
  131. for tname in self.failed_tnames:
  132. f.write(tname + '\n')
  133. def _save_result(self, result):
  134. if not result.wasSuccessful():
  135. for t in result.errors + result.failures:
  136. tname = t[0].id()
  137. self.failed_tnames.add(tname)
  138. def _run(self, suite):
  139. try:
  140. result = super().run(suite)
  141. except (KeyboardInterrupt, SystemExit):
  142. result = self.runner.result
  143. result.printErrors()
  144. raise sys.exit(1)
  145. else:
  146. self._save_result(result)
  147. return result
  148. def _exit(self, success):
  149. if success:
  150. cprint("SUCCESS", "green", bold=True)
  151. safe_rmpath(FAILED_TESTS_FNAME)
  152. sys.exit(0)
  153. else:
  154. cprint("FAILED", "red", bold=True)
  155. self._write_last_failed()
  156. sys.exit(1)
  157. def run(self, suite):
  158. result = self._run(suite)
  159. self._exit(result.wasSuccessful())
  160. class ParallelRunner(ColouredTextRunner):
  161. @staticmethod
  162. def _parallelize(suite):
  163. def fdopen(fd, mode, *kwds):
  164. stream = orig_fdopen(fd, mode)
  165. atexit.register(stream.close)
  166. return stream
  167. # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
  168. # https://github.com/cgoldberg/concurrencytest/issues/11
  169. orig_fdopen = os.fdopen
  170. concurrencytest.os.fdopen = fdopen
  171. forker = concurrencytest.fork_for_tests(NWORKERS)
  172. return concurrencytest.ConcurrentTestSuite(suite, forker)
  173. @staticmethod
  174. def _split_suite(suite):
  175. serial = unittest.TestSuite()
  176. parallel = unittest.TestSuite()
  177. for test in suite:
  178. if test.countTestCases() == 0:
  179. continue
  180. elif isinstance(test, unittest.TestSuite):
  181. test_class = test._tests[0].__class__
  182. elif isinstance(test, unittest.TestCase):
  183. test_class = test
  184. else:
  185. raise TypeError("can't recognize type %r" % test)
  186. if getattr(test_class, '_serialrun', False):
  187. serial.addTest(test)
  188. else:
  189. parallel.addTest(test)
  190. return (serial, parallel)
  191. def run(self, suite):
  192. ser_suite, par_suite = self._split_suite(suite)
  193. par_suite = self._parallelize(par_suite)
  194. # run parallel
  195. cprint("starting parallel tests using %s workers" % NWORKERS,
  196. "green", bold=True)
  197. t = time.time()
  198. par = self._run(par_suite)
  199. par_elapsed = time.time() - t
  200. # At this point we should have N zombies (the workers), which
  201. # will disappear with wait().
  202. orphans = psutil.Process().children()
  203. gone, alive = psutil.wait_procs(orphans, timeout=1)
  204. if alive:
  205. cprint("alive processes %s" % alive, "red")
  206. reap_children()
  207. # run serial
  208. t = time.time()
  209. ser = self._run(ser_suite)
  210. ser_elapsed = time.time() - t
  211. # print
  212. if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
  213. par.printErrors() # print them again at the bottom
  214. par_fails, par_errs, par_skips = map(len, (par.failures,
  215. par.errors,
  216. par.skipped))
  217. ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
  218. ser.errors,
  219. ser.skipped))
  220. print(textwrap.dedent("""
  221. +----------+----------+----------+----------+----------+----------+
  222. | | total | failures | errors | skipped | time |
  223. +----------+----------+----------+----------+----------+----------+
  224. | parallel | %3s | %3s | %3s | %3s | %.2fs |
  225. +----------+----------+----------+----------+----------+----------+
  226. | serial | %3s | %3s | %3s | %3s | %.2fs |
  227. +----------+----------+----------+----------+----------+----------+
  228. """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
  229. ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
  230. print("Ran %s tests in %.3fs using %s workers" % (
  231. par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
  232. ok = par.wasSuccessful() and ser.wasSuccessful()
  233. self._exit(ok)
  234. def get_runner(parallel=False):
  235. def warn(msg):
  236. cprint(msg + " Running serial tests instead.", "red")
  237. if parallel:
  238. if psutil.WINDOWS:
  239. warn("Can't run parallel tests on Windows.")
  240. elif concurrencytest is None:
  241. warn("concurrencytest module is not installed.")
  242. elif NWORKERS == 1:
  243. warn("Only 1 CPU available.")
  244. else:
  245. return ParallelRunner(verbosity=VERBOSITY)
  246. return ColouredTextRunner(verbosity=VERBOSITY)
  247. # Used by test_*,py modules.
  248. def run_from_name(name):
  249. if CI_TESTING:
  250. print_sysinfo()
  251. suite = TestLoader().from_name(name)
  252. runner = get_runner()
  253. runner.run(suite)
  254. def setup():
  255. psutil._set_debug(True)
  256. def main():
  257. setup()
  258. usage = "python3 -m psutil.tests [opts] [test-name]"
  259. parser = optparse.OptionParser(usage=usage, description="run unit tests")
  260. parser.add_option("--last-failed",
  261. action="store_true", default=False,
  262. help="only run last failed tests")
  263. parser.add_option("--parallel",
  264. action="store_true", default=False,
  265. help="run tests in parallel")
  266. opts, args = parser.parse_args()
  267. if not opts.last_failed:
  268. safe_rmpath(FAILED_TESTS_FNAME)
  269. # loader
  270. loader = TestLoader()
  271. if args:
  272. if len(args) > 1:
  273. parser.print_usage()
  274. return sys.exit(1)
  275. else:
  276. suite = loader.from_name(args[0])
  277. elif opts.last_failed:
  278. suite = loader.last_failed()
  279. else:
  280. suite = loader.all()
  281. if CI_TESTING:
  282. print_sysinfo()
  283. runner = get_runner(opts.parallel)
  284. runner.run(suite)
  285. if __name__ == '__main__':
  286. main()