_init_utils.py

import collections
import warnings
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    no_type_check,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
)

import torch
import torch.distributed as dist
import torch.distributed.fsdp._exec_order_utils as exec_order_utils
import torch.distributed.fsdp._traversal_utils as traversal_utils
import torch.distributed.fsdp.fully_sharded_data_parallel as fsdp_file
import torch.nn as nn
from torch.distributed.algorithms._comm_hooks import default_hooks
from torch.distributed.distributed_c10d import _get_default_group
from torch.distributed.fsdp._common_utils import (
    _FSDPState,
    _get_module_fsdp_state,
    _is_fsdp_flattened,
    clean_tensor_name,
    TrainingState,
)
from torch.distributed.fsdp._limiter_utils import _FreeEventQueue
from torch.distributed.fsdp._wrap_utils import _get_fully_sharded_module_to_states
from torch.distributed.fsdp.api import (
    BackwardPrefetch,
    CPUOffload,
    FullOptimStateDictConfig,
    FullStateDictConfig,
    MixedPrecision,
    ShardingStrategy,
    StateDictConfig,
    StateDictType,
)
from torch.distributed.fsdp.flat_param import (
    _HandlesKey,
    FlatParameter,
    FlatParamHandle,
    HandleShardingStrategy,
)
from torch.distributed.fsdp.wrap import _FSDPPolicy
from torch.distributed.utils import _sync_params_and_buffers
from torch.utils.hooks import RemovableHandle

_TORCHDISTX_AVAIL = True
try:
    from torchdistx import deferred_init, fake  # type: ignore[import]
except ImportError:
    _TORCHDISTX_AVAIL = False

PARAM_BROADCAST_BUCKET_SIZE = int(250 * 1024 * 1024)
FSDP_SYNCED = "_fsdp_synced"
# Specification of process groups for hybrid sharding strategies.
HybridShardProcessGroupType = Tuple[dist.ProcessGroup, dist.ProcessGroup]
# Overall specification of process group.
ProcessGroupType = Optional[Union[dist.ProcessGroup, HybridShardProcessGroupType]]

# TODO (awgu): Refactor this later
SHARDING_STRATEGY_MAP = {
    ShardingStrategy.NO_SHARD: HandleShardingStrategy.NO_SHARD,
    ShardingStrategy.FULL_SHARD: HandleShardingStrategy.FULL_SHARD,
    ShardingStrategy.SHARD_GRAD_OP: HandleShardingStrategy.SHARD_GRAD_OP,
    ShardingStrategy.HYBRID_SHARD: HandleShardingStrategy.HYBRID_SHARD,
    ShardingStrategy._HYBRID_SHARD_ZERO2: HandleShardingStrategy._HYBRID_SHARD_ZERO2,
}
HYBRID_SHARDING_STRATEGIES = {
    ShardingStrategy.HYBRID_SHARD,
    ShardingStrategy._HYBRID_SHARD_ZERO2,
}

# NOTE: Since non-self attributes cannot be type annotated, several attributes
# on `state` are defined first as local variables before being assigned.


@no_type_check
def _init_process_group_state(
    state: _FSDPState,
    process_group: ProcessGroupType,
    sharding_strategy: ShardingStrategy,
    policy: Optional[_FSDPPolicy],
) -> _FSDPState:
    if sharding_strategy in HYBRID_SHARDING_STRATEGIES:
        if process_group is None and policy is None:
            # Raise an error here: since this is manual wrapping with no
            # process group passed in, there is no way to ensure all wrapped
            # FSDP instances use the same process groups.
            raise ValueError(
                f"Manual wrapping with {sharding_strategy} requires explicit specification of process group."
            )
        else:
            state = _init_process_group_state_for_hybrid_shard(state, process_group)
            assert (
                state.process_group is not None
            ), "Expected to populate state.process_group for hybrid shard"
            assert (
                state._inter_node_pg is not None
            ), "Expected to populate state._inter_node_pg for hybrid shard"
            assert (
                state._inter_node_state is not None
            ), "Expected to populate state._inter_node_state for hybrid shard"
    else:
        state.process_group = (
            process_group if process_group is not None else _get_default_group()
        )
    state.rank = state.process_group.rank()
    state.world_size = state.process_group.size()
    return state

@no_type_check
def _init_process_group_state_for_hybrid_shard(
    state: _FSDPState, process_group
) -> _FSDPState:
    if process_group is None:
        default_group = _get_default_group()
        intra_node_group, inter_node_group = _init_intra_and_inter_node_groups(
            default_group
        )
        # We shard across the intra-node group.
        state.process_group = intra_node_group
        # Save `_inter_node_pg` to allreduce across.
        state._inter_node_pg = inter_node_group
    else:
        # Check the type and assign state.process_group and state._inter_node_pg.
        if _is_valid_hybrid_shard_pg_type(process_group):
            # Assume the user passed in the intra-node and inter-node process
            # groups as documented.
            state.process_group, state._inter_node_pg = process_group
        else:
            raise ValueError(
                "Expected process_group to be passed in as either None or "
                f"Tuple[dist.ProcessGroup, dist.ProcessGroup] but got {type(process_group)}"
            )
    # Create state for the inter-node allreduce
    state._inter_node_state = _get_default_comm_hook_state(
        process_group=state._inter_node_pg,
    )
    return state

@no_type_check
def _is_valid_hybrid_shard_pg_type(process_group: Any) -> bool:
    return (
        isinstance(process_group, tuple)
        and len(process_group) == 2
        and all(isinstance(pg, dist.ProcessGroup) for pg in process_group)
    )

@no_type_check
def _init_intra_node_process_group() -> dist.ProcessGroup:
    """
    Returns a process group across the current node.
    For example, given each row is a distinct node:
    0  1  2  3  4  5  6  7
    8  9 10 11 12 13 14 15
    This API would return an intra-node subgroup across
    [0, 7] or [8, 15] depending on the process's rank.
    For example, rank 3 would get [0, 7].
    """
    intra_node_subgroup, _ = dist.new_subgroups()
    return intra_node_subgroup

@no_type_check
def _init_inter_node_process_group(
    global_process_group: dist.ProcessGroup,
) -> dist.ProcessGroup:
    """
    Returns an inter-node process group where each contained rank has
    the same local rank. For example, given each column is a distinct node:
    0  1  2  3  4  5  6  7
    8  9 10 11 12 13 14 15
    This API would return inter-node process group {0, 8}, {1, 9}, {2, 10}, and
    so forth depending on the process's rank. For example, rank 1 would get
    {1, 9} and rank 5 would get {5, 13}.
    """
    # The inter-node process group that is returned
    inter_node_pg = None
    sharding_backend = dist.get_backend(global_process_group)
    world_size = dist.get_world_size(global_process_group)
    # Assuming a fully homogeneous setup
    num_devices = torch.cuda.device_count()
    num_nodes = world_size // num_devices
    my_local_rank = dist.get_rank(global_process_group) % num_devices
    for local_rank in range(num_devices):
        ranks_for_inter_group = [
            local_rank + (i * num_devices) for i in range(num_nodes)
        ]
        # Every rank always needs to call `dist.new_group()`
        grp = dist.new_group(ranks=ranks_for_inter_group, backend=sharding_backend)
        if local_rank == my_local_rank:
            print(f"{local_rank} created process group for {ranks_for_inter_group}")
            inter_node_pg = grp
    assert (
        inter_node_pg is not None
    ), f"{my_local_rank} expected to assign inter-node pg, but did not"
    return inter_node_pg

def _init_intra_and_inter_node_groups(
    global_process_group: dist.ProcessGroup,
) -> Tuple[dist.ProcessGroup, dist.ProcessGroup]:
    """
    Initializes intra and inter-node process groups and returns the ones
    corresponding to this process's rank.
    This function can be used to initialize process groups for ``HYBRID_SHARD``
    or ``_HYBRID_SHARD_ZERO2`` in FSDP.
    This function assumes each node has an equal number of CUDA-enabled devices.
    Returns:
        Tuple[dist.ProcessGroup, dist.ProcessGroup]: Intra and inter-node process group.
    """
    return (
        _init_intra_node_process_group(),
        _init_inter_node_process_group(global_process_group),
    )
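

# Illustrative sketch (hypothetical helper, not part of the original module):
# how the hybrid-shard helpers above fit together. With `HYBRID_SHARD`, FSDP
# either derives the (intra-node, inter-node) process group pair itself from the
# default group or accepts it explicitly as a tuple, which
# `_is_valid_hybrid_shard_pg_type()` validates. Assumes `torch.distributed` has
# already been initialized (e.g. via torchrun).
def _example_hybrid_shard_process_groups() -> HybridShardProcessGroupType:
    # Build the pair once and reuse it for every manually wrapped FSDP instance
    # so that all instances shard and all-reduce over the same groups.
    intra_node_pg, inter_node_pg = _init_intra_and_inter_node_groups(
        _get_default_group()
    )
    return intra_node_pg, inter_node_pg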

@no_type_check
def _init_ignored_module_states(
    state: _FSDPState,
    module: nn.Module,
    ignored_modules: Optional[Iterable[torch.nn.Module]],
    ignored_parameters: Optional[Iterable[torch.nn.Parameter]] = None,
) -> _FSDPState:
    assert ignored_modules is None or ignored_parameters is None, (
        "Cannot pass both `ignored_modules` and `ignored_parameters` at the "
        "same time. Please pass either `ignored_modules` or `ignored_parameters`."
    )
    state._ignored_modules = _get_ignored_modules(module, ignored_modules)
    state._ignored_params = _get_ignored_params(
        module,
        state._ignored_modules,
        ignored_parameters,
    )
    # TODO: FSDP's contract for buffers is not well-defined. They are
    # implicitly ignored for most functionality since they are not sharded;
    # however, FSDP still imposes some semantics on buffers (e.g. buffer mixed
    # precision). We should formalize this contract and decide if we need to
    # compute and store `_ignored_buffers`.
    return state

@no_type_check
def _init_buffer_state(
    state: _FSDPState,
    module: nn.Module,
) -> _FSDPState:
    state._buffer_names = _get_buffer_names(module)
    # Save a mapping from clean fully-qualified buffer name (starting from
    # `module`) to its original dtype for restoring that dtype during model
    # checkpointing when buffer mixed precision is enabled. The names should
    # be clean since the casting happens in a `summon_full_params()` context.
    _buffer_name_to_orig_dtype: Dict[str, torch.dtype] = {}
    for buffer_name, buffer in module.named_buffers():
        buffer_name = clean_tensor_name(buffer_name)
        _buffer_name_to_orig_dtype[buffer_name] = buffer.dtype
    state._buffer_name_to_orig_dtype = _buffer_name_to_orig_dtype
    return state
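

# Illustrative sketch (hypothetical helper, not part of the original module):
# what `_init_buffer_state()` records, shown for a standalone module. For an
# unwrapped module the "clean" names are just the usual fully-qualified buffer
# names; with nested FSDP/checkpoint wrapping, `clean_tensor_name()` strips the
# wrapper prefixes.
def _example_buffer_dtype_mapping() -> Dict[str, torch.dtype]:
    module = nn.BatchNorm1d(4)  # has `running_mean`, `running_var`, etc.
    return {
        clean_tensor_name(name): buffer.dtype
        for name, buffer in module.named_buffers()
    }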

@no_type_check
def _init_core_state(
    state: _FSDPState,
    sharding_strategy: Optional[ShardingStrategy],
    mixed_precision: Optional[MixedPrecision],
    cpu_offload: Optional[CPUOffload],
    limit_all_gathers: bool,
    use_orig_params: bool,
    backward_prefetch_limit: int,
    forward_prefetch_limit: int,
) -> _FSDPState:
    # We clamp the strategy to `NO_SHARD` for world size of 1 since they are
    # currently functionally equivalent. This may change if/when we integrate
    # FSDP with MoE.
    if state.world_size == 1:
        if sharding_strategy != ShardingStrategy.NO_SHARD:
            warnings.warn(
                "FSDP is switching to use `NO_SHARD` instead of "
                f"{sharding_strategy or ShardingStrategy.FULL_SHARD} since "
                "the world size is 1."
            )
        sharding_strategy = ShardingStrategy.NO_SHARD
    state.sharding_strategy = sharding_strategy or ShardingStrategy.FULL_SHARD
    state.mixed_precision = mixed_precision or MixedPrecision()
    state.cpu_offload = cpu_offload or CPUOffload()
    state.limit_all_gathers = limit_all_gathers
    state._use_orig_params = use_orig_params
    state.training_state = TrainingState.IDLE
    state._is_root = None
    _streams: Dict[str, torch.cuda.Stream] = {}
    state._streams = _streams
    _stream_to_name: Dict[torch.cuda.Stream, str] = {}
    state._stream_to_name = _stream_to_name
    state._free_event_queue = _FreeEventQueue()
    state._debug_level = dist.get_debug_level()
    state._exec_order_data = exec_order_utils._ExecOrderData(
        state._debug_level,
        backward_prefetch_limit,
        forward_prefetch_limit,
    )
    # Mapping from fully sharded module to the handles it is responsible to
    # unshard and reshard (see [Note: Fully Sharded Module])
    _fully_sharded_module_to_handles: Dict[
        nn.Module, List[FlatParamHandle]
    ] = collections.defaultdict(list)
    state._fully_sharded_module_to_handles = _fully_sharded_module_to_handles
    # Invariant: `state.params` contains exactly the `FlatParameter`s of the
    # handles in `state._handles`
    _handles: List[FlatParamHandle] = []
    state._handles = _handles
    params: List[FlatParameter] = []
    state.params = params
    return state
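

# Illustrative sketch (hypothetical helper, not part of the original module):
# the sharding-strategy defaulting rules applied in `_init_core_state()`,
# isolated from the rest of the state setup.
def _example_resolve_sharding_strategy(
    world_size: int,
    sharding_strategy: Optional[ShardingStrategy],
) -> ShardingStrategy:
    # A world size of 1 always clamps to `NO_SHARD` (currently functionally
    # equivalent); otherwise, `None` defaults to `FULL_SHARD`.
    if world_size == 1:
        return ShardingStrategy.NO_SHARD
    return sharding_strategy or ShardingStrategy.FULL_SHARD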

@no_type_check
def _init_runtime_state(
    state: _FSDPState,
) -> _FSDPState:
    _root_pre_forward_handles: List[RemovableHandle] = []
    state._root_pre_forward_handles = _root_pre_forward_handles
    _pre_forward_handles: List[RemovableHandle] = []
    state._pre_forward_handles = _pre_forward_handles
    _post_forward_handles: List[RemovableHandle] = []
    state._post_forward_handles = _post_forward_handles
    state._sync_gradients = True
    state._communication_hook = _get_default_comm_hook(state.sharding_strategy)
    state._communication_hook_state = _get_default_comm_hook_state(state.process_group)
    state._hook_registered = False
    # Used to prevent running the pre-backward hook multiple times
    _ran_pre_backward_hook: Dict[_HandlesKey, bool] = {}
    state._ran_pre_backward_hook = _ran_pre_backward_hook
    return state

@no_type_check
def _init_prefetching_state(
    state: _FSDPState,
    backward_prefetch: BackwardPrefetch,
    forward_prefetch: bool,
) -> _FSDPState:
    state.backward_prefetch = backward_prefetch
    state.forward_prefetch = forward_prefetch
    _handles_prefetched: Dict[_HandlesKey, bool] = {}
    state._handles_prefetched = _handles_prefetched
    # Used for guarding against mistargeted backward prefetches
    _needs_pre_backward_unshard: Dict[_HandlesKey, bool] = {}
    state._needs_pre_backward_unshard = _needs_pre_backward_unshard
    # Used for guarding against mistargeted forward prefetches
    _needs_pre_forward_unshard: Dict[_HandlesKey, bool] = {}
    state._needs_pre_forward_unshard = _needs_pre_forward_unshard
    # The data structures use tuples of handles to generalize over the case
    # where a module's forward involves multiple handles.
    return state

def _init_state_dict_state(state: _FSDPState) -> _FSDPState:
    state._state_dict_type = StateDictType.FULL_STATE_DICT
    state_dict_config: StateDictConfig = FullStateDictConfig()
    state._optim_state_dict_config = FullOptimStateDictConfig()
    state._state_dict_config = state_dict_config
    unshard_params_ctx: Dict[nn.Module, Generator] = {}
    state._unshard_params_ctx = unshard_params_ctx
    return state

@no_type_check
def _init_param_handle_from_module(
    state: _FSDPState,
    fully_sharded_module: nn.Module,
    device_id: Optional[Union[int, torch.device]],
    param_init_fn: Optional[Callable[[nn.Module], None]],
    sync_module_states: bool,
    module_wrapper_cls: Type,
) -> _FSDPState:
    """
    Initializes a ``FlatParamHandle`` from a module ``fully_sharded_module``.
    This is the module wrapper code path.
    """
    _check_single_device_module(fully_sharded_module, state._ignored_params)
    device_from_device_id = _get_device_from_device_id(device_id, state.rank)
    is_meta_module, is_torchdistX_deferred_init = _need_to_materialize_module(
        fully_sharded_module, state._ignored_params
    )
    # Materialize the module if needed
    if (is_meta_module or is_torchdistX_deferred_init) and param_init_fn is not None:
        _materialize_with_param_init_fn(fully_sharded_module, param_init_fn)
    elif is_meta_module:
        _materialize_meta_module(fully_sharded_module, device_id)
    elif is_torchdistX_deferred_init:
        deferred_init.materialize_module(
            fully_sharded_module,
            check_fn=lambda k: not isinstance(k, module_wrapper_cls),
        )
    # TODO: Investigate refactoring `_move_module_to_device()` to
    # `_move_states_to_device()` to avoid the `device_id` + CPU offload hack
    _move_module_to_device(
        fully_sharded_module, state._ignored_params, device_from_device_id
    )
    state.compute_device = _get_compute_device(
        fully_sharded_module,
        state._ignored_params,
        device_from_device_id,
        state.rank,
    )
    managed_params = list(_get_orig_params(fully_sharded_module, state._ignored_params))
    if sync_module_states:
        _sync_module_params_and_buffers(
            fully_sharded_module, managed_params, state.process_group
        )
    _init_param_handle_from_params(state, managed_params, fully_sharded_module)
    return state

@no_type_check
def _init_param_handles_from_module(
    state: _FSDPState,
    root_module: nn.Module,
    policy: _FSDPPolicy,
    device_id: Optional[Union[int, torch.device]],
    param_init_fn: Optional[Callable[[nn.Module], None]],
    sync_module_states: bool,
) -> _FSDPState:
    """
    Initializes all ``FlatParamHandle`` s from a module ``root_module``. This
    is the non-module-wrapper code path. ``root_module`` is guaranteed to be
    a fully sharded module, and some of its submodules may be as well,
    depending on ``policy``. See [Note: Fully Sharded Module].
    """
    fully_sharded_module_to_states = _get_fully_sharded_module_to_states(
        root_module,
        policy,
        state._ignored_modules,
        state._ignored_params,
    )
    _check_single_device_module(root_module, state._ignored_params)
    device_from_device_id = _get_device_from_device_id(device_id, state.rank)
    # Initialize and shard `FlatParamHandle`s one by one following reverse
    # depth-first order (i.e. reverse `.modules()` order), which represents a
    # reverse topological sort order. This avoids increasing peak GPU memory
    # usage when the unsharded model exists on CPU or meta device.
    # NOTE: This order differs from that followed by the wrapper path when
    # using auto wrapping, which also represents a valid reverse topological
    # sort order, but the difference does not matter.
    materialized_module = False
    for fully_sharded_module, (params, buffers) in reversed(
        fully_sharded_module_to_states.items()
    ):
        # Materialize the module if needed
        is_meta_module, is_torchdistX_deferred_init = _need_to_materialize_module(
            fully_sharded_module, state._ignored_params
        )
        if is_meta_module or is_torchdistX_deferred_init:
            materialized_module = True
            # Save the parameter and buffer names to reacquire references
            # after materialization since their variables may change
            param_names, buffer_names = _get_state_names_for_states(
                fully_sharded_module, params, buffers
            )
        if (
            is_meta_module or is_torchdistX_deferred_init
        ) and param_init_fn is not None:
            _materialize_with_param_init_fn(fully_sharded_module, param_init_fn)
        elif is_meta_module:
            _materialize_meta_module(fully_sharded_module, device_id)
        elif is_torchdistX_deferred_init:
            deferred_init.materialize_module(
                root_module,
                check_fn=lambda _: True,
            )
        if materialized_module:
            # Reacquire references using the pre-computed state names
            params = [
                fully_sharded_module.get_parameter(param_name)
                for param_name in param_names
            ]
            buffers = [
                fully_sharded_module.get_buffer(buffer_name)
                for buffer_name in buffer_names
            ]
        _move_states_to_device(params, buffers, device_from_device_id)
        if not hasattr(state, "compute_device"):  # only need to set once
            state.compute_device = _get_compute_device(
                fully_sharded_module,
                state._ignored_params,
                device_from_device_id,
                state.rank,
            )
        if sync_module_states:
            _sync_module_states(params, buffers, state.process_group)
        _init_param_handle_from_params(state, params, fully_sharded_module)
    # Reverse `_handles` to preserve depth-first `.modules()` order for
    # consistency with the wrapper path (namely, so that `_get_fsdp_handles()`
    # returns the same ordering for both paths).
    state._handles.reverse()
    return state
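

# Illustrative sketch (hypothetical helper, not part of the original module):
# the ordering used by `_init_param_handles_from_module()`. Handles are built
# while walking fully sharded modules in reverse depth-first order (children
# before parents, to bound peak memory), and the final `reverse()` restores
# depth-first `.modules()` order so both code paths report the same handle
# ordering.
def _example_construction_vs_recorded_order(modules_order: List[str]) -> List[str]:
    recorded: List[str] = []
    for name in reversed(modules_order):  # construction order: bottom-up
        recorded.append(name)
    recorded.reverse()  # recorded order: top-down, matching `.modules()`
    return recorded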

@no_type_check
def _init_param_handle_from_params(
    state: _FSDPState,
    params: List[nn.Parameter],
    fully_sharded_module: nn.Module,
):
    if len(params) == 0:
        return
    handle = FlatParamHandle(
        params,
        fully_sharded_module,
        state.compute_device,
        SHARDING_STRATEGY_MAP[state.sharding_strategy],
        state.cpu_offload.offload_params,
        state.mixed_precision.param_dtype,
        state.mixed_precision.reduce_dtype,
        state.mixed_precision.keep_low_precision_grads,
        state.process_group,
        state._use_orig_params,
    )
    # TODO: Can simplify by calling `shard()` in the `FlatParamHandle` ctor
    handle.shard()
    assert handle not in state._handles
    state.params.append(handle.flat_param)
    state._handles.append(handle)
    state._fully_sharded_module_to_handles[handle._fully_sharded_module].append(handle)
    num_fully_sharded_module_handles = len(
        state._fully_sharded_module_to_handles[handle._fully_sharded_module]
    )
    assert num_fully_sharded_module_handles == 1, (
        "The current design assumes a module manages at most one "
        f"`FlatParamHandle` but got {num_fully_sharded_module_handles}"
    )
    cpu_device = torch.device("cpu")
    if state.cpu_offload.offload_params and handle.flat_param.device != cpu_device:
        handle.flat_param_to(cpu_device)

def _get_state_names_for_states(
    module: nn.Module,
    params: List[nn.Parameter],
    buffers: List[torch.Tensor],
) -> Tuple[List[str], List[str]]:
    """
    Returns the parameter and buffer names of the given ``params`` and
    ``buffers``, where the names are prefixed starting from ``module``. This
    function assumes that the parameters and buffers are in the module tree.
    """
    param_names: List[str] = []
    buffer_names: List[str] = []
    param_to_param_name = {
        param: param_name for param_name, param in module.named_parameters()
    }
    buffer_to_buffer_name = {
        buffer: buffer_name for buffer_name, buffer in module.named_buffers()
    }
    for param in params:
        assert (
            param in param_to_param_name
        ), f"Parameter not in the module tree:\n{module}\n{param}"
        param_names.append(param_to_param_name[param])
    for buffer in buffers:
        assert (
            buffer in buffer_to_buffer_name
        ), f"Buffer not in the module tree:\n{module}\n{buffer}"
        buffer_names.append(buffer_to_buffer_name[buffer])
    return param_names, buffer_names
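

# Illustrative sketch (hypothetical helper, not part of the original module):
# the round trip that the non-wrapper path relies on. Names are computed before
# materialization and used afterwards to reacquire the (possibly replaced)
# parameter variables.
def _example_reacquire_params_by_name(module: nn.Module) -> List[nn.Parameter]:
    params = list(module.parameters())
    buffers = list(module.buffers())
    param_names, _ = _get_state_names_for_states(module, params, buffers)
    # After e.g. `to_empty()` + `reset_parameters()`, the old references may be
    # stale, so look the parameters up again by their prefixed names.
    return [module.get_parameter(name) for name in param_names]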

def _get_ignored_modules(
    root_module: nn.Module,
    _ignored_modules: Optional[Iterable[torch.nn.Module]],
) -> Set[nn.Module]:
    """
    Checks that ``_ignored_modules`` is an iterable of ``nn.Module`` s without
    any FSDP instances, and returns the modules contained in their module
    subtrees as a :class:`set`. Nested FSDP instances are excluded, but their
    already-computed ignored modules are included.
    ``_ignored_modules`` represents the argument passed by the user to FSDP.
    """
    msg_prefix = "`ignored_modules` should be an iterable of `torch.nn.Module`s "
    try:
        ignored_root_modules = (
            set(_ignored_modules) if _ignored_modules is not None else set()
        )
    except TypeError as e:
        raise TypeError(msg_prefix + f"but got {type(_ignored_modules)}") from e
    for module in ignored_root_modules:
        if not isinstance(module, torch.nn.Module):
            raise TypeError(msg_prefix + f"but got an iterable with {type(module)}")
        if isinstance(module, fsdp_file.FullyShardedDataParallel):
            # TODO: We may relax this by taking the FSDP instance's wrapped
            # module to provide more flexibility to the user.
            raise ValueError("`ignored_modules` should not include FSDP modules")
    # Treat modules that cannot compose with `fully_shard` as ignored modules,
    # meaning that their subtrees are ignored
    for module in root_module.modules():
        if not traversal_utils._composable(module):
            ignored_root_modules.add(module)
    # NOTE: Even if `ignored_root_modules` is empty, do not return early so
    # that this FSDP instance can get any ignored modules from its children.
    # Include child modules and exclude nested FSDP modules themselves
    ignored_modules = {
        child
        for module in ignored_root_modules
        for child in module.modules()
        if not isinstance(child, fsdp_file.FullyShardedDataParallel)
    }
    if root_module in ignored_modules:
        warnings.warn(
            "Trying to ignore the top-level module passed into the FSDP "
            "constructor itself will result in all parameters being "
            f"ignored and is not well-supported: {root_module}"
        )
    # Include nested FSDP modules' ignored modules
    for submodule in root_module.modules():
        optional_fsdp_state = _get_module_fsdp_state(submodule)
        if optional_fsdp_state is not None:
            assert hasattr(optional_fsdp_state, "_ignored_modules")
            ignored_modules.update(optional_fsdp_state._ignored_modules)
    return ignored_modules

def _get_ignored_params(
    root_module: torch.nn.Module,
    ignored_modules: Set[torch.nn.Module],
    ignored_parameters: Optional[Iterable[torch.nn.Parameter]] = None,
) -> Set[torch.nn.Parameter]:
    """
    Returns the parameters of the modules in ``ignored_modules`` and
    the parameters in ``ignored_parameters``, excluding any :class:`FlatParameter` s.
    """
    all_ignored_params: Set[torch.nn.Parameter] = set()
    params_in_ignored_modules = {
        p for m in ignored_modules for p in m.parameters() if not _is_fsdp_flattened(p)
    }
    all_ignored_params.update(params_in_ignored_modules)
    if ignored_parameters is not None:
        params_in_ignored_parameters = {
            p for p in ignored_parameters if not _is_fsdp_flattened(p)
        }
        all_ignored_params.update(params_in_ignored_parameters)
    # Include nested FSDP modules' ignored parameters
    for submodule in root_module.modules():
        optional_fsdp_state = _get_module_fsdp_state(submodule)
        if optional_fsdp_state is not None:
            assert hasattr(optional_fsdp_state, "_ignored_params")
            all_ignored_params.update(optional_fsdp_state._ignored_params)
    return all_ignored_params
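

# Illustrative sketch (hypothetical helper, not part of the original module):
# how `ignored_modules` expands into ignored parameters for a toy model with no
# nested FSDP instances, mirroring the two helpers above.
def _example_ignored_params_for_toy_model() -> Set[nn.Parameter]:
    model = nn.Sequential(nn.Linear(4, 4), nn.Linear(4, 4))
    ignored_modules = _get_ignored_modules(model, [model[1]])
    # Every parameter in the ignored subtree (the second `Linear`) is ignored.
    return _get_ignored_params(model, ignored_modules)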

def _get_buffer_names(root_module: nn.Module) -> Set[str]:
    """
    Returns the fully prefixed names of all buffers in the module hierarchy
    rooted at ``root_module`` as a :class:`set`.
    """
    return {
        clean_tensor_name(buffer_name) for buffer_name, _ in root_module.named_buffers()
    }

def _check_single_device_module(
    module: nn.Module,
    ignored_params: Set[nn.Parameter],
) -> None:
    """
    Raises an error if ``module`` has original parameters on multiple devices,
    ignoring the parameters in ``ignored_params``. Thus, after this method, the
    module must be either fully on the CPU or fully on a non-CPU device.
    """
    devices = {param.device for param in _get_orig_params(module, ignored_params)}
    if len(devices) > 1:
        raise RuntimeError(
            f"FSDP only supports single device modules but got params on {devices}"
        )

def _get_device_from_device_id(
    device_id: Optional[Union[int, torch.device]],
    rank: int,
) -> Optional[torch.device]:
    """
    Processes ``device_id`` and returns either the corresponding device or
    ``None`` if ``device_id`` is ``None``.
    """
    if device_id is None:
        return None
    device = (
        device_id if isinstance(device_id, torch.device) else torch.device(device_id)
    )
    if device == torch.device("cuda"):
        warnings.warn(
            f"FSDP got the argument `device_id` {device_id} on rank "
            f"{rank}, which does not have an explicit index. "
            f"FSDP will use the current device {torch.cuda.current_device()}. "
            "If this is incorrect, please explicitly call `torch.cuda.set_device()` "
            "before FSDP initialization or pass in the explicit device "
            "index as the `device_id` argument."
        )
        device = torch.device("cuda", torch.cuda.current_device())
    return device
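

# Illustrative sketch (hypothetical helper, not part of the original module):
# how `device_id` values normalize. An int (or indexed device) becomes that CUDA
# device; a bare "cuda" device falls back to the current device with the warning
# above, so the last call assumes CUDA is available.
def _example_device_id_normalization() -> Optional[torch.device]:
    assert _get_device_from_device_id(None, rank=0) is None
    assert _get_device_from_device_id(1, rank=0) == torch.device("cuda", 1)
    return _get_device_from_device_id(torch.device("cuda"), rank=0)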

def _need_to_materialize_module(
    module: nn.Module,
    ignored_params: Set[nn.Parameter],
) -> Tuple[bool, bool]:
    """
    Returns if ``module`` has parameters on meta device and if ``module`` is
    using torchdistX deferred initialization. At most one of the returned bools
    can be ``True``. If either is ``True``, then ``module`` needs to be
    materialized.
    """
    managed_params = list(_get_orig_params(module, ignored_params))
    is_meta_module = any(param.is_meta for param in managed_params)
    is_torchdistX_deferred_init = (
        not is_meta_module
        and _TORCHDISTX_AVAIL
        and any(fake.is_fake(param) for param in managed_params)
    )
    return is_meta_module, is_torchdistX_deferred_init
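

# Illustrative sketch (hypothetical helper, not part of the original module):
# detecting a meta-device module as done above. Expected result: `(True, False)`
# since the module has meta parameters and no torchdistX deferred init.
def _example_meta_detection() -> Tuple[bool, bool]:
    meta_module = nn.Linear(4, 4, device="meta")
    return _need_to_materialize_module(meta_module, ignored_params=set())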

def _materialize_with_param_init_fn(
    module: nn.Module,
    param_init_fn,
) -> None:
    if not callable(param_init_fn):
        raise ValueError(
            f"Expected {param_init_fn} to be callable but got {type(param_init_fn)}"
        )
    param_init_fn(module)

def _materialize_meta_module(
    module: nn.Module,
    device_from_device_id: Optional[torch.device],
):
    # Run default meta device initialization
    materialization_device = device_from_device_id or torch.device(
        torch.cuda.current_device()
    )
    module.to_empty(device=materialization_device)
    try:
        with torch.no_grad():
            module.reset_parameters()  # type: ignore[operator]
    except BaseException as e:
        warnings.warn(
            "Unable to call `reset_parameters()` for module on meta "
            f"device with error {str(e)}. Please ensure your "
            "module implements a `reset_parameters()` method."
        )
        raise e
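

# Illustrative sketch (hypothetical helper, not part of the original module):
# the materialization pattern used above, applied to a standalone meta-device
# module. Assumes `device` refers to an available device.
def _example_materialize_meta_linear(device: torch.device) -> nn.Module:
    module = nn.Linear(8, 8, device="meta")
    module.to_empty(device=device)  # allocate real but uninitialized storage
    with torch.no_grad():
        module.reset_parameters()  # re-run the module's default initialization
    return module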

def _move_module_to_device(
    module: nn.Module,
    ignored_params: Set[nn.Parameter],
    device_from_device_id: Optional[torch.device],
) -> None:
    """
    Moves ``module`` depending on ``device_from_device_id`` and its current
    device. This includes moving ignored modules' parameters.
    - If ``device_from_device_id`` is not ``None``, then this moves
      ``module`` to the device.
    - If ``device_from_device_id`` is ``None``, then this does not move
      ``module`` but warns the user if it is on CPU.
    Precondition: ``_check_single_device_module()``.
    """
    param = next(_get_orig_params(module, ignored_params), None)
    if param is None:
        return  # no original parameters to manage
    cpu_device = torch.device("cpu")
    if device_from_device_id is not None:
        if param.device == cpu_device:
            # NOTE: This includes moving ignored modules' parameters.
            module = module.to(device_from_device_id)
            # TODO: This is a temporary fix to move already-constructed
            # `FlatParameter`s back to CPU if needed. This is needed to
            # make CPU offload work with `device_id`.
            for submodule in module.modules():
                if (
                    isinstance(submodule, fsdp_file.FullyShardedDataParallel)
                    and submodule.cpu_offload.offload_params
                ):
                    for handle in submodule._handles:
                        handle.flat_param_to(torch.device("cpu"))
    elif param.device == cpu_device:
        _warn_cpu_init()

def _move_states_to_device(
    params: List[nn.Parameter],
    buffers: List[torch.Tensor],
    device_from_device_id: Optional[torch.device],
) -> None:
    """
    Precondition: ``_check_single_device_module()``.
    """
    if len(params) == 0 and len(buffers) == 0:
        return
    if len(params) > 0:
        current_device = params[0].device
    elif len(buffers) > 0:
        current_device = buffers[0].device
    cpu_device = torch.device("cpu")
    if device_from_device_id is not None:
        # Move the parameters and buffers like the `.data` code path in
        # `nn.Module._apply()`, which underlies `nn.Module.to()`
        for param in params:
            with torch.no_grad():
                param.data = param.to(device_from_device_id)
                if param.grad is not None:
                    param.grad.data = param.grad.to(device_from_device_id)
        for buffer in buffers:
            buffer.data = buffer.to(device_from_device_id)
    elif current_device == cpu_device:
        _warn_cpu_init()
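

# Illustrative sketch (hypothetical helper, not part of the original module):
# the `.data`-style move used by `_move_states_to_device()`. The same
# `nn.Parameter` object survives; only its data (and gradient, if any) change
# device, mirroring the `.data` code path in `nn.Module._apply()`.
def _example_move_param_in_place(
    param: nn.Parameter, device: torch.device
) -> nn.Parameter:
    with torch.no_grad():
        param.data = param.to(device)
        if param.grad is not None:
            param.grad.data = param.grad.to(device)
    return param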

def _warn_cpu_init():
    warnings.warn(
        "The passed-in `module` is on CPU and will thus have FSDP's sharding "
        "initialization run on CPU, which may be slower than on GPU. We "
        "recommend passing in the `device_id` argument for FSDP to move "
        "`module` to GPU for the sharding initialization. `module` must also "
        "be on GPU device to work with the `sync_module_states=True` flag "
        "since that requires GPU communication."
    )

def _get_compute_device(
    module: nn.Module,
    ignored_params: Set[nn.Parameter],
    device_from_device_id: Optional[torch.device],
    rank: int,
) -> torch.device:
    """
    Determines and returns this FSDP instance's compute device. If the module
    is already on a non-CPU device, then the compute device is that non-CPU
    device. If the module is on CPU, then the compute device is the current
    device.
    Since this method should be called after materializing the module, any
    non-CPU device should not be the meta device. For now, the compute device
    is always a CUDA GPU device with its explicit index.
    Precondition: ``_check_single_device_module()`` and
    ``_move_module_to_device()``.
    """
    # If the module is on GPU already, then that GPU device has priority
    # over the current device
    param = next(_get_orig_params(module, ignored_params), None)
    if param is not None and param.device.type == "cuda":
        compute_device = param.device
    else:
        compute_device = torch.device("cuda", torch.cuda.current_device())
    if device_from_device_id is not None and compute_device != device_from_device_id:
        raise ValueError(
            f"Inconsistent compute device and `device_id` on rank {rank}: "
            f"{compute_device} vs {device_from_device_id}"
        )
    return compute_device

# TODO: See how to deprecate!
def _sync_module_params_and_buffers(
    module: nn.Module,
    params: List[nn.Parameter],
    process_group: dist.ProcessGroup,
) -> None:
    """
    Synchronizes module states (i.e. parameters ``params`` and all
    not-yet-synced buffers) by broadcasting from rank 0 to all ranks.
    Precondition: ``sync_module_states == True`` and ``self.process_group`` has
    been set.
    """
    _check_params_for_sync_module_states(params)
    module_states: List[torch.Tensor] = []
    for buffer in module.buffers():
        # Avoid re-synchronizing buffers in case of nested wrapping
        if not getattr(buffer, FSDP_SYNCED, False):
            setattr(buffer, FSDP_SYNCED, True)
            module_states.append(buffer.detach())
    module_states.extend(param.detach() for param in params)
    _sync_params_and_buffers(
        process_group,
        module_states,
        PARAM_BROADCAST_BUCKET_SIZE,
        src=0,
    )
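

# Illustrative sketch (hypothetical helper, not part of the original module):
# what the rank-0 broadcast above does, for a plain module. Assumes the default
# process group is initialized and the module's tensors are on GPU.
def _example_sync_from_rank0(module: nn.Module) -> None:
    states = [param.detach() for param in module.parameters()] + [
        buffer.detach() for buffer in module.buffers()
    ]
    _sync_params_and_buffers(
        _get_default_group(),
        states,
        PARAM_BROADCAST_BUCKET_SIZE,
        src=0,
    )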

def _sync_module_states(
    params: List[nn.Parameter],
    buffers: List[torch.Tensor],
    process_group: dist.ProcessGroup,
) -> None:
    _check_params_for_sync_module_states(params)
    # Assumes that each call to this method passes in disjoint `params` and
    # `buffers` across calls, so there is no chance of re-synchronizing
    params_and_buffers = [param.detach() for param in params] + [
        buffer.detach() for buffer in buffers
    ]
    _sync_params_and_buffers(
        process_group,
        params_and_buffers,
        PARAM_BROADCAST_BUCKET_SIZE,
        src=0,
    )

def _check_params_for_sync_module_states(
    params: List[nn.Parameter],
) -> None:
    if params and any(param.device == torch.device("cpu") for param in params):
        raise ValueError(
            "The module has CPU parameters when `sync_module_states=True`, "
            "which only works when all parameters are on GPU. Please specify "
            "the `device_id` argument or move the module to GPU before passing "
            "it into FSDP."
        )

def _get_orig_params(
    module: nn.Module,
    ignored_params: Set[nn.Parameter],
) -> Iterator[nn.Parameter]:
    """
    Returns an iterator over the original parameters in ``module``, ignoring
    the parameters in ``ignored_params``, any ``FlatParameter`` s (which may be
    present due to nested FSDP wrapping), and any original parameters already
    flattened (only relevant when ``use_orig_params=True``).
    """
    param_gen = module.parameters()
    try:
        while True:
            param = next(param_gen)
            if param not in ignored_params and not _is_fsdp_flattened(param):
                yield param
    except StopIteration:
        pass

def _check_orig_params_flattened(
    fsdp_module,
    ignored_params: Set[nn.Parameter],
) -> None:
    """
    Checks that all original parameters have been flattened and hence made
    invisible to ``named_parameters()`` for the module hierarchy rooted at
    ``fsdp_module``. This should be called as a sanity check after flattening
    the wrapped module's parameters.
    """
    for param_name, param in fsdp_module.named_parameters():
        if param not in ignored_params and not _is_fsdp_flattened(param):
            raise RuntimeError(
                f"Found an unflattened parameter: {param_name}; "
                f"{param.size()} {param.__class__}"
            )

def _get_default_comm_hook(sharding_strategy: ShardingStrategy):
    return (
        default_hooks.allreduce_hook
        if sharding_strategy == ShardingStrategy.NO_SHARD
        else default_hooks.reduce_scatter_hook
    )


def _get_default_comm_hook_state(
    process_group: dist.ProcessGroup,
) -> default_hooks.DefaultState:
    return default_hooks.DefaultState(process_group=process_group)
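

# Illustrative sketch (hypothetical helper, not part of the original module):
# the default hook selection above in use. `NO_SHARD` keeps full gradients on
# every rank, so it all-reduces; sharded strategies reduce-scatter into each
# rank's gradient shard.
def _example_default_hook_selection() -> bool:
    allreduce = _get_default_comm_hook(ShardingStrategy.NO_SHARD)
    reduce_scatter = _get_default_comm_hook(ShardingStrategy.FULL_SHARD)
    return (
        allreduce is default_hooks.allreduce_hook
        and reduce_scatter is default_hooks.reduce_scatter_hook
    )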