utils.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from typing import Any, Dict, List, Tuple
  2. import torch
  3. import torch.distributed as dist
  4. from torch.nn.parallel._functions import _get_stream
  5. from torch.nn.parallel.scatter_gather import ( # type: ignore[attr-defined]
  6. _is_namedtuple,
  7. )
  8. from torch.nn.utils.rnn import PackedSequence
  9. __all__ = [] # type: ignore[var-annotated]
  10. def _pack_kwargs(*args: Any, **kwargs: Any) -> Tuple[Tuple[Any, ...], Tuple[str, ...]]:
  11. """
  12. Turn argument list into separate key list and value list (unpack_kwargs does the opposite)
  13. Inspiration: https://github.com/facebookresearch/fairscale/blob/eeb6684/fairscale/internal/containers.py#L70
  14. Usage::
  15. kwarg_keys, flat_args = pack_kwargs(1, 2, a=3, b=4)
  16. assert kwarg_keys == ("a", "b")
  17. assert flat_args == (1, 2, 3, 4)
  18. args, kwargs = unpack_kwargs(kwarg_keys, flat_args)
  19. assert args == (1, 2)
  20. assert kwargs == {"a": 3, "b": 4}
  21. Returns:
  22. Tuple[Tuple[Any, ...], Tuple[str, ...]]: The first tuple element gives
  23. gives both positional args and kwarg values, where the positional args
  24. proceed kwarg values and kwarg values are ordered consistently with the
  25. kwarg keys. The second tuple element gives the kwarg keys.
  26. The second tuple element's length is at most the first tuple element's length.
  27. """
  28. kwarg_keys: List[str] = []
  29. flat_args: List[Any] = list(args)
  30. for k, v in kwargs.items():
  31. kwarg_keys.append(k)
  32. flat_args.append(v)
  33. return tuple(flat_args), tuple(kwarg_keys)
  34. def _unpack_kwargs(flat_args: Tuple[Any, ...], kwarg_keys: Tuple[str, ...]) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
  35. """See _pack_kwargs."""
  36. assert len(kwarg_keys) <= len(flat_args), f"too many keys {len(kwarg_keys)} vs. {len(flat_args)}"
  37. if len(kwarg_keys) == 0:
  38. return flat_args, {}
  39. args = flat_args[: -len(kwarg_keys)]
  40. kwargs = {k: v for k, v in zip(kwarg_keys, flat_args[-len(kwarg_keys) :])}
  41. return args, kwargs
