# pool.py
import multiprocessing.pool
import multiprocessing.util as util
from .queue import SimpleQueue
  4. def clean_worker(*args, **kwargs):
  5. import gc
  6. multiprocessing.pool.worker(*args, **kwargs)
  7. # Regular multiprocessing workers don't fully clean up after themselves,
  8. # so we have to explicitly trigger garbage collection to make sure that all
  9. # destructors are called...
  10. gc.collect()
  11. class Pool(multiprocessing.pool.Pool):
  12. """Pool implementation which uses our version of SimpleQueue.
  13. This lets us pass tensors in shared memory across processes instead of
  14. serializing the underlying data."""
  15. def _setup_queues(self):
  16. self._inqueue = SimpleQueue()
  17. self._outqueue = SimpleQueue()
  18. self._quick_put = self._inqueue._writer.send
  19. self._quick_get = self._outqueue._reader.recv
  20. def _repopulate_pool(self):
  21. """Bring the number of pool processes up to the specified number,
  22. for use after reaping workers which have exited.
  23. """
  24. for i in range(self._processes - len(self._pool)):
  25. # changed worker -> clean_worker
  26. args = (self._inqueue, self._outqueue,
  27. self._initializer,
  28. self._initargs, self._maxtasksperchild)
  29. if hasattr(self, '_wrap_exception'):
  30. args += (self._wrap_exception,)
  31. w = self.Process(target=clean_worker, args=args)
  32. self._pool.append(w)
  33. w.name = w.name.replace('Process', 'PoolWorker')
  34. w.daemon = True
  35. w.start()
  36. util.debug('added worker')