queue.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. import io
  2. import multiprocessing.queues
  3. from multiprocessing.reduction import ForkingPickler
  4. import pickle
  5. class ConnectionWrapper:
  6. """Proxy class for _multiprocessing.Connection which uses ForkingPickler to
  7. serialize objects"""
  8. def __init__(self, conn):
  9. self.conn = conn
  10. def send(self, obj):
  11. buf = io.BytesIO()
  12. ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
  13. self.send_bytes(buf.getvalue())
  14. def recv(self):
  15. buf = self.recv_bytes()
  16. return pickle.loads(buf)
  17. def __getattr__(self, name):
  18. if 'conn' in self.__dict__:
  19. return getattr(self.conn, name)
  20. raise AttributeError("'{}' object has no attribute '{}'".format(
  21. type(self).__name__, 'conn'))
  22. class Queue(multiprocessing.queues.Queue):
  23. def __init__(self, *args, **kwargs):
  24. super().__init__(*args, **kwargs)
  25. self._reader: ConnectionWrapper = ConnectionWrapper(self._reader)
  26. self._writer: ConnectionWrapper = ConnectionWrapper(self._writer)
  27. self._send = self._writer.send
  28. self._recv = self._reader.recv
  29. class SimpleQueue(multiprocessing.queues.SimpleQueue):
  30. def _make_methods(self):
  31. if not isinstance(self._reader, ConnectionWrapper):
  32. self._reader: ConnectionWrapper = ConnectionWrapper(self._reader)
  33. self._writer: ConnectionWrapper = ConnectionWrapper(self._writer)
  34. super()._make_methods() # type: ignore[misc]