1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import atexit
- from threading import Event, Thread, current_thread
- from time import time
- from warnings import warn
- __all__ = ["TMonitor", "TqdmSynchronisationWarning"]
- class TqdmSynchronisationWarning(RuntimeWarning):
- """tqdm multi-thread/-process errors which may cause incorrect nesting
- but otherwise no adverse effects"""
- pass
- class TMonitor(Thread):
- """
- Monitoring thread for tqdm bars.
- Monitors if tqdm bars are taking too much time to display
- and readjusts miniters automatically if necessary.
- Parameters
- ----------
- tqdm_cls : class
- tqdm class to use (can be core tqdm or a submodule).
- sleep_interval : float
- Time to sleep between monitoring checks.
- """
- _test = {} # internal vars for unit testing
- def __init__(self, tqdm_cls, sleep_interval):
- Thread.__init__(self)
- self.daemon = True # kill thread when main killed (KeyboardInterrupt)
- self.woken = 0 # last time woken up, to sync with monitor
- self.tqdm_cls = tqdm_cls
- self.sleep_interval = sleep_interval
- self._time = self._test.get("time", time)
- self.was_killed = self._test.get("Event", Event)()
- atexit.register(self.exit)
- self.start()
- def exit(self):
- self.was_killed.set()
- if self is not current_thread():
- self.join()
- return self.report()
- def get_instances(self):
- # returns a copy of started `tqdm_cls` instances
- return [i for i in self.tqdm_cls._instances.copy()
- # Avoid race by checking that the instance started
- if hasattr(i, 'start_t')]
- def run(self):
- cur_t = self._time()
- while True:
- # After processing and before sleeping, notify that we woke
- # Need to be done just before sleeping
- self.woken = cur_t
- # Sleep some time...
- self.was_killed.wait(self.sleep_interval)
- # Quit if killed
- if self.was_killed.is_set():
- return
- # Then monitor!
- # Acquire lock (to access _instances)
- with self.tqdm_cls.get_lock():
- cur_t = self._time()
- # Check tqdm instances are waiting too long to print
- instances = self.get_instances()
- for instance in instances:
- # Check event in loop to reduce blocking time on exit
- if self.was_killed.is_set():
- return
- # Only if mininterval > 1 (else iterations are just slow)
- # and last refresh exceeded maxinterval
- if (
- instance.miniters > 1
- and (cur_t - instance.last_print_t) >= instance.maxinterval
- ):
- # force bypassing miniters on next iteration
- # (dynamic_miniters adjusts mininterval automatically)
- instance.miniters = 1
- # Refresh now! (works only for manual tqdm)
- instance.refresh(nolock=True)
- # Remove accidental long-lived strong reference
- del instance
- if instances != self.get_instances(): # pragma: nocover
- warn("Set changed size during iteration" +
- " (see https://github.com/tqdm/tqdm/issues/481)",
- TqdmSynchronisationWarning, stacklevel=2)
- # Remove accidental long-lived strong references
- del instances
- def report(self):
- return not self.was_killed.is_set()
|