comm.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import warnings
  2. import torch
  3. from torch.cuda import nccl
  4. from torch._utils import _take_tensors, _flatten_dense_tensors, \
  5. _unflatten_dense_tensors, _reorder_tensors_as, _get_device_index, _handle_complex
  6. from typing import List
  def broadcast(tensor, devices=None, *, out=None):
      r"""Copies a tensor onto a set of GPU devices.

      The caller chooses between naming the target devices (fresh copies are
      produced) and handing in pre-allocated output tensors to be filled.

      Args:
          tensor (Tensor): tensor to broadcast. Can be on CPU or GPU.
          devices (Iterable[torch.device, str or int], optional): an iterable of
              GPU devices, among which to broadcast.
          out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
              store output results.

      .. note::
          Exactly one of :attr:`devices` and :attr:`out` must be specified.

      Returns:
          - If :attr:`devices` is specified,
              a tuple containing copies of :attr:`tensor`, placed on
              :attr:`devices`.
          - If :attr:`out` is specified,
              a tuple containing :attr:`out` tensors, each containing a copy of
              :attr:`tensor`.

      Raises:
          RuntimeError: if both or neither of :attr:`devices` and :attr:`out`
              are given.
      """
      # The tensor is normalized first, before any argument validation.
      tensor = _handle_complex(tensor)

      devices_given = devices is not None
      out_given = out is not None
      # Equal flags mean "both supplied" or "neither supplied"; both are errors.
      if devices_given == out_given:
          raise RuntimeError(
              "Exactly one of 'devices' and 'out' must be specified, but got "
              "devices={} and out={}".format(devices, out))

      if out_given:
          return torch._C._broadcast_out(tensor, out)

      # Turn every device spec into a plain device index for the C++ binding.
      device_indices = [_get_device_index(device) for device in devices]
      return torch._C._broadcast(tensor, device_indices)
  def broadcast_coalesced(tensors, devices, buffer_size=10485760):
      """Broadcasts a sequence of tensors to the specified GPUs.

      Small tensors are first coalesced into a buffer to reduce the number
      of synchronizations.

      Args:
          tensors (sequence): tensors to broadcast. Must be on the same device,
              either CPU or GPU.
          devices (Iterable[torch.device, str or int]): an iterable of GPU
              devices, among which to broadcast.
          buffer_size (int): maximum size of the buffer used for coalescing

      Returns:
          A tuple containing copies of :attr:`tensor`, placed on :attr:`devices`.
      """
      # Devices are resolved before the tensors are touched, so an invalid
      # device is reported first.
      device_indices = list(map(_get_device_index, devices))
      prepared = list(map(_handle_complex, tensors))
      return torch._C._broadcast_coalesced(prepared, device_indices, buffer_size)
  def reduce_add(inputs, destination=None):
      """Sums tensors from multiple GPUs.

      All inputs should have matching shapes, dtype, and layout. The output tensor
      will be of the same shape, dtype, and layout.

      Args:
          inputs (Iterable[Tensor]): an iterable of tensors to add. It is
              materialized into a list once, so one-shot iterators (e.g.
              generators) are accepted.
          destination (int, optional): a device on which the output will be
              placed (default: current device).

      Returns:
          A tensor containing an elementwise sum of all inputs, placed on the
          :attr:`destination` device. When there is exactly one input, that
          tensor itself is returned.

      Raises:
          ValueError: if :attr:`inputs` is empty, or if an input's size differs
              from the size of the first input.
          AssertionError: if any input is on CPU.
          RuntimeError: if none of the inputs is on the :attr:`destination`
              device.
      """
      destination = _get_device_index(destination, optional=True)
      # The code below indexes and measures `inputs`, so turn any iterable into
      # a list up front instead of failing on generators.
      inputs = list(inputs)
      if not inputs:
          raise ValueError("reduce_add expects at least one input tensor")
      input_size = inputs[0].size()
      root_index = None  # index of input tensor that already is on the correct device
      for i, inp in enumerate(inputs):
          # Raised explicitly (rather than via `assert`) so the check is not
          # stripped when Python runs with -O; the exception type is unchanged.
          if inp.device.type == "cpu":
              raise AssertionError("reduce_add expects all inputs to be on GPUs")
          if inp.get_device() == destination:
              root_index = i
          if inp.size() != input_size:
              got = 'x'.join(str(x) for x in inp.size())
              expected = 'x'.join(str(x) for x in input_size)
              raise ValueError("input {} has invalid size: got {}, but expected "
                               "{}".format(i, got, expected))
      if root_index is None:
          raise RuntimeError("reduce_add expects destination to be on the same GPU with one of the tensors")

      if len(inputs) == 1:
          return inputs[0]

      if nccl.is_available(inputs):
          result = torch.empty_like(inputs[root_index])
          nccl.reduce(inputs, output=result, root=root_index)
      else:
          destination_device = torch.device(inputs[root_index].device.type, destination)
          nonroot = [t for i, t in enumerate(inputs) if i != root_index]
          # make a new tensor w/o clone
          result = inputs[root_index] + nonroot[0].to(device=destination_device, non_blocking=True)
          for other in nonroot[1:]:
              result.add_(other.to(device=destination_device, non_blocking=True))
      return result
  def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
      """Sums tensors from multiple GPUs.

      Small tensors are first coalesced into a buffer to reduce the number
      of synchronizations.

      Args:
          inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
              contain tensors from a single device. The outer iterable is
              materialized into a list once, so a one-shot iterator is accepted.
          destination (int, optional): a device on which the output will be
              placed (default: current device).
          buffer_size (int): maximum size of the buffer used for coalescing

      Returns:
          A tuple of tensors containing an elementwise sum of each group of
          inputs, placed on the ``destination`` device.
      """
      # TODO: When `len(inputs) == 1` and all inputs are on `destination`, just
      # return `inputs`.
      # `inputs` is traversed twice below (once to size `dense_tensors`, once by
      # `zip`). Materialize it so a one-shot iterator is not exhausted by the
      # first pass, which would silently produce an empty result.
      inputs = list(inputs)
      dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
      output = []
      ref_order = []
      # process sparse ones first since they may have different sizes on different gpus
      for tensor_at_gpus in zip(*inputs):
          if all(t.is_sparse for t in tensor_at_gpus):
              result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
              output.append(result)
              ref_order.append(tensor_at_gpus[0])
          else:
              # Mixed or fully dense group: densify any sparse members and queue
              # the group for the coalesced pass below.
              for coll, t in zip(dense_tensors, tensor_at_gpus):
                  coll.append(t.to_dense() if t.is_sparse else t)
              ref_order.append(dense_tensors[0][-1])
      itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
      # now the dense ones, which have consistent sizes
      for chunks in zip(*itrs):
          flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]  # (num_gpus,)
          flat_result = reduce_add(flat_tensors, destination)
          for t in _unflatten_dense_tensors(flat_result, chunks[0]):
              # The unflattened tensors do not share storage, and we don't expose
              # base flat tensor anyways, so give them different version counters.
              # See NOTE [ Version Counter in comm.*_coalesced ]
              output.append(t.data)
      # Restore the caller's original tensor order (sparse results were emitted
      # before dense ones above).
      return tuple(_reorder_tensors_as(output, ref_order))
  def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
      """Scatters tensor across multiple GPUs.

      Args:
          tensor (Tensor): tensor to scatter. Can be on CPU or GPU.
          devices (Iterable[torch.device, str or int], optional): an iterable of
              GPU devices, among which to scatter.
          chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
              each device. It should match :attr:`devices` in length and sums to
              ``tensor.size(dim)``. If not specified, :attr:`tensor` will be divided
              into equal chunks.
          dim (int, optional): A dimension along which to chunk :attr:`tensor`.
              Default: ``0``.
          streams (Iterable[Stream], optional): an iterable of Streams, among
              which to execute the scatter. If not specified, the default stream will
              be utilized.
          out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
              store output results. Sizes of these tensors must match that of
              :attr:`tensor`, except for :attr:`dim`, where the total size must
              sum to ``tensor.size(dim)``.

      .. note::
          Exactly one of :attr:`devices` and :attr:`out` must be specified. When
          :attr:`out` is specified, :attr:`chunk_sizes` must not be specified and
          will be inferred from sizes of :attr:`out`.

      Returns:
          - If :attr:`devices` is specified,
              a tuple containing chunks of :attr:`tensor`, placed on
              :attr:`devices`.
          - If :attr:`out` is specified,
              a tuple containing :attr:`out` tensors, each containing a chunk of
              :attr:`tensor`.

      Raises:
          RuntimeError: if neither :attr:`devices` nor :attr:`out` is given, or
              if :attr:`out` is combined with :attr:`devices` or
              :attr:`chunk_sizes`.
      """
      tensor = _handle_complex(tensor)
      if out is None:
          # Without this check, iterating `devices=None` below would surface as
          # an unhelpful "'NoneType' object is not iterable" TypeError.
          if devices is None:
              raise RuntimeError(
                  "Exactly one of 'devices' and 'out' must be specified, but got "
                  "devices={} and out={}".format(devices, out))
          devices = [_get_device_index(d) for d in devices]
          return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
      else:
          if devices is not None:
              raise RuntimeError(
                  "'devices' must not be specified when 'out' is specified, but "
                  "got devices={}".format(devices))
          if chunk_sizes is not None:
              raise RuntimeError(
                  "'chunk_sizes' must not be specified when 'out' is specified, "
                  "but got chunk_sizes={}".format(chunk_sizes))
          return tuple(torch._C._scatter_out(tensor, out, dim, streams))
  def gather(tensors, dim=0, destination=None, *, out=None):
      r"""Concatenates tensors coming from multiple GPU devices.

      Args:
          tensors (Iterable[Tensor]): an iterable of tensors to gather.
              Tensor sizes in all dimensions other than :attr:`dim` have to match.
          dim (int, optional): a dimension along which the tensors will be
              concatenated. Default: ``0``.
          destination (torch.device, str, or int, optional): the output device.
              Can be CPU or CUDA. Default: the current CUDA device.
          out (Tensor, optional, keyword-only): the tensor to store gather result.
              Its sizes must match those of :attr:`tensors`, except for :attr:`dim`,
              where the size must equal ``sum(tensor.size(dim) for tensor in tensors)``.
              Can be on CPU or CUDA.

      .. note::
          :attr:`destination` must not be specified when :attr:`out` is specified.

      Returns:
          - If :attr:`destination` is specified,
              a tensor located on :attr:`destination` device, that is a result of
              concatenating :attr:`tensors` along :attr:`dim`.
          - If :attr:`out` is specified,
              the :attr:`out` tensor, now containing results of concatenating
              :attr:`tensors` along :attr:`dim`.

      Raises:
          RuntimeError: if :attr:`destination` is given together with
              :attr:`out`.
      """
      prepared = [_handle_complex(item) for item in tensors]

      if out is not None:
          # Writing into a caller-provided tensor: the target device is implied
          # by `out`, so an explicit destination is rejected.
          if destination is not None:
              conflict = (
                  "'destination' must not be specified when 'out' is specified, but "
                  "got destination={}")
              raise RuntimeError(conflict.format(destination))
          return torch._C._gather_out(prepared, out, dim)

      if destination == -1:
          # Legacy spelling for "CPU": still accepted, but warned about.
          deprecation = (
              'Using -1 to represent CPU tensor is deprecated. Please use a '
              'device object or string instead, e.g., "cpu".')
          warnings.warn(deprecation)
      target_index = _get_device_index(destination, allow_cpu=True, optional=True)
      return torch._C._gather(prepared, dim, target_index)
  211. return torch._C._gather_out(tensors, out, dim)