def _recursive_to(inputs, target_gpu, use_side_stream_for_tensor_copies):
    r"""
    Recursively moves input to the target_gpu.

    Tensors and ``PackedSequence`` objects found in ``inputs`` are moved with
    ``.to(target_gpu)`` unless their device already equals
    ``torch.device("cuda", target_gpu)``. Non-empty namedtuples, tuples, lists
    and dicts are traversed element by element and rebuilt; dict entries are
    traversed as ``(key, value)`` tuples, so keys pass through the same
    mapping. Any other object, including an empty container, is passed
    through unchanged.

    Args:
        inputs: object or (possibly nested) container to process.
        target_gpu: CUDA device index used as the copy destination.
        use_side_stream_for_tensor_copies: when true, each copy is issued on
            the stream returned by ``_get_stream(target_gpu)`` and the current
            stream of ``target_gpu`` is made to wait on it afterwards.

    Returns:
        A one-element sequence wrapping the processed ``inputs``: a 1-tuple
        when ``inputs`` is itself a tensor / ``PackedSequence``, otherwise a
        list.
    """

    def to_map(obj):
        # Every branch returns a one-element sequence so that the
        # ``zip(*map(to_map, ...))`` calls below can regroup the results.
        if isinstance(obj, (torch.Tensor, PackedSequence)):
            # A PackedSequence exposes its device through its ``data`` tensor.
            device = obj.data.device if isinstance(obj, PackedSequence) else obj.device
            if device == torch.device("cuda", target_gpu):
                # Already on the target device: nothing to copy.
                return (obj,)
            if not use_side_stream_for_tensor_copies:
                return (obj.to(target_gpu),)
            else:
                # Perform CPU -> GPU copies in a background stream. This code is
                # motivated from similar logic in torch/nn/parallel/_functions.py
                stream = _get_stream(target_gpu)
                with torch.cuda.stream(stream):
                    output = obj.to(target_gpu)
                # synchronize with the copy stream
                with torch.cuda.device(target_gpu):
                    current_stream = torch.cuda.current_stream()
                    # Sync the current stream with the copy stream
                    current_stream.wait_stream(stream)
                    # Ensure tensor memory is not reused until work on
                    # main stream is complete
                    if isinstance(obj, PackedSequence):
                        output.data.record_stream(current_stream)  # type: ignore[arg-type]
                    else:
                        output.record_stream(current_stream)  # type: ignore[arg-type]
                return (output,)
        # The namedtuple check comes before the plain tuple check so the
        # result is rebuilt with the original namedtuple type.
        if _is_namedtuple(obj):
            return [type(obj)(*args) for args in zip(*map(to_map, obj))]
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(to_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            return [list(i) for i in zip(*map(to_map, obj))]
        if isinstance(obj, dict) and len(obj) > 0:
            # Rebuild with the same mapping type from the processed
            # (key, value) pairs.
            return [type(obj)(i) for i in zip(*map(to_map, obj.items()))]
        # Fallback: empty containers and all other objects are returned as-is.
        return [obj]

    # Avoid reference cycle
    # (``to_map`` calls itself, so the function refers to itself through its
    # own closure; clearing the name once the result is computed breaks that.)
    try:
        res = to_map(inputs)
    finally:
        to_map = None  # type: ignore[assignment]
    return res
  86. def _to_kwargs(inputs, kwargs, device_id, use_side_stream_for_tensor_copies):
  87. inputs = (
  88. _recursive_to(inputs, device_id, use_side_stream_for_tensor_copies)
  89. if inputs
  90. else []
  91. )
  92. kwargs = (
  93. _recursive_to(kwargs, device_id, use_side_stream_for_tensor_copies)
  94. if kwargs
  95. else []
  96. )
  97. if len(inputs) < len(kwargs):
  98. inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
  99. elif len(kwargs) < len(inputs):
  100. kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
  101. inputs = tuple(inputs)
  102. kwargs = tuple(kwargs)
  103. return inputs, kwargs
def _verify_param_shape_across_processes(process_group, tensors, logger=None):
    """
    Forward to ``dist._verify_params_across_processes`` and return its result.

    ``process_group``, ``tensors`` and ``logger`` are passed through
    positionally and unchanged.

    NOTE(review): judging by the name, this presumably checks that
    ``tensors`` have consistent shapes on every rank of ``process_group``;
    confirm against the ``torch.distributed`` implementation.
    """
    return dist._verify_params_across_processes(process_group, tensors, logger)
  106. def _sync_module_states(
  107. module,
  108. process_group,
  109. broadcast_bucket_size,
  110. src,
  111. params_and_buffers_to_ignore,
  112. ):
  113. """
  114. Syncs ``module``'s parameters and buffers state so that all ranks contain
  115. the same module state across all ranks. Note that this API assumes that all
  116. parameter shapes are consistent before running the synchronization. This can
  117. be checked with ``_verify_param_shape_across_processes``.
  118. """
  119. module_states = []
  120. for name, param in module.named_parameters():
  121. if name not in params_and_buffers_to_ignore:
  122. module_states.append(param.detach())
  123. for name, buffer in module.named_buffers():
  124. if name not in params_and_buffers_to_ignore:
  125. module_states.append(buffer.detach())
  126. _sync_params_and_buffers(
  127. process_group,
  128. module_states,
  129. broadcast_bucket_size,
  130. src
  131. )
  132. def _sync_params_and_buffers(
  133. process_group: dist.ProcessGroup,
  134. module_states: List[torch.Tensor],
  135. broadcast_bucket_size: int,
  136. src: int,
  137. ):
  138. """
  139. Synchronizes ``module_states`` (list of tensors) across all processes by
  140. broadcasting them from rank 0.
  141. """
  142. if len(module_states) > 0:
  143. dist._broadcast_coalesced(
  144. process_group, module_states, broadcast_bucket_size, src
  145. )
  146. def _replace_by_prefix(
  147. state_dict: Dict[str, Any],
  148. old_prefix: str,
  149. new_prefix: str,
  150. ) -> None:
  151. """
  152. Replace all keys that match a given old_prefix with a new_prefix (in-place).
  153. Usage::
  154. state_dict = {"layer.xyz": torch.tensor(1)}
  155. replace_by_prefix_(state_dict, "layer.", "module.layer.")
  156. assert state_dict == {"module.layer.xyz": torch.tensor(1)}
  157. """
  158. if old_prefix == new_prefix:
  159. raise ValueError("old_prefix and new_prefix must be distinct")
  160. for key in list(state_dict.keys()):
  161. if not key.startswith(old_prefix):
  162. continue
  163. new_key = new_prefix + key[len(old_prefix) :]
  164. state_dict[new_key] = state_dict[key]
  165. del state_dict[key]