# reductions.py

import torch
import torch.utils.hooks
from torch._namedtensor_internals import check_serializing_named_tensor
import os
import threading
import multiprocessing
from multiprocessing.util import register_after_fork
from multiprocessing.reduction import ForkingPickler
from typing import Union

try:
    # Early load resource_sharer to prevent a partially initialized instance
    # from being inherited in a forked child process. The reduce_storage method
    # requires this module indirectly through DupFd(). The built-in mp.Queue
    # class pickles arguments in a background thread which may overlap with the
    # fork.
    import multiprocessing.resource_sharer
except ImportError:
    pass


class StorageWeakRef:
    r"""A weak reference to a Storage.

    The cdata member is a Python number containing the integer representation of
    the Storage pointer."""

    def __init__(self, storage):
        self.cdata = storage._weak_ref()
        # Save a direct reference to _free_weak_ref because the `torch` module
        # might be cleared during Python shutdown before this module is cleared.
        self._free_weak_ref = torch.Storage._free_weak_ref  # type: ignore[attr-defined]

    def expired(self):
        return torch.Storage._expired(self.cdata)  # type: ignore[attr-defined]

    def __del__(self):
        self._free_weak_ref(self.cdata)

    def __hash__(self):
        return self.cdata

    def __eq__(self, other):
        if id(self) == id(other):
            return True
        return self.cdata == other.cdata


class SharedCache(dict):
    """dictionary from multiprocessing handles to StorageWeakRef"""

    def __init__(self):
        # free_dead_references() is called if the len exceeds the current
        # limit. The limit scales with the number of remaining live objects.
        self.limit = 128
        # `fork` inherits lock state, so in case we fork when the lock is held,
        # we register a function to reset the lock to a new object to avoid
        # possible deadlocks, following python multiprocessing library design.
        self._after_fork()
        register_after_fork(self, SharedCache._after_fork)

    def _after_fork(self):
        self.lock = threading.Lock()

    def get(self, key):
        with self.lock:
            return dict.get(self, key)

    def __setitem__(self, key, storage_ref):
        with self.lock:
            dict.__setitem__(self, key, storage_ref)
            if len(self) > self.limit:
                self.free_dead_references()

    def free_dead_references(self):
        live = 0
        for key, storage_ref in list(self.items()):
            if storage_ref.expired():
                del self[key]
            else:
                live += 1
        self.limit = max(128, live * 2)


# mapping from handles to StorageWeakRef objects
shared_cache = SharedCache()

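
# Illustrative sketch (hypothetical helper, not part of the original module):
# how the cache above evicts entries. Once more than `limit` entries have been
# inserted, free_dead_references() drops every StorageWeakRef whose storage has
# been collected and rescales the limit to twice the number of still-live
# entries, never below 128.
def _example_shared_cache_eviction():
    cache = SharedCache()
    live_storage = torch.UntypedStorage(16)  # stays alive for the whole loop
    for i in range(200):
        # Rebinding dead_storage each iteration lets the previous storage be
        # collected, so its weak ref starts reporting expired().
        dead_storage = torch.UntypedStorage(16)
        cache[("dead", i)] = StorageWeakRef(dead_storage)
    cache[("live", 0)] = StorageWeakRef(live_storage)
    cache.free_dead_references()  # purge expired weak refs explicitly
    return len(cache), cache.limit  # far fewer than 201 entries remain
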
def rebuild_event(device, handle):
    return torch.cuda.Event.from_ipc_handle(device, handle)


def reduce_event(event):
    handle = event.ipc_handle()
    return (rebuild_event, (event.device, handle))

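
# Illustrative sketch (hypothetical helpers, not part of the original module):
# the reduce/rebuild pair above is what ForkingPickler invokes when a
# torch.cuda.Event is sent to another process. The two halves below make the
# round trip explicit; they assume a CUDA device and are meant to run in two
# different processes, since the exported IPC event handle is opened on the
# receiving side.
def _example_send_event():
    # Sender side: the event must be created with interprocess=True so that
    # event.ipc_handle() is valid.
    event = torch.cuda.Event(interprocess=True)
    rebuild_fn, args = reduce_event(event)  # (rebuild_event, (device, handle))
    return rebuild_fn, args


def _example_receive_event(args):
    # Receiver side (a different process): reopen the exported IPC handle.
    device, handle = args
    return rebuild_event(device, handle)
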
def rebuild_tensor(cls, storage, metadata):
    storage_offset, size, stride, requires_grad = metadata
    t = torch._utils._rebuild_tensor(storage, storage_offset, size, stride)
    if cls == torch.nn.parameter.Parameter:
        # we have to pass requires_grad into constructor, rather than set it as an
        # attribute later, because it's an important check for Integer Tensors to
        # have requires_grad=False (or else they raise an error)
        t = torch.nn.parameter.Parameter(t, requires_grad=requires_grad)
    else:
        t.requires_grad = requires_grad
    return t

def rebuild_cuda_tensor(tensor_cls, tensor_size, tensor_stride, tensor_offset,
                        storage_cls, dtype, storage_device, storage_handle, storage_size_bytes,
                        storage_offset_bytes, requires_grad, ref_counter_handle, ref_counter_offset,
                        event_handle, event_sync_required):
    # If storage_handle is None, storage points to nullptr.
    if storage_handle is None or storage_size_bytes == 0:
        storage = storage_cls(0, dtype=dtype, device=storage_device, _internal=True)
    else:
        storage = storage_from_cache(storage_cls, (storage_handle, storage_offset_bytes))
        if storage is None:
            torch.cuda._lazy_init()
            storage = storage_cls._new_shared_cuda(
                storage_device,
                storage_handle,
                storage_size_bytes,
                storage_offset_bytes,
                ref_counter_handle,
                ref_counter_offset,
                event_handle,
                event_sync_required)
            shared_cache[(storage_handle, storage_offset_bytes)] = StorageWeakRef(storage)
        else:
            # We are already ref-counting this Storage, but the producer needs the new
            # ref-counter to be released.
            storage_cls._release_ipc_counter(ref_counter_handle, ref_counter_offset, device=storage_device)

    _storage = storage if isinstance(storage, torch.UntypedStorage) else storage._untyped_storage

    t = torch._utils._rebuild_tensor(
        torch.storage.TypedStorage(wrap_storage=_storage, dtype=dtype, _internal=True),
        tensor_offset, tensor_size, tensor_stride)

    if tensor_cls == torch.nn.parameter.Parameter:
        # It is crucial for integer tensors to receive
        # requires_grad=False as a constructor argument.
        t = torch.nn.parameter.Parameter(t, requires_grad=requires_grad)
    else:
        t.requires_grad = requires_grad

    return t

def reduce_tensor(tensor):
    storage = tensor._typed_storage()

    if tensor.requires_grad and not tensor.is_leaf:
        raise RuntimeError("Cowardly refusing to serialize non-leaf tensor which requires_grad, "
                           "since autograd does not support crossing process boundaries. "
                           "If you just want to transfer the data, call detach() on the tensor "
                           "before serializing (e.g., putting it on the queue).")

    check_serializing_named_tensor(tensor)
    torch.utils.hooks.warn_if_has_hooks(tensor)

    # Note [CUDA IPC and the caching allocator]
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # When you send a CUDA tensor over IPC, you might expect that you will
    # get out the same storage from the other end. However, the CUDA caching
    # allocator makes it difficult to preserve this invariant. Consider
    # the following situation: a tensor of size 0x100 points to offset 0x20 of
    # a storage at 0xA100 of size 0x100. (For simplicity, all of these
    # sizes are given in bytes). HOWEVER, with the caching allocator, this storage
    # might be part of a larger cudaMalloc allocation 0xA000 of size 0x4000.
    #
    # When we want to send this CUDA tensor over IPC, we must send the
    # *entire* cudaMalloc allocation, i.e., the 0xA000 region, not just
    # the storage 0xA100 (because that is what CUDA supports). So, on the
    # other end, there simply isn't any way to say, "Wait, you gave me
    # a bigger region (0xA000) than the one I wanted (0xA100)".
    #
    # OK, so if you sent the cudaMalloc allocation, can you just wrap that up as
    # one storage itself? No, because this cudaMalloc allocation might contain
    # storages of mixed types: float, bytes, double... If you make the entire
    # allocation a single storage of a type A, we'll hit an error when constructing
    # a tensor of type B on the storage.
    #
    # cudaIpcMemHandle is an identifier to access the sender cudaMalloc allocation on the
    # receiver side. However, cudaIpcMemHandles from each device in a given process may
    # only be opened by one context per device per other process.
    # If we open and close a memory handle multiple times in a process, CUDA is allowed
    # to give it a different address; similarly, once we close the memory, we're not
    # allowed to access it (or the storage/tensor built on top of it), even if it is
    # still live in the original process. As we cannot make a cudaMalloc allocation
    # into a single storage in one go, this requires us to cache the device pointer for
    # each cudaIpcMemHandle on the C++ side to reconstruct storages of various types,
    # while keeping the old ones alive.
    # See [https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__DEVICE.html]
    #
    # This is fine, because all we need to do is to save our position in the
    # allocation, and reconstruct storage and tensor from it.
    # 0xA000 ->  -------CUDA Allocation------
    #            |                          |
    #            |                          |
    #            |                          |
    #            |                          |
    # 0xA100 ->  --------storage1 begin-----
    #            |                          |
    # 0xA120 ->  --------tensor1 begin -----
    #            |                          |
    #            |                          |
    #            |                          |
    #            |                          |
    #            |                          |
    # 0xA160 ->  --------tensor1 end--------
    #            |                          |
    #            |                          |
    #            |                          |
    # 0xA200 ->  --------storage1 end-------
    #            |                          |
    # 0xE000 ->  --------CUDA allocation----
    #
    # To send tensor1, the following info is required from sender to receiver for
    # storage reconstruction:
    #   1. cudaIpcMemHandle of 0xA000 (which can be mapped to a basePtr in the receiver
    #      process). basePtr may not be exactly 0xA000 since it's a different process.
    #   2. offset (0xA100) of storage1 in the CUDA allocation.
    #   3. size of storage1 (0x100).
    #
    # On the receiver side:
    #   1. Get the devPtr of the MemHandle to access the memory, and reconstruct a
    #      storage of the same type using (basePtr, offset, size).
    #   2. We can then reconstruct the tensor on top of the reconstructed storage:
    #      Tensor(size=0x040, offset=0x020, storage=Storage(data=basePtr+0xA100, size=0x0100))
    #
    # This strategy has a few implications:
    #
    # 1. When we serialize a CUDA tensor for IPC, we cannot do it all in one
    #    go (non-compositionally), and this requires us to have a global map
    #    memHandle -> devPtr for each process.
    #
    # 2. We MUST NOT let the new IPC tensor be resizable. Originally, a resize
    #    of the storage beyond 0x100 would merely have caused us to do a
    #    reallocation. You don't really want to do this, but if you did,
    #    all that would happen is that you would lose IPC sharing. But if
    #    you do this in the new world, we will happily let you write out of
    #    bounds of your "allocation", clobbering unrelated data in the cached
    #    allocator block. BAD!
    #
    # By the way, in old versions of PyTorch, we supported this situation
    # natively using a "storage view", which permitted multiple storages to be
    # views on each other. But this was the *only* use of storage views, so we
    # eliminated it so that we could just use tensor views to implement the same
    # thing.
    #
    if storage._untyped_storage.device.type == 'cuda':
        (device,
         handle,
         storage_size_bytes,
         storage_offset_bytes,
         ref_counter_handle,
         ref_counter_offset,
         event_handle,
         event_sync_required) = storage._share_cuda_()
        tensor_offset = tensor.storage_offset()
        shared_cache[handle] = StorageWeakRef(storage)
        # _backward_hooks purposely omitted here, see
        # Note [Don't serialize hooks]
        return (rebuild_cuda_tensor,
                (type(tensor),
                 tensor.size(),
                 tensor.stride(),
                 tensor_offset,  # tensor offset in its storage
                 type(storage),
                 tensor.dtype,
                 device,
                 handle,  # identifier of the CUDA allocation the storage lives in
                 storage_size_bytes,  # size (in bytes) of the storage
                 storage_offset_bytes,  # offset (in bytes) of the storage in the CUDA allocation
                 tensor.requires_grad,
                 ref_counter_handle,
                 ref_counter_offset,
                 event_handle,
                 event_sync_required))

    # _backward_hooks purposely omitted here, see Note [Don't serialize hooks]
    metadata = (tensor.storage_offset(), tensor.size(), tensor.stride(), tensor.requires_grad)
    return (rebuild_tensor, (
        type(tensor),
        storage,
        metadata))

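
# Illustrative sketch (hypothetical helper, not part of the original module):
# the CPU branch of reduce_tensor() returns (rebuild_tensor, args), and
# applying the callable to its arguments yields a tensor that views the very
# same storage. This is the in-process analogue of what happens when a tensor
# is put on a torch.multiprocessing queue; across processes the storage itself
# is additionally moved to shared memory by reduce_storage.
def _example_cpu_tensor_roundtrip():
    src = torch.arange(4.0)
    rebuild_fn, args = reduce_tensor(src)  # CPU path -> (rebuild_tensor, (cls, storage, metadata))
    rebuilt = rebuild_fn(*args)
    rebuilt[0] = 42.0  # visible through `src` as well: both share one storage
    assert src[0].item() == 42.0
    return rebuilt
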
def fd_id(fd):
    # Returns a tuple which uniquely identifies a file descriptor. In Mac OS,
    # this doesn't work with shared memory handles, which is why we don't
    # support the "file_descriptor" sharing method on that platform.
    stat = os.fstat(fd)
    return (stat.st_ino, stat.st_dev)

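
# Illustrative sketch (hypothetical helper, not part of the original module):
# two descriptors that refer to the same underlying file map to the same
# (st_ino, st_dev) pair, so fd_id() works as a cache key even though the raw
# descriptor number changes across dup() or a trip through DupFd.
def _example_fd_id():
    fd1 = os.open("/dev/null", os.O_RDONLY)
    fd2 = os.dup(fd1)
    try:
        assert fd1 != fd2
        assert fd_id(fd1) == fd_id(fd2)
    finally:
        os.close(fd1)
        os.close(fd2)
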
def storage_from_cache(cls, key):
    storage_ref = shared_cache.get(key)
    if storage_ref is None:
        return None
    return torch.UntypedStorage._new_with_weak_ptr(storage_ref.cdata)


def rebuild_storage_fd(cls, df, size):
    fd = df.detach()
    try:
        storage = storage_from_cache(cls, fd_id(fd))
        if storage is not None:
            return storage
        storage = cls._new_shared_fd_cpu(fd, size)
        shared_cache[fd_id(fd)] = StorageWeakRef(storage)
        return storage
    finally:
        os.close(fd)

def rebuild_storage_filename(cls, manager, handle, size, dtype=None):
    storage: Union[torch.TypedStorage, torch.UntypedStorage] = storage_from_cache(cls, handle)
    if storage is not None:
        return storage._shared_decref()
    if dtype is None:
        storage = torch.UntypedStorage._new_shared_filename_cpu(manager, handle, size)
    else:
        byte_size = size * torch._utils._element_size(dtype)
        untyped_storage: torch.UntypedStorage = torch.UntypedStorage._new_shared_filename_cpu(
            manager, handle, byte_size)
        storage = torch.TypedStorage(
            wrap_storage=untyped_storage,
            dtype=dtype,
            _internal=True)
    shared_cache[handle] = StorageWeakRef(storage)
    return storage._shared_decref()


def rebuild_storage_empty(cls):
    return cls()


def rebuild_typed_storage(storage, dtype):
    return torch.storage.TypedStorage(wrap_storage=storage, dtype=dtype, _internal=True)


# Use for torch.storage.TypedStorage
def reduce_typed_storage(storage):
    return (rebuild_typed_storage, (storage._untyped_storage, storage.dtype))


def rebuild_typed_storage_child(storage, storage_type):
    return storage_type(wrap_storage=storage, _internal=True)


# Use for child classes of torch.storage.TypedStorage, like torch.FloatStorage
def reduce_typed_storage_child(storage):
    return (rebuild_typed_storage_child, (storage._untyped_storage, type(storage)))

def reduce_storage(storage):
    from . import get_sharing_strategy
    if storage.is_cuda:
        raise RuntimeError("Cannot pickle CUDA storage; try pickling a CUDA tensor instead")
    elif get_sharing_strategy() == 'file_system':
        metadata = storage._share_filename_cpu_()
        cache_key = metadata[1]
        rebuild = rebuild_storage_filename
        if isinstance(storage, torch.TypedStorage):
            metadata += (storage.dtype,)
        storage._shared_incref()
    elif storage.size() == 0:
        # This is special cased because empty tensors
        # (with size 0) cannot be mmapped.
        return (rebuild_storage_empty, (type(storage),))
    else:
        fd, size = storage._share_fd_cpu_()
        df = multiprocessing.reduction.DupFd(fd)
        cache_key = fd_id(fd)
        metadata = (df, size)
        rebuild = rebuild_storage_fd  # type: ignore[assignment]

    shared_cache[cache_key] = StorageWeakRef(storage)

    return (rebuild, (type(storage),) + metadata)

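
# Illustrative sketch (hypothetical helper, not part of the original module):
# which branch of reduce_storage() runs is controlled by the sharing strategy
# of torch.multiprocessing. Under "file_system" the storage is backed by a
# named shared-memory file and rebuilt by rebuild_storage_filename; under the
# default POSIX "file_descriptor" strategy it is backed by a descriptor
# duplicated via DupFd and rebuilt by rebuild_storage_fd.
def _example_reduce_storage_strategy():
    import torch.multiprocessing as mp
    previous = mp.get_sharing_strategy()
    mp.set_sharing_strategy('file_system')
    try:
        storage = torch.arange(4.0)._typed_storage()
        rebuild_fn, args = reduce_storage(storage)
        assert rebuild_fn is rebuild_storage_filename
        return rebuild_fn, args
    finally:
        mp.set_sharing_strategy(previous)
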
def init_reductions():
    ForkingPickler.register(torch.cuda.Event, reduce_event)

    for t in torch._storage_classes:
        if t.__name__ == 'UntypedStorage':
            ForkingPickler.register(t, reduce_storage)
        else:
            ForkingPickler.register(t, reduce_typed_storage_child)
    ForkingPickler.register(torch.storage.TypedStorage, reduce_typed_storage)

    for t in torch._tensor_classes:
        ForkingPickler.register(t, reduce_tensor)

    # TODO: Maybe this should be in tensor_classes? :)
    ForkingPickler.register(torch.Tensor, reduce_tensor)
    ForkingPickler.register(torch.nn.parameter.Parameter, reduce_tensor)
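

# Illustrative sketch (hypothetical helpers, not part of the original module):
# torch.multiprocessing calls init_reductions() when it is imported, so the
# reducers registered above run whenever a tensor is put on a
# torch.multiprocessing queue. In the sketch below the child mutates the tensor
# it receives and the parent observes the change, because both ends view the
# same shared storage (CPU tensor, POSIX platform assumed).
def _example_queue_child(queue):
    t = queue.get()  # rebuilt through rebuild_storage_fd / rebuild_tensor
    t.fill_(1.0)     # writes land in the shared storage


def _example_queue_roundtrip():
    import torch.multiprocessing as mp
    ctx = mp.get_context('spawn')
    queue = ctx.Queue()
    shared = torch.zeros(4).share_memory_()
    child = ctx.Process(target=_example_queue_child, args=(queue,))
    child.start()
    queue.put(shared)
    child.join()
    assert shared.sum().item() == 4.0
    return shared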