_monitor.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import atexit
  2. from threading import Event, Thread, current_thread
  3. from time import time
  4. from warnings import warn
  5. __all__ = ["TMonitor", "TqdmSynchronisationWarning"]
  6. class TqdmSynchronisationWarning(RuntimeWarning):
  7. """tqdm multi-thread/-process errors which may cause incorrect nesting
  8. but otherwise no adverse effects"""
  9. pass
  10. class TMonitor(Thread):
  11. """
  12. Monitoring thread for tqdm bars.
  13. Monitors if tqdm bars are taking too much time to display
  14. and readjusts miniters automatically if necessary.
  15. Parameters
  16. ----------
  17. tqdm_cls : class
  18. tqdm class to use (can be core tqdm or a submodule).
  19. sleep_interval : float
  20. Time to sleep between monitoring checks.
  21. """
  22. _test = {} # internal vars for unit testing
  23. def __init__(self, tqdm_cls, sleep_interval):
  24. Thread.__init__(self)
  25. self.daemon = True # kill thread when main killed (KeyboardInterrupt)
  26. self.woken = 0 # last time woken up, to sync with monitor
  27. self.tqdm_cls = tqdm_cls
  28. self.sleep_interval = sleep_interval
  29. self._time = self._test.get("time", time)
  30. self.was_killed = self._test.get("Event", Event)()
  31. atexit.register(self.exit)
  32. self.start()
  33. def exit(self):
  34. self.was_killed.set()
  35. if self is not current_thread():
  36. self.join()
  37. return self.report()
  38. def get_instances(self):
  39. # returns a copy of started `tqdm_cls` instances
  40. return [i for i in self.tqdm_cls._instances.copy()
  41. # Avoid race by checking that the instance started
  42. if hasattr(i, 'start_t')]
  43. def run(self):
  44. cur_t = self._time()
  45. while True:
  46. # After processing and before sleeping, notify that we woke
  47. # Need to be done just before sleeping
  48. self.woken = cur_t
  49. # Sleep some time...
  50. self.was_killed.wait(self.sleep_interval)
  51. # Quit if killed
  52. if self.was_killed.is_set():
  53. return
  54. # Then monitor!
  55. # Acquire lock (to access _instances)
  56. with self.tqdm_cls.get_lock():
  57. cur_t = self._time()
  58. # Check tqdm instances are waiting too long to print
  59. instances = self.get_instances()
  60. for instance in instances:
  61. # Check event in loop to reduce blocking time on exit
  62. if self.was_killed.is_set():
  63. return
  64. # Only if mininterval > 1 (else iterations are just slow)
  65. # and last refresh exceeded maxinterval
  66. if (
  67. instance.miniters > 1
  68. and (cur_t - instance.last_print_t) >= instance.maxinterval
  69. ):
  70. # force bypassing miniters on next iteration
  71. # (dynamic_miniters adjusts mininterval automatically)
  72. instance.miniters = 1
  73. # Refresh now! (works only for manual tqdm)
  74. instance.refresh(nolock=True)
  75. # Remove accidental long-lived strong reference
  76. del instance
  77. if instances != self.get_instances(): # pragma: nocover
  78. warn("Set changed size during iteration" +
  79. " (see https://github.com/tqdm/tqdm/issues/481)",
  80. TqdmSynchronisationWarning, stacklevel=2)
  81. # Remove accidental long-lived strong references
  82. del instances
  83. def report(self):
  84. return not self.was_killed.is_set()