# flat_param.py

import contextlib
import warnings
from enum import auto, Enum
from itertools import accumulate, chain
from typing import (
    Any,
    Dict,
    Generator,
    Iterator,
    List,
    NamedTuple,
    no_type_check,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
from torch.distributed._tensor import DTensor
from torch.distributed.fsdp._common_utils import (
    _set_fsdp_flattened,
    HandleTrainingState,
)

from ._fsdp_extensions import _ext_post_unflatten_transform, _ext_pre_flatten_transform
from ._utils import (
    _alloc_storage,
    _free_storage,
    _no_dispatch_record_stream,
    _same_storage,
    p_assert,
)

__all__ = [
    "FlatParameter",
    "FlatParamHandle",
    "FlatParamShardMetadata",
    "ParamInfo",
    "SharedParamInfo",
    "HandleShardingStrategy",
]
  45. """
  46. [Note: Fully Sharded Module]
  47. We define the "fully sharded module" to be the original ``nn.Module`` that owns
  48. a ``FlatParamHandle``. It is the *single* module logically responsible for the
  49. *single* unshard/reshard pair for the handle's ``FlatParameter`` for a given
  50. forward or backward pass. The fully sharded module should be passed to the
  51. ``FlatParamHandle`` constructor.
  52. For the wrapper code path:
  53. - The ``FullyShardedDataParallel`` module wrapping the fully sharded module
  54. runs the unshard/reshard on behalf of the fully sharded module by overriding
  55. ``nn.Module.forward``.
  56. - The fully sharded module is exactly the module passed to the
  57. ``FullyShardedDataParallel`` constructor's ``module`` argument.
  58. For the non-wrapper code path:
  59. - Hooks registered on the fully sharded module run the unshard/reshard.
  60. - The fully sharded module may either be the direct argument to ``fully_shard``
  61. or a submodule chosen by the provided wrapping policy.
  62. """


class ParamInfo(NamedTuple):
    """Information for an original module parameter."""

    param_name: str  # unprefixed
    module: nn.Module
    module_name: str


class SharedParamInfo(NamedTuple):
    """
    Additional information for a shared parameter.

    For each shared parameter, we designate one module and its parameter
    variable to be the primary owner, determined as the first one encountered
    in the parameter walk. These are prefixed with "prim". The primary module
    and parameter do not have their own :class:`SharedParamInfo` instance.
    """

    param_name: str  # unprefixed
    module: nn.Module
    module_name: str
    prim_param_name: str  # unprefixed
    prim_module: nn.Module
    prim_module_name: str


class FlatParamShardMetadata(NamedTuple):
    """
    This holds metadata specific to this rank's shard of the flattened
    parameter.

    Attributes:
        param_names (Tuple[str, ...]): Prefixed parameter names of this rank's
            shard of the parameters; see :class:`FlatParameter`.
        param_shapes (Tuple[torch.Size, ...]): Parameter shapes of this rank's
            shard of the parameters; see :class:`FlatParameter`.
        param_numels (Tuple[int, ...]): Parameter numels of this rank's shard
            of the parameters; see :class:`FlatParameter`.
        param_offsets (Tuple[Tuple[int, int], ...]): [start, end] offsets (in
            units of numels) giving this rank's part of each flattened
            original module parameter.
    """

    param_names: Tuple[str, ...]
    param_shapes: Tuple[torch.Size, ...]
    param_numels: Tuple[int, ...]
    param_offsets: Tuple[Tuple[int, int], ...]
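
    # Illustrative example (not part of the original source): for a rank that
    # fully owns two original parameters, say a hypothetical ``lin.weight`` of
    # shape (2, 3) and ``lin.bias`` of shape (4,), the metadata would look like
    #   FlatParamShardMetadata(
    #       param_names=("lin.weight", "lin.bias"),
    #       param_shapes=(torch.Size([2, 3]), torch.Size([4])),
    #       param_numels=(6, 4),
    #       param_offsets=((0, 5), (0, 3)),  # [0, numel - 1] when not split across ranks
    #   )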


# TODO (awgu): Prefix these with "Handle" for now to avoid circular imports and
# inadvertent misuses; coalesce with those in fully_sharded_data_parallel.py
# later
class HandleShardingStrategy(Enum):
    FULL_SHARD = auto()
    SHARD_GRAD_OP = auto()
    NO_SHARD = auto()
    HYBRID_SHARD = auto()
    _HYBRID_SHARD_ZERO2 = auto()


class FlatParameter(nn.Parameter):
    """
    This is the flattened parameter used by :class:`FullyShardedDataParallel`.
    It comprises one or more original parameters, which are flattened and
    concatenated to construct the flattened parameter.

    Under the current design, this parameter logically represents both the
    unsharded and sharded flattened parameter, and its data changes storages
    dynamically.
        - In the :class:`FullyShardedDataParallel` constructor, the parameter
          is initialized as unsharded and then sharded in-place.
        - At runtime, the parameter is lazily (re)-initialized. The sharded
          parameter data is saved in ``self._local_shard``, and a new ``Tensor``
          ``self._full_param_padded`` is created, which is the all-gather
          destination and owns the unsharded parameter storage thereafter. (See
          :meth:`FlatParamHandle.init_flat_param_attributes`.)
        - Throughout runtime, the parameter data changes storages as needed,
          e.g. to the sharded flattened parameter, reduced-precision sharded
          flattened parameter, or the unsharded flattened parameter.

    Attributes:
        _unpadded_unsharded_size (torch.Size): Unsharded flattened parameter's
            size without padding.
        _padded_unsharded_size (torch.Size): Unsharded flattened parameter's
            size with padding. This is only set for sharded strategies since
            they require padding for the all-gather.
        _sharded_size (torch.Size): Sharded flattened parameter's size with
            padding. This is also set for ``NO_SHARD``, in which case it is the
            same as the unsharded sizes. (We omit "padded" because there is no
            analogous unpadded one.)

        _param_infos (Tuple[ParamInfo, ...]): Each parameter's parameter info
            entry; see :class:`ParamInfo`.
        _numels (Tuple[int, ...]): Each parameter's numel.
        _shapes (Tuple[torch.Size, ...]): Each parameter's shape.
        _fqns (Tuple[str, ...]): The original parameters' FQNs prefixed from
            the owning handle's ``_fully_sharded_module``. The names are
            guaranteed to be unique within the subtree rooted at that module.
        _num_params (int): Number of original parameters flattened into this
            flattened parameter; this is the length of ``_param_infos``,
            ``_numels``, ``_shapes``, and ``_fqns``.
        _shared_param_infos (Tuple[SharedParamInfo, ...]): Shared parameter
            info entries; see :class:`SharedParamInfo`.
        _param_extensions (Tuple[Optional[Any], ...]): Parameter extensions
            (i.e. some per-parameter state) used to customize pre-flatten and
            post-unflatten behavior. This is experimental, and users should not
            depend on its existence in the future.
        _modules (Set[nn.Module]): Modules that contain some original parameter
            that is flattened into the ``FlatParameter``.

        _shard_param_offsets (List[Tuple[int, int]]): [start, end] offsets (in
            units of numel) giving this rank's part of each flattened original
            module parameter; for any parameter ``p`` that is not sharded
            across ranks, this will be [0, ``p.numel()``-1].
        _shard_indices (Tuple[int, int]): [start, end] indices (in units of
            parameters) for this rank's shard of the original model parameters,
            where the parameters follow the order in which they were originally
            flattened; this indexes appropriately into any data structure that
            follows the flattening order (e.g. ``_param_infos``, ``_numels``,
            etc.).
        _shard_numel_padded (int): Numel padded for this rank's sharded
            flattened parameter.

        _local_shard (Tensor): Sharded flattened parameter with padding if
            using a sharded strategy. If using ``NO_SHARD``, then this is the
            unpadded unsharded flattened parameter, and there is no notion of a
            sharded flattened parameter or padded unsharded flattened
            parameter.
        _full_param_padded (Tensor): Unsharded flattened parameter with
            padding. This is not defined for ``NO_SHARD``. When using mixed
            precision for parameters, this has the low precision.
        _full_prec_full_param_padded (Tensor): Full precision unsharded
            flattened parameter with padding. This is used for unsharding
            outside of computation when using mixed precision for parameters.
            This is never defined for ``NO_SHARD``.
        _post_backward_hook_state (Tuple[AccumulateGrad, RemovableHandle]):
            Flattened parameter's :class:`AccumulateGrad` object and
            post-backward hook handle.
        _mp_shard (Tensor): Low precision sharded flattened parameter with
            padding. This is only defined when parameter mixed precision is
            enabled. For ``NO_SHARD``, this is used for computation.
        _cpu_grad (Tensor): Sharded gradient with padding stored on CPU.
            This is only defined when offloading parameters is enabled.
        _saved_grad_shard (Tensor): Sharded gradient with padding from previous
            iterations for gradient accumulation without :meth:`no_sync`.

        _params (Optional[List[nn.Parameter]]): The original parameter
            variables if ``use_orig_params=True`` and ``None`` otherwise.
        _shared_params (Optional[List[nn.Parameter]]): The original shared
            parameter variables if ``use_orig_params=True`` and ``None``
            otherwise.
        _tensors (Optional[List[Optional[Tensor]]]): This saves the ``Tensor``
            views created in the forward and tracked by autograd when
            ``use_orig_params=True`` and is ``None`` otherwise. This is to
            preserve those ``Tensor`` variables for the backward to ensure that
            the ``FlatParameter`` 's ``AccumulateGrad`` object does not change,
            since if it did change, the post-backward hook would not run. This
            is relevant for cases like reentrant activation checkpointing.
        _is_grad_none (Optional[List[bool]]): A mask over the original
            parameters' gradients indicating if each is logically ``None`` or
            not if ``use_orig_params=True`` and ``None`` otherwise. This is
            needed because only some of the parameters may have ``None``
            gradient, in which case the ``FlatParameter`` gradient must be
            non-``None`` and must use zeros to approximate those original
            ``None`` gradients. This mask informs FSDP to set the original
            parameter gradients to ``None`` (instead of zeros) as needed.
    """

    def _init_metadata(
        self,
        param_infos: List[ParamInfo],
        numels: List[int],
        shapes: List[torch.Size],
        fqns: List[str],
        shared_param_infos: List[SharedParamInfo],
        param_extensions: List[Any],
        params: Optional[List[nn.Parameter]],
        shared_params: Optional[List[nn.Parameter]],
    ) -> None:
        """
        Initializes attributes holding metadata about the original parameters
        comprising the flattened parameter.

        We expose this method separately from the constructor to keep the
        constructor only responsible for the flattened parameter's tensor data.
        This method should only be called once per model, while the constructor
        may be called multiple times, e.g. when reloading from a checkpoint, in
        which case only the tensor data needs to be passed to the constructor.
        Since :meth:`load_state_dict` is implemented via :meth:`copy_`, the
        metadata is correctly assumed to be unchanged.

        Args:
            See the Attributes in the class docstring.
        """
        assert len(param_infos) == len(numels)
        assert len(param_infos) == len(shapes)
        assert len(param_infos) == len(fqns)
        assert len(param_infos) == len(param_extensions)
        self._num_params = len(param_infos)
        self._param_infos = tuple(param_infos)
        self._numels = tuple(numels)
        self._shapes = tuple(shapes)
        self._fqns = tuple(fqns)
        self._shared_param_infos = tuple(shared_param_infos)
        self._param_extensions = tuple(param_extensions)
        self._modules = {pi.module for pi in self._param_infos}.union(
            {spi.module for spi in self._shared_param_infos}
        )
        assert (params is None) == (shared_params is None)
        if params is not None:
            assert shared_params is not None and len(shared_params) == len(
                shared_param_infos
            )
            self._params: Optional[List[nn.Parameter]] = params
            self._shared_params: Optional[List[nn.Parameter]] = shared_params
            # Mark the original parameters to avoid flattening them into
            # another `FlatParameter` during recursive construction
            for param in chain(self._params, self._shared_params):
                _set_fsdp_flattened(param)
            self._is_grad_none: Optional[List[bool]] = [
                False for _ in range(len(params))
            ]
            self._tensors: Optional[List[Optional[Tensor]]] = [
                None for _ in range(len(self._params))
            ]
        else:
            self._params = None
            self._shared_params = None
            self._is_grad_none = None
            self._tensors = None
        self._unpadded_unsharded_size = self.size()
        _set_fsdp_flattened(self)
        # Tracks whether the `FlatParameter`'s post-backward hook has been
        # called to modify the behavior of the post-backward callback
        self._post_backward_called = False


class FlatParamHandle:
    """
    This handle manages a flattened parameter (:class:`FlatParameter`). This
    includes sharding and view management.

    Args:
        params (Sequence[nn.Parameter]): The parameters to use for the
            flattened parameter.
        fully_sharded_module (nn.Module): See [Note: Fully Sharded Module].
        device (torch.device): The compute and communication device, which
            should be a non-CPU device. We refer to it as the compute device.
        sharding_strategy (ShardingStrategy): Sharding strategy to apply to
            this handle's ``FlatParameter``.
        offload_params (bool): Whether to offload the handle's
            ``FlatParameter`` to CPU.
        mp_param_dtype (Optional[torch.dtype]): Parameter mixed precision
            setting passed to the FSDP constructor.
        mp_reduce_dtype (Optional[torch.dtype]): Gradient reduction mixed
            precision setting passed to the FSDP constructor.
        keep_low_precision_grads (bool): Whether to keep gradients in low
            precision.
        process_group (dist.ProcessGroup): Process group used for sharding and
            communication for this handle.
        use_orig_params (bool): If ``True``, then FSDP preserves the original
            parameter variables and returns them from ``named_parameters()``
            (e.g. to support different optimizer hyperparameters within one
            :class:`FlatParameter`). If ``False``, then FSDP reconstructs the
            parameter every iteration and returns the :class:`FlatParameter` s
            from ``named_parameters()``.
    """

    ##################
    # INITIALIZATION #
    ##################

    def __init__(
        self,
        params: Sequence[nn.Parameter],
        fully_sharded_module: nn.Module,
        device: torch.device,
        sharding_strategy: HandleShardingStrategy,
        offload_params: bool,
        mp_param_dtype: Optional[torch.dtype],
        mp_reduce_dtype: Optional[torch.dtype],
        keep_low_precision_grads: bool,
        process_group: dist.ProcessGroup,
        use_orig_params: bool,
    ):
        super().__init__()
        self.device = device
        self.process_group = process_group
        self.rank = process_group.rank()
        self.world_size = process_group.size()
        self._sharding_strategy = sharding_strategy
        self._offload_params = offload_params
        self._use_orig_params = use_orig_params
        self._keep_low_precision_grads = keep_low_precision_grads
        self._training_state = HandleTrainingState.IDLE
        self._debug_level = dist.get_debug_level()
        self._fully_sharded_module = fully_sharded_module
        self._init_flat_param(params, fully_sharded_module, use_orig_params)
        self._orig_param_dtype = self.flat_param.dtype
        self._use_unsharded_views(as_params=False)
        self._init_param_reduce_dtypes(mp_param_dtype, mp_reduce_dtype)

    def _init_flat_param(
        self,
        params: Sequence[Optional[nn.Parameter]],
        module: nn.Module,
        use_orig_params: bool,
    ) -> None:
        """
        Initializes the flattened parameter ``self.flat_param`` by flattening
        the parameters in ``params`` into a single :class:`FlatParameter` and
        saves relevant metadata. Shared parameters are only included in the
        flattened parameter once.

        This checks that all comprising parameters have the same dtype and
        ``requires_grad`` and does not support nested construction of
        :class:`FlatParameter` s.

        Args:
            See the Args in the class docstring.
        """
        params_set = set(params)
        params_set.discard(None)
        if len(params_set) == 0:
            raise ValueError(
                "Cannot initialize a `FlatParameter` from an empty parameter list"
            )
        param_infos: List[ParamInfo] = []
        numels: List[int] = []
        shapes: List[torch.Size] = []
        fqns: List[str] = []
        shared_param_infos: List[SharedParamInfo] = []
        shared_param_memo: Dict[nn.Parameter, Tuple[nn.Module, str, str]] = {}
        params_to_flatten: List[Union[torch.Tensor, nn.Parameter]] = []
        shared_params: List[Union[torch.Tensor, nn.Parameter]] = []
        param_extensions: List[Any] = []
        dtype: Optional[torch.dtype] = None
        requires_grad: Optional[bool] = None
        for submodule_name, submodule in module.named_modules():
            for param_name, param in submodule.named_parameters(recurse=False):
                if param not in params_set:
                    continue
                if param in shared_param_memo:  # shared reference
                    prim_module, prim_module_name, prim_param_name = shared_param_memo[
                        param
                    ]
                    shared_params.append(param)
                    shared_param_infos.append(
                        SharedParamInfo(
                            param_name,
                            submodule,
                            submodule_name,
                            prim_param_name,
                            prim_module,
                            prim_module_name,
                        )
                    )
                else:
                    if type(param) is FlatParameter:
                        raise ValueError("`FlatParameter` does not support nesting")
                    if dtype is not None and param.dtype != dtype:
                        raise ValueError(
                            "`FlatParameter` requires uniform dtype but got "
                            f"{dtype} and {param.dtype}"
                        )
                    if dtype is None and not param.is_floating_point():
                        raise ValueError("Integer parameters are unsupported")
                    if (
                        requires_grad is not None
                        and param.requires_grad != requires_grad
                    ):
                        raise ValueError(
                            "`FlatParameter` requires uniform `requires_grad`"
                        )
                    param, extension = _ext_pre_flatten_transform(param)
                    param_extensions.append(extension)
                    dtype = param.dtype
                    requires_grad = param.requires_grad
                    shared_param_memo[param] = (submodule, submodule_name, param_name)
                    params_to_flatten.append(param)
                    param_infos.append(ParamInfo(param_name, submodule, submodule_name))
                    numels.append(param.numel())
                    shapes.append(param.shape)
                    fqn = (
                        submodule_name + "." + param_name
                        if submodule_name
                        else param_name
                    )
                    fqns.append(fqn)
        assert requires_grad is not None, (
            "Passed-in `params` were not found in the module tree\n"
            f"params: {params}\nmodule: {module}"
        )
        self.flat_param = FlatParamHandle.flatten_params(
            params_to_flatten, requires_grad
        )

        # For `use_orig_params=True`, ensure that the logical parameters are
        # `nn.Parameter`s (and not plain `torch.Tensor`)
        def convert_to_params(
            tensors: List[Union[torch.Tensor, nn.Parameter]]
        ) -> List[nn.Parameter]:
            return [
                t if isinstance(t, nn.Parameter) else nn.Parameter(t) for t in tensors
            ]

        self.flat_param._init_metadata(
            param_infos,
            numels,
            shapes,
            fqns,
            shared_param_infos,
            param_extensions,
            convert_to_params(params_to_flatten) if use_orig_params else None,
            convert_to_params(shared_params) if use_orig_params else None,
        )

    @staticmethod
    def flatten_params(
        params: Sequence[torch.Tensor],
        requires_grad: bool,
    ) -> FlatParameter:
        """
        Flattens the parameters in ``params`` into a single
        :class:`FlatParameter`. This should be the only way used to construct
        :class:`FlatParameter` s.

        We expose this factory method for checkpointing (e.g. sharded state
        dict). The flattened parameter's metadata should only be initialized
        once (see :meth:`_init_metadata`), but its tensor data may be reloaded.
        """
        with torch.no_grad():
            flat_params = [
                p.detach().reshape(-1) if isinstance(p, nn.Parameter) else p.reshape(-1)
                for p in params
            ]
            flat_param_data = torch.cat(flat_params, dim=0)
        flat_param = FlatParameter(flat_param_data, requires_grad=requires_grad)
        return flat_param
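
    # Illustrative example (not part of the original source): flattening two
    # parameters of shapes (2, 3) and (4,) produces a 1D ``FlatParameter`` of
    # numel 6 + 4 = 10, i.e. the row-major flattenings concatenated in order:
    #   >>> w = nn.Parameter(torch.randn(2, 3))
    #   >>> b = nn.Parameter(torch.randn(4))
    #   >>> fp = FlatParamHandle.flatten_params([w, b], requires_grad=True)
    #   >>> fp.shape
    #   torch.Size([10])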

    def _init_param_reduce_dtypes(
        self,
        mp_param_dtype: Optional[torch.dtype],
        mp_reduce_dtype: Optional[torch.dtype],
    ) -> None:
        """
        Precondition: ``self.flat_param`` is set via :meth:`_init_flat_param`.
        This ensures that this handle's parameters have a single dtype.

        Postcondition: This sets ``self._fwd_bwd_param_dtype`` and
        ``self._reduce_dtype``. If ``mp_param_dtype`` or ``mp_reduce_dtype``
        is ``None``, then we assume the original parameter dtype. One special
        case is if ``mp_param_dtype`` is not ``None`` and ``mp_reduce_dtype``
        is ``None``, in which case we assume the gradient reduction dtype
        matches the forward/backward parameter dtype.
        """
        # Save whether these dtypes were specified so that we permit the
        # parameter dtype to change up until the lazy initialization
        self._low_prec_param_dtype_specified = mp_param_dtype is not None
        self._low_prec_reduce_dtype_specified = mp_reduce_dtype is not None
        if (
            self._low_prec_param_dtype_specified
            and not self._low_prec_reduce_dtype_specified
        ):
            # Special case: infer gradient reduction mixed precision
            self._fwd_bwd_param_dtype = mp_param_dtype
            self._reduce_dtype = self._fwd_bwd_param_dtype
        else:
            self._fwd_bwd_param_dtype = mp_param_dtype or self._orig_param_dtype
            self._reduce_dtype = mp_reduce_dtype or self._orig_param_dtype
        assert self._fwd_bwd_param_dtype is not None
        assert self._reduce_dtype is not None
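
    # Illustrative summary (not part of the original source) of how the two
    # dtypes resolve above, assuming the original parameter dtype is float32:
    #   mp_param_dtype  mp_reduce_dtype  ->  _fwd_bwd_param_dtype  _reduce_dtype
    #   None            None                 float32               float32
    #   float16         None                 float16               float16  (special case)
    #   None            float16              float32               float16
    #   bfloat16        float32              bfloat16              float32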

    ###################################
    # SHARD INITIALIZATION & METADATA #
    ###################################
    @torch.no_grad()
    def shard(self):
        """
        Shards the handle's ``FlatParameter``. In terms of memory, this
        allocates new memory for the sharded flattened parameter and frees the
        unsharded flattened parameter's storage.

        Postcondition: ``self.flat_param`` is the sharded flattened parameter.
        Shard metadata attributes are set for all sharding strategies.
        ``process_group``, ``rank``, and ``world_size`` attributes are set if
        using a sharded strategy.
        """
        flat_param = self.flat_param
        if not self.uses_sharded_strategy:
            self._init_shard_metadata(0, 0, flat_param.numel() - 1)
        else:
            p_assert(
                flat_param.storage_offset() == 0,
                "The `FlatParameter` is not the sole occupant of its storage",
            )
            orig_storage = flat_param._typed_storage()
            sharded_flat_param, numel_padded = FlatParamHandle._get_shard(
                flat_param, self.rank, self.world_size
            )
            flat_param.set_(sharded_flat_param)  # type: ignore[call-overload]
            start = sharded_flat_param.numel() * self.rank
            end = sharded_flat_param.numel() * (self.rank + 1) - 1  # inclusive
            self._init_shard_metadata(numel_padded, start, end)
            if orig_storage._size() > 0:
                orig_storage._resize_(0)
        if self._use_orig_params:
            self._use_sharded_views()

    def _init_shard_metadata(
        self,
        numel_padded: int,
        start: int,
        end: int,
    ) -> None:
        """
        Initializes shard-related metadata for this rank's shard of the
        flattened parameter: ``_sharded_size``, ``_shard_param_offsets``,
        ``_shard_indices``, and ``_shard_numel_padded``.

        Args:
            numel_padded (int): Numel padded for this rank's sharded flattened
                parameter.
            start (int): Start index in the sharded flattened parameter
                assigned to this rank.
            end (int): End index (inclusive) in the sharded flattened parameter
                assigned to this rank. If this exceeds the sharded flattened
                parameter's numel, then it is truncated.

        Precondition: ``self.flat_param`` 's data is the sharded flattened
        parameter.
        """
        self.flat_param._sharded_size = self.flat_param.size()  # type: ignore[attr-defined]
        sharded_flat_param_numel = self.flat_param.numel()  # includes `numel_padded`
        p_assert(start >= 0 and start <= end, f"start: {start} end: {end}")
        p_assert(
            numel_padded <= sharded_flat_param_numel,
            f"numel_padded: {numel_padded} "
            f"sharded_flat_param_numel: {sharded_flat_param_numel}",
        )
        (
            self.flat_param._shard_param_offsets,  # type: ignore[attr-defined]
            self.flat_param._shard_indices,  # type: ignore[attr-defined]
        ) = self._get_shard_metadata(start, end)
        self.flat_param._shard_numel_padded = numel_padded  # type: ignore[attr-defined]

    def _get_shard_metadata(
        self,
        start: int,
        end: int,
    ) -> Tuple[Tuple[Tuple[int, int], ...], Tuple[int, int]]:
        """
        Computes the shard metadata based on ``start`` and ``end``, which give
        the closed interval of the unsharded flattened parameter specifying the
        shard.

        Args:
            start (int): Start index (in units of numel) of this rank's shard
                of the flattened parameter.
            end (int): End index (in units of numel and inclusive) of this
                rank's shard of the flattened parameter.

        Return:
            Tuple[Tuple[Tuple[int, int], ...], Tuple[int, int]]: See
            ``_shard_param_offsets`` and ``_shard_indices`` in
            :class:`FlatParameter` 's docstring.
        """
        flat_param_offsets = self._get_flat_param_offsets()
        # Indices of the original parameters in this rank's sharded flattened
        # parameter
        shard_param_indices_range = []  # elements will be consecutive
        # [start, end] offsets giving this rank's part of the flattened
        # original module parameter (which will be [0, `p.numel()`-1] for any
        # parameter that is not sharded across ranks)
        shard_param_offsets = []
        for i, (param_start, param_end) in enumerate(flat_param_offsets):
            if start > param_end or end < param_start:
                continue
            if start <= param_start:
                intra_param_start = 0
            else:
                intra_param_start = start - param_start
            intra_param_end = min(param_end, end) - param_start
            shard_param_indices_range.append(i)
            shard_param_offsets.append(
                (intra_param_start, intra_param_end)
            )  # both inclusive
        if len(shard_param_indices_range) == 0:
            shard_param_indices = (0, 0)
            assert len(shard_param_offsets) == 0
        else:
            shard_param_indices = (
                shard_param_indices_range[0],
                shard_param_indices_range[-1],
            )
            assert (
                len(shard_param_offsets)
                == shard_param_indices[-1] - shard_param_indices[0] + 1
            )
        return tuple(shard_param_offsets), shard_param_indices
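
    # Illustrative example (not part of the original source): suppose the
    # unsharded flattened parameter consists of two original parameters with
    # numels 3 and 5, so ``_get_flat_param_offsets()`` returns
    # [(0, 2), (3, 7)]. For a rank whose shard covers numels [2, 5], both
    # parameters intersect the shard, so this returns
    #   shard_param_offsets = ((2, 2), (0, 2))  # intra-parameter [start, end]
    #   shard_param_indices = (0, 1)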

    @staticmethod
    def _get_unpadded_shard(
        tensor: Tensor,
        rank: int,
        world_size: int,
    ) -> Tuple[Tensor, int]:
        """
        Returns the shard of ``tensor`` without any padding for the given
        ``rank`` and ``world_size`` and the numel to pad for that shard.

        If ``tensor`` is already flattened or may be viewed in the flattened
        shape (which is true in the expected usage), then this method does not
        allocate any new tensor memory.
        """
        chunks = torch.flatten(tensor).chunk(world_size)
        if len(chunks) < (rank + 1):
            # This rank gets an empty chunk fully padded with zeros since there
            # are not enough chunks across ranks
            chunk = chunks[0].new_empty(0)
        else:
            chunk = chunks[rank]
        numel_to_pad = chunks[0].numel() - chunk.numel()
        assert (
            numel_to_pad >= 0
        ), "Chunk's size should be at most the first chunk's size"
        return chunk, numel_to_pad
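
    # Illustrative example (not part of the original source): for a tensor of
    # numel 10 and ``world_size=4``, ``torch.chunk`` yields chunks of sizes
    # (3, 3, 3, 1), so ranks 0-2 each get 3 elements with ``numel_to_pad=0``
    # and rank 3 gets 1 element with ``numel_to_pad=2``. If there are fewer
    # chunks than ranks, the trailing ranks get an empty chunk padded up to the
    # first chunk's size.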

    @staticmethod
    def _get_shard(
        tensor: Tensor,
        rank: int,
        world_size: int,
    ) -> Tuple[Tensor, int]:
        """
        Returns the shard of ``tensor`` with padding for the given ``rank`` and
        ``world_size`` and the numel padded for that shard.

        This method allocates new memory (via :meth:`clone`) since the
        unsharded ``tensor`` may be deallocated after this method returns.
        """
        chunk, numel_to_pad = FlatParamHandle._get_unpadded_shard(
            tensor, rank, world_size
        )
        shard = chunk.clone()
        if numel_to_pad > 0:
            shard = F.pad(shard, [0, numel_to_pad])
        return shard, numel_to_pad
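
    # Illustrative example (not part of the original source): continuing the
    # numel-10, ``world_size=4`` case, every rank's padded shard has numel 3;
    # rank 3's shard is its single element followed by two zeros appended by
    # ``F.pad``, and the returned ``numel_to_pad`` is 2.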

    @staticmethod
    def _get_sharded_size(tensor: Tensor, rank: int, world_size: int) -> torch.Size:
        """
        Returns the shape of ``tensor`` after sharding including padding. This
        requires ``tensor`` to have 1D shape and ensures that the returned
        shape is 1D.
        """
        assert len(tensor.shape) == 1, f"{tensor.shape}"
        unpadded_sharded_tensor, numel_to_pad = FlatParamHandle._get_unpadded_shard(
            tensor, rank, world_size
        )
        unpadded_sharded_size = unpadded_sharded_tensor.size()
        assert len(unpadded_sharded_size) == 1, f"{unpadded_sharded_size}"
        return torch.Size([unpadded_sharded_size[0] + numel_to_pad])
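
    # Illustrative example (not part of the original source): for a 1D tensor
    # of numel 10 and ``world_size=4``, this returns ``torch.Size([3])`` for
    # every rank, since padding makes all shards the same size.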

    def _get_flat_param_offsets(self) -> List[Tuple[int, int]]:
        """Returns [start, end] offsets of each original parameter's flattened
        data in the unsharded flattened parameter (without padding)."""
        cumulative_sum = list(accumulate(self.flat_param._numels))
        starts = [0] + cumulative_sum[:-1]
        ends = [end - 1 for end in cumulative_sum]  # inclusive
        param_offsets = list(zip(starts, ends))
        return param_offsets
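
    # Illustrative example (not part of the original source): for original
    # parameter numels (3, 5), the cumulative sums are [3, 8], so this returns
    # [(0, 2), (3, 7)] (inclusive offsets into the unpadded unsharded
    # flattened parameter).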

    def shard_metadata(
        self,
    ) -> FlatParamShardMetadata:
        """Returns shard-related metadata specific to this rank's shard of the
        flattened parameter."""
        assert hasattr(self.flat_param, "_shard_indices") and hasattr(
            self.flat_param, "_shard_param_offsets"
        ), "Shard metadata has not been initialized"
        shard_param_start_index = self.flat_param._shard_indices[0]  # type: ignore[attr-defined]
        shard_param_end_index = self.flat_param._shard_indices[1]  # type: ignore[attr-defined]
        sl = (
            slice(shard_param_start_index, shard_param_end_index + 1)
            if shard_param_start_index <= shard_param_end_index
            else slice(0, 0)
        )
        return FlatParamShardMetadata(
            self.flat_param._fqns[sl],
            self.flat_param._shapes[sl],
            self.flat_param._numels[sl],
            self.flat_param._shard_param_offsets[:],  # type: ignore[attr-defined]
        )

    @no_type_check
    @torch.no_grad()
    def init_flat_param_attributes(self) -> None:
        """
        This initializes some attributes on the handle's ``FlatParameter``.
        This should be called during lazy initialization since it requires the
        parameter to be on the compute device if not offloading to CPU and we
        want to give users the chance to move the parameter appropriately after
        the FSDP constructor.

        For each tensor attribute on the ``FlatParameter``, see the unshard and
        reshard methods in this class for the allocation and free pattern.
        """
        flat_param = self.flat_param
        if flat_param.dtype != self._orig_param_dtype:
            # Entering this branch means that the user changed the parameter
            # dtype after FSDP initialization, in which case we may need to
            # refresh some saved dtype attributes (dtypes specified as a part
            # of mixed precision take precedence).
            if not self._low_prec_param_dtype_specified:
                self._fwd_bwd_param_dtype = flat_param.dtype
            # For `reduce_dtype`, require `param_dtype` was not specified since
            # then we infer the `reduce_dtype` from the specified `param_dtype`
            if (
                not self._low_prec_reduce_dtype_specified
                and not self._low_prec_param_dtype_specified
            ):
                self._reduce_dtype = flat_param.dtype
            self._orig_param_dtype = flat_param.dtype
        cpu_device = torch.device("cpu")
        if self._offload_params:
            p_assert(
                flat_param.device == cpu_device,
                f"Expects the `FlatParameter` to be on CPU when parameter CPU "
                f"offloading is enabled, not {flat_param.device}",
            )
        else:
            self._check_on_compute_device(self.flat_param)
        flat_param._local_shard = flat_param.data
        if self._offload_params:
            # Pin the memory for faster H2D transfer
            flat_param._local_shard = flat_param._local_shard.pin_memory()
            # Pre-allocate the sharded gradient on CPU to enable non-blocking
            # D2H transfer during the backward pass
            flat_param._cpu_grad = torch.zeros_like(
                flat_param._local_shard, device=cpu_device
            ).pin_memory()
        if self._uses_param_mixed_precision:
            # For parameter mixed precision, we maintain a low precision
            # sharded tensor on the compute device to be all-gathered (for
            # sharded strategies) or directly used (for `NO_SHARD`) for
            # computation.
            flat_param._mp_shard = torch.zeros_like(
                flat_param._local_shard,
                device=self.device,
                dtype=self._fwd_bwd_param_dtype,
            )
            _free_storage(flat_param._mp_shard)
        if self.uses_sharded_strategy:
            # We maintain a padded unsharded tensor that serves as the
            # all-gather destination and owns the original parameter storages.
            unsharded_param_dtype = (
                self._fwd_bwd_param_dtype
                if self._uses_param_mixed_precision
                else flat_param.dtype
            )  # use low precision if parameter mixed precision is enabled
            padded_unsharded_numel = flat_param.numel() * self.world_size
            flat_param._full_param_padded = torch.zeros(
                padded_unsharded_numel,
                device=self.device,
                dtype=unsharded_param_dtype,
            )
            flat_param._padded_unsharded_size = flat_param._full_param_padded.size()
            _free_storage(flat_param._full_param_padded)
            if self._uses_param_mixed_precision:
                # For parameter mixed precision, we maintain a full precision
                # padded unsharded tensor for when we force full precision.
                flat_param._full_prec_full_param_padded = torch.zeros(
                    padded_unsharded_numel,
                    device=self.device,
                    dtype=flat_param.dtype,  # full precision
                )
                _free_storage(flat_param._full_prec_full_param_padded)
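
    # Illustrative summary (not part of the original source) of the tensors set
    # up above for a sharded strategy with sharded numel S and world size W:
    #   _local_shard                  numel S, original dtype (pinned CPU if offloading)
    #   _mp_shard                     numel S, low precision (parameter mixed precision
    #                                 only), storage freed until the pre-unshard
    #   _full_param_padded            numel S * W, compute dtype, storage freed until
    #                                 the all-gather
    #   _full_prec_full_param_padded  numel S * W, full precision (parameter mixed
    #                                 precision only), storage freed until needed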

    ###################
    # UNSHARD/RESHARD #
    ###################
    def pre_unshard(self) -> bool:
        """
        Returns: ``False`` if this is a no-op and ``True`` otherwise.

        Postcondition: ``self.flat_param`` 's data is on the device for
        communication and is what should be all-gathered. This means that it
        matches the dtype of the expected unsharded parameter.
        """
        ret = False
        if self._use_orig_params:
            ret = self._writeback_orig_params()
        if (
            self.uses_sharded_strategy
            and not self._offload_params
            and not self.needs_unshard()
        ):
            pass  # no-op
        elif self._uses_param_mixed_precision and not self._force_full_precision:
            self._use_low_precision_shard()
            ret = True
        elif self._offload_params and self.flat_param.device != self.device:
            # NOTE: This creates a new tensor distinct from any attributes.
            self.flat_param_to(self.device, non_blocking=True)
            ret = True
        self._check_on_compute_device(self.flat_param)
        return ret

    def _use_low_precision_shard(self):
        """
        Allocates the low precision shard directly on the compute device and
        switches to using the low precision sharded flattened parameter.
        """
        self._check_low_precision_shard()
        flat_param = self.flat_param
        _alloc_storage(
            flat_param._mp_shard, flat_param._local_shard.size()  # type: ignore[attr-defined]
        )
        # `copy_()` implicitly casts to the low precision
        flat_param._mp_shard.copy_(  # type: ignore[attr-defined]
            flat_param._local_shard.to(  # type: ignore[attr-defined]
                self.device, non_blocking=True
            )
        )
        # Invariant: `_mp_shard` is always on the compute device.
        flat_param.data = flat_param._mp_shard  # type: ignore[attr-defined]

    def unshard(self):
        """
        Runs the unshard logic. This includes all-gathering the flattened
        parameter and switching to using the unsharded flattened parameter. If
        the handle does not need unsharding, then this only switches to using
        the unsharded flattened parameter. For ``NO_SHARD``, this is a no-op.

        If FSDP is in :meth:`summon_full_params` and the handle uses parameter
        mixed precision, then the parameter is forced to full precision.
        """
        if not self.needs_unshard():
            # Even when not needing an unshard, we should switch to using
            # the unsharded flattened parameter
            unsharded_flat_param = (
                self._get_padded_unsharded_flat_param()
                if self.uses_sharded_strategy
                else self.flat_param
            )
            self._use_unsharded_flat_param(unsharded_flat_param)
            return
        unsharded_flat_param = self._alloc_padded_unsharded_flat_param()
        padded_unsharded_flat_param = self._all_gather_flat_param(unsharded_flat_param)
        self._use_unsharded_flat_param(padded_unsharded_flat_param)

    def needs_unshard(self) -> bool:
        """Returns if the handle's flattened parameter needs to be unsharded."""
        if not self.uses_sharded_strategy:
            return False
        unsharded_flat_param = self._get_padded_unsharded_flat_param()
        already_unsharded = (
            unsharded_flat_param._typed_storage()._size()
            == unsharded_flat_param.numel()
        )
        return not already_unsharded

    def _alloc_padded_unsharded_flat_param(self):
        """
        Allocates the *padded* unsharded flattened parameter. The unpadded
        unsharded flattened parameter is always a view into the padded one.
        This padded parameter is saved to a different attribute on the
        ``FlatParameter`` depending on if we force full precision.
        """
        self._check_sharded_strategy()
        flat_param = self.flat_param
        unsharded_flat_param = self._get_padded_unsharded_flat_param()
        self._check_storage_freed(unsharded_flat_param)
        _alloc_storage(unsharded_flat_param, flat_param._padded_unsharded_size)  # type: ignore[attr-defined]
        return unsharded_flat_param

    def _get_padded_unsharded_flat_param(self) -> torch.Tensor:
        """
        Returns a reference to the padded unsharded flattened parameter
        depending on the calling context. This should only be called if using a
        sharded strategy.
        """
        self._check_sharded_strategy()
        flat_param = self.flat_param
        if self._force_full_precision:
            # When parameter mixed precision is enabled, we use a different
            # tensor as the all-gather destination to preserve the invariant
            # that `_full_param_padded` is in the low precision
            unsharded_flat_param = flat_param._full_prec_full_param_padded  # type: ignore[attr-defined]
            p_assert(
                unsharded_flat_param.dtype != self._fwd_bwd_param_dtype,
                f"Expects full precision but got {self._fwd_bwd_param_dtype}",
            )
        else:
            unsharded_flat_param = flat_param._full_param_padded  # type: ignore[attr-defined]
        return unsharded_flat_param

    def _all_gather_flat_param(
        self,
        padded_unsharded_flat_param: Tensor,
    ) -> Tensor:
        """
        All-gathers the handle's flattened parameter to the destination
        ``padded_unsharded_flat_param``, and switches to using the all-gathered
        tensor.
        """
        p_assert(
            hasattr(self, "process_group") and hasattr(self, "world_size"),
            "Expects a process group and world size to have been set via `shard()`",
        )
        sharded_flat_param = self.flat_param.data
        expected_numel = sharded_flat_param.numel() * self.world_size
        p_assert(
            padded_unsharded_flat_param.numel() == expected_numel,
            f"Expects {expected_numel} numel but got {padded_unsharded_flat_param.numel()}",
        )
        dist.all_gather_into_tensor(
            padded_unsharded_flat_param,
            sharded_flat_param,
            self.process_group,
        )
        return padded_unsharded_flat_param

    def _use_unsharded_flat_param(
        self,
        padded_unsharded_flat_param: torch.Tensor,
    ) -> None:
        """
        Switches to using the *unpadded* unsharded flattened parameter, which
        is a view into the *padded* unsharded flattened parameter.
        """
        unsharded_size = self.flat_param._unpadded_unsharded_size
        self.flat_param.data = padded_unsharded_flat_param[
            : unsharded_size.numel()
        ].view(
            unsharded_size
        )  # this `.view()` is not autograd visible
        in_forward = self._training_state == HandleTrainingState.FORWARD
        in_pre_backward = self._training_state == HandleTrainingState.BACKWARD_PRE
        if self._use_orig_params:
            # We use `Tensor` views in the forward so that they are tracked by
            # autograd. We use them in the pre-backward as well to support
            # reentrant activation checkpointing, which needs the views to be
            # tracked by autograd in the backward pass's recomputed forward.
            self._use_unsharded_views(
                as_params=(not in_forward and not in_pre_backward)
            )
        elif in_forward:
            self._use_unsharded_views(as_params=False)

    def post_unshard(self):
        """
        Runs the post-unshard logic. This includes freeing the low precision
        shard if needed.
        """
        if self._uses_param_mixed_precision and self.uses_sharded_strategy:
            self._free_low_precision_sharded_param()
        self._check_on_compute_device(self.flat_param)

    def _free_low_precision_sharded_param(self):
        """Frees the low precision sharded flattened parameter."""
        self._check_low_precision_shard()
        # `_mp_shard` is allocated in the pre-unshard stream, consumed in the
        # unshard stream for sharded strategies, and consumed in both the
        # unshard and default streams for `NO_SHARD`. For sharded strategies,
        # the current stream here is the unshard stream, and for `NO_SHARD`,
        # it is the default stream. For `NO_SHARD`, only recording for the
        # default stream suffices since the default stream waits for the
        # unshard stream.
        _no_dispatch_record_stream(
            self.flat_param._mp_shard, torch.cuda.current_stream()  # type: ignore[attr-defined]
        )
        _free_storage(self.flat_param._mp_shard)  # type: ignore[attr-defined]

    @torch.no_grad()
    def unshard_grad(self):
        """
        Unshards the handle's ``FlatParameter`` 's gradient. If all ranks have
        ``None`` gradient, then all original parameters will as well. This
        method performs an all-reduce and an all-gather. The additional
        all-reduce is tolerable since this method is not meant to be used on
        the computation critical path.

        Postcondition: ``_saved_grad_shard`` is defined and contains the value
        to set ``flat_param.grad`` after gradients are resharded.
        """
        if not self.uses_sharded_strategy:
            self._use_unsharded_grad_views()
            return
        flat_param = self.flat_param
        self._check_unsharded(flat_param)

        # Check if all ranks have a `None` gradient
        num_grad_none = torch.zeros(1, dtype=torch.int32, device=self.device)
        num_grad_none[0] = flat_param.grad is None
        dist.all_reduce(num_grad_none, group=self.process_group)
        if num_grad_none[0] == self.world_size:
            flat_param._saved_grad_shard = None  # type: ignore[attr-defined]
            self._use_unsharded_grad_views()
            return

        padded_unsharded_grad = torch.empty(
            flat_param._padded_unsharded_size,  # type: ignore[attr-defined]
            device=self.device,
        )
        if flat_param.grad is None:
            # In the case that only some ranks have `None` gradient, we use
            # zeros to approximate as a best effort attempt
            if self._debug_level == dist.DebugLevel.DETAIL:
                warnings.warn(
                    f"[Rank {self.rank}] Only some but not all ranks have a "
                    "`None` `FlatParameter` gradient, so FSDP is using zeros to "
                    "approximate those ranks' sharded gradients being `None`"
                )
            flat_param._saved_grad_shard = None  # type: ignore[attr-defined]
            sharded_grad = torch.zeros(flat_param._sharded_size, device=self.device)  # type: ignore[attr-defined]
        else:
            self._check_sharded(flat_param.grad)
            flat_param._saved_grad_shard = flat_param.grad  # type: ignore[attr-defined]
            sharded_grad = flat_param._saved_grad_shard  # type: ignore[attr-defined]
        dist.all_gather_into_tensor(
            padded_unsharded_grad, sharded_grad, self.process_group
        )
        unsharded_size = self.flat_param._unpadded_unsharded_size
        flat_param.grad = padded_unsharded_grad[: unsharded_size.numel()].view(
            unsharded_size
        )
        self._use_unsharded_grad_views()

    def reshard_grad(self):
        if self._use_orig_params:
            self._use_sharded_grad_views()
        if not self.uses_sharded_strategy:
            return
        self.flat_param.grad = self.flat_param._saved_grad_shard  # type: ignore[attr-defined]
        delattr(self.flat_param, "_saved_grad_shard")

    def prepare_gradient_for_backward(self):
        """
        Prepares the gradient for the backward computation by saving and
        clearing any existing sharded gradient in ``.grad`` to enable computing
        a new unsharded gradient.
        """
        p_assert(
            self._training_state
            in (HandleTrainingState.BACKWARD_PRE, HandleTrainingState.IDLE),
            "Expects to be in `BACKWARD_PRE` or `IDLE` (if prefetching)",
        )
        flat_param = self.flat_param
        if flat_param.grad is not None and (
            flat_param.grad.size() != flat_param._unpadded_unsharded_size
            or flat_param.grad.device != flat_param.device  # grad on CPU
        ):
            self._check_on_compute_device(self.flat_param)
            grad_offloaded = flat_param.grad.device != self.device
            p_assert(
                not grad_offloaded or self._offload_params,
                f"Expects the sharded gradient to be on {self.device} "
                f"but got {flat_param.grad.device}",
            )
            prev_iter_synced_gradients = (
                flat_param.grad.size()
                == flat_param._local_shard.size()  # type: ignore[attr-defined]
            )
            if prev_iter_synced_gradients:
                # TODO (awgu): Gradient accumulation outside `no_sync()`
                # does not work with CPU offloading. The issue should be
                # that, in the post-backward hook, we cannot do an addition
                # between a CPU tensor (the existing sharded gradient) and
                # a GPU tensor (the new sharded gradient).
                if not grad_offloaded:
                    flat_param._saved_grad_shard = flat_param.grad.data  # type: ignore[attr-defined]
                    sharded_grad = flat_param._saved_grad_shard  # type: ignore[attr-defined]
                else:
                    p_assert(
                        hasattr(flat_param, "_cpu_grad"),
                        "`_cpu_grad` should be defined if the gradient is on CPU",
                    )
                    sharded_grad = flat_param._cpu_grad  # type: ignore[attr-defined]
                # If user specified to keep the gradient in low precision, then
                # the gradient may still be of the low precision dtype if the
                # user did not set the gradient to `None` after the previous
                # backward, in which case FSDP should cast back to the full
                # precision dtype so that FSDP can accumulate in that dtype in
                # the post-backward hook and assign to `.grad` in that dtype in
                # the post-backward callback.
                local_shard_dtype = flat_param._local_shard.dtype  # type: ignore[attr-defined]
                if (
                    self._keep_low_precision_grads
                    and sharded_grad.dtype != local_shard_dtype
                ):
                    sharded_grad.data = sharded_grad.to(local_shard_dtype)
            else:
                padded_unsharded_size = flat_param._padded_unsharded_size  # type: ignore[attr-defined]
                p_assert(
                    flat_param.grad.size() == padded_unsharded_size,
                    "Expects `.grad` to be the unsharded gradient in "
                    f"`no_sync()` with size {padded_unsharded_size} "
                    f"but got size {flat_param.grad.size()}",
                )
            flat_param.grad = None

    def prepare_gradient_for_optim(self):
        """
        Prepares the gradient for optimizer computation by moving the sharded
        gradient to the ``.grad`` attribute.
        """

        def cast_grad_to_param_dtype_if_needed(flat_param):
            if self._keep_low_precision_grads:
                assert flat_param.grad is not None  # mypy
                if flat_param.grad.dtype != self._fwd_bwd_param_dtype:
                    flat_param.grad.data = flat_param.grad.to(self._fwd_bwd_param_dtype)
                    if self._use_orig_params:
                        self._use_sharded_grad_views()

        flat_param = self.flat_param
        # TODO (awgu): We should replace these conditional checks to encode
        # the logical intention more directly.
        if hasattr(flat_param, "_cpu_grad"):
            # NOTE: This branch includes `NO_SHARD`.
            self._check_sharded(flat_param)
            self._check_on_cpu(flat_param)
            flat_param.grad = flat_param._cpu_grad  # type: ignore[attr-defined]
            cast_grad_to_param_dtype_if_needed(flat_param)
        elif hasattr(flat_param, "_saved_grad_shard"):
            self._check_sharded(flat_param)
            self._check_on_compute_device(flat_param)
            self._check_on_compute_device(flat_param._saved_grad_shard)  # type: ignore[attr-defined]
            # If no sharded gradient was computed this iteration, then there is
            # no need to forward `_saved_grad_shard` to `grad`
            if flat_param._post_backward_called:  # type: ignore[attr-defined]
                flat_param.grad = flat_param._saved_grad_shard  # type: ignore[attr-defined]
                cast_grad_to_param_dtype_if_needed(flat_param)
        else:
            p_assert(
                not self.uses_sharded_strategy
                or not flat_param._post_backward_called,  # type: ignore[attr-defined]
                "All sharded parameters that received a gradient in the "
                "post-backward should use `_saved_grad_shard`",
            )
        # Delete `_saved_grad_shard` since its existence indicates a previous
        # gradient to accumulate with in the post-backward hook
        if hasattr(flat_param, "_saved_grad_shard"):
            delattr(flat_param, "_saved_grad_shard")
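
    # NOTE: A minimal ordering sketch around the optimizer step (hypothetical
    # caller code, not part of this file): the post-backward path leaves the
    # reduced shard in `_saved_grad_shard` (or `_cpu_grad` when offloading),
    # and `prepare_gradient_for_optim()` is what surfaces it as `.grad`:
    #
    #     loss.backward()                      # populates the sharded gradient
    #     handle.prepare_gradient_for_optim()  # moves it into `flat_param.grad`
    #     optimizer.step()                     # consumes `.grad` as usual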

    @contextlib.contextmanager
    def to_cpu(self):
        """
        Moves the unpadded unsharded flattened parameter to CPU while in the
        context and moves it back to the previous device upon exit. For now,
        this assumes the ``FlatParameter`` is the unpadded unsharded flattened
        parameter since (1) there is no reason to include the padding in the
        copy and (2) there is no use case for the sharded flattened parameter.

        Precondition: ``self.flat_param`` 's data is the unpadded unsharded
        flattened parameter on the compute device, and the handle uses a
        sharded strategy.
        Postcondition: Same as the precondition.
        """
        self._check_sharded_strategy()
        p_assert(
            self.flat_param.size() == self.flat_param._unpadded_unsharded_size,
            f"Expects size {self.flat_param._unpadded_unsharded_size} but got {self.flat_param.size()}",
        )
        self._check_on_compute_device(self.flat_param)
        # Check that the unpadded unsharded flattened parameter is a view into
        # the padded unsharded flattened parameter as expected
        # NOTE: This check is not strictly needed for correctness but is a
        # useful sanity check since the tensor should only be used internally.
        unpadded_storage_ptr = self.flat_param._typed_storage()._data_ptr()
        padded_storage_ptr = (
            self._get_padded_unsharded_flat_param()._typed_storage()._data_ptr()
        )
        p_assert(
            unpadded_storage_ptr == padded_storage_ptr,
            "Expects the unpadded parameter to be a view into the padded parameter",
        )
        self.flat_param_to(torch.device("cpu"))
        self._free_unsharded_flat_param()
        try:
            yield
        finally:
            p_assert(
                self.flat_param.size() == self.flat_param._unpadded_unsharded_size,
                f"Expects size {self.flat_param._unpadded_unsharded_size} but got {self.flat_param.size()}",
            )
            padded_unsharded_flat_param = self._alloc_padded_unsharded_flat_param()
            # Copy from CPU to the compute device
            padded_unsharded_flat_param[: self.flat_param.numel()].copy_(
                self.flat_param
            )
            self._use_unsharded_flat_param(padded_unsharded_flat_param)
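
    # A minimal usage sketch (hypothetical; assumes the precondition above
    # holds, i.e. the handle is currently unsharded on the compute device):
    #
    #     with handle.to_cpu():
    #         # `handle.flat_param` now lives on CPU and the device copy is
    #         # freed, e.g. to temporarily reclaim accelerator memory.
    #         do_something_on_cpu(handle.flat_param)  # hypothetical helper
    #     # On exit, the data is copied back and the unsharded views restored.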

    def reshard(self, free_unsharded_flat_param: bool):
        """
        Runs the reshard logic. This includes freeing the unsharded flattened
        parameter if ``free_unsharded_flat_param`` and switching to using the
        sharded flattened parameter.
        """
        # Switch to the sharded `FlatParameter` before freeing to prevent
        # "use-after-free"-type bugs with external profiling tools, where for
        # `use_orig_params=True`, the `param` does not point to valid memory
        # when setting `param.data = ...` in `_use_sharded_views()`.
        self._use_sharded_flat_param()
        if free_unsharded_flat_param:
            self._free_unsharded_flat_param()

    def post_reshard(self):
        """
        Runs the post-reshard logic. This includes freeing any memory that
        can now be freed given that the ``FlatParameter`` points to the full
        precision sharded flattened parameter.

        Precondition: ``self.flat_param`` 's data points to the full precision
        sharded flattened parameter.
        """
        # For `NO_SHARD`, `_mp_shard` is not freed in the post-unshard since
        # it is also the low precision *unsharded* flattened parameter. Hence,
        # we delay the free until the reshard.
        if (
            self._uses_param_mixed_precision
            and not self.uses_sharded_strategy
            and not self._force_full_precision  # did not use the low precision shard
        ):
            self._free_low_precision_sharded_param()

    def _free_unsharded_flat_param(self):
        """
        Frees the padded unsharded flattened parameter. The tensor to free
        depends on the calling context since the unshard may have forced full
        precision, in which case a different tensor is used.
        """
        self._check_sharded_strategy()
        unsharded_flat_param = self._get_padded_unsharded_flat_param()
        self._check_storage_allocated(unsharded_flat_param)
        self._check_on_compute_device(unsharded_flat_param)
        # Do not free the memory until all ops in the current stream finish
        _no_dispatch_record_stream(unsharded_flat_param, torch.cuda.current_stream())
        _free_storage(unsharded_flat_param)

    def _use_sharded_flat_param(self) -> None:
        """Switches to using the sharded flattened parameter."""
        flat_param = self.flat_param
        if self._offload_params:
            device = flat_param._local_shard.device  # type: ignore[attr-defined]
            p_assert(
                device == torch.device("cpu"),
                f"Expects the local shard to be on CPU but got {device}",
            )
        flat_param.data = flat_param._local_shard  # type: ignore[attr-defined]
        if self._use_orig_params:
            self._use_sharded_views()
            # For the post-forward reshard, we may try to use sharded gradient
            # views (or unsharded gradient views if a gradient was accumulated
            # in `no_sync()`), but for the post-backward reshard, we delay the
            # call to after the reduce-scatter.
            if self._training_state == HandleTrainingState.FORWARD:
                # TODO: Change `_unpadded_unsharded_size` if we change the
                # gradient to be computed directly with padding.
                accumulated_grad_in_no_sync = (
                    flat_param.grad is not None
                    and self.uses_sharded_strategy
                    and flat_param.grad.shape == flat_param._unpadded_unsharded_size
                )
                if accumulated_grad_in_no_sync:
                    self._use_unsharded_grad_views()
                else:
                    self._use_sharded_grad_views()

    #########
    # VIEWS #
    #########

    @staticmethod
    def _get_unflat_views(
        flat_param: FlatParameter,
        tensor: Optional[torch.Tensor] = None,
    ) -> Iterator[Tensor]:
        """
        Returns unflattened ``Tensor`` views into ``tensor`` if it is not
        ``None`` or ``flat_param`` otherwise, where the unflattening is based
        on ``flat_param`` 's metadata.

        In other words, to get views into the unsharded flattened parameter,
        pass ``tensor`` as ``None``, but to get views into tensor optimizer
        state, pass ``tensor`` as the optimizer state tensor.
        """
        if tensor is None:
            tensor = flat_param
        p_assert(
            tensor.numel() == flat_param._unpadded_unsharded_size.numel(),
            f"Expects {flat_param._unpadded_unsharded_size.numel()} numel but got "
            f"{tensor.numel()} numel",
        )
        views = (
            _ext_post_unflatten_transform(subtensor.view(shape), param_extension)
            for (subtensor, shape, param_extension) in zip(
                torch.split(tensor, flat_param._numels, dim=0),  # type: ignore[arg-type]
                flat_param._shapes,
                flat_param._param_extensions,
            )
        )
        return views
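
    # For intuition (hypothetical shapes, not tied to any real module): if the
    # handle flattened a weight of shape (4, 3) followed by a bias of shape
    # (3,), then `_numels == [12, 3]` and `_shapes == [(4, 3), (3,)]`, so the
    # generator above yields roughly
    #
    #     tensor[:12].view(4, 3)   # weight view
    #     tensor[12:15].view(3)    # bias view
    #
    # before any per-parameter extension transform is applied.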

    def _use_unsharded_views(self, as_params: bool) -> None:
        """
        Unflattens the unsharded flattened parameter by setting the original
        module parameter variables to be views into it.

        Args:
            as_params (bool): If ``True``, then registers the original
                parameters as ``nn.Parameter`` s; if ``False``, then registers
                the original parameters only as ``Tensor`` s. ``False`` should
                be used during forward/backward computation and when hiding the
                original parameters from :meth:`nn.Module.named_parameters`.
        """
        self._check_unsharded(self.flat_param)
        views = self._get_unflat_views(self.flat_param)
        for i, (view, (param_name, module, _)) in enumerate(
            zip(views, self.flat_param._param_infos)
        ):
            if hasattr(module, param_name):
                delattr(module, param_name)
            if self._use_orig_params and as_params:
                if type(view) is DTensor:
                    # A `DTensor` `view` is not compatible with assigning
                    # `param.data = view`, so we cannot preserve the parameter
                    # variable.
                    setattr(module, param_name, nn.Parameter(view))
                    continue
                param = self.flat_param._params[i]  # type: ignore[index]
                setattr(module, param_name, param)
                param.data = view
            elif as_params:
                module.register_parameter(param_name, nn.Parameter(view))
            else:  # `as_params=False`
                param_var: Tensor = view
                if self._use_orig_params:
                    if self._training_state == HandleTrainingState.FORWARD:
                        assert self.flat_param._tensors is not None
                        # Save the `Tensor` for the pre-backward
                        self.flat_param._tensors[i] = view  # save for pre-backward
                    elif self._training_state == HandleTrainingState.BACKWARD_PRE:
                        # Use the saved `Tensor` variable from the forward to
                        # preserve the autograd graph so that the post-backward
                        # hook fires (e.g. for reentrant AC)
                        assert self.flat_param._tensors is not None  # mypy
                        tensor = self.flat_param._tensors[i]
                        p_assert(
                            tensor is not None,
                            "Expects `Tensor` to have been saved in forward",
                        )
                        tensor.data = view  # type: ignore[union-attr]
                        assert tensor is not None  # mypy
                        param_var = tensor
                setattr(module, param_name, param_var)
                if (
                    self._use_orig_params
                    and self._training_state == HandleTrainingState.FORWARD
                ):
                    module._parameters[param_name] = param_var  # type: ignore[assignment]
        for i, (
            param_name,
            module,
            _,
            prim_param_name,
            prim_module,
            prim_module_name,
        ) in enumerate(self.flat_param._shared_param_infos):
            if hasattr(module, param_name):
                delattr(module, param_name)
            p_assert(
                hasattr(prim_module, prim_param_name),
                f"Module {prim_module_name} is missing parameter {prim_param_name}",
            )
            prim_param: Union[Tensor, nn.Parameter] = getattr(
                prim_module, prim_param_name
            )
            p_assert(
                not as_params or isinstance(prim_param, nn.Parameter),
                f"as_params={as_params} type(prim_param)={type(prim_param)}",
            )
            if self._use_orig_params and as_params:
                shared_param = self.flat_param._shared_params[i]  # type: ignore[index]
                setattr(module, param_name, shared_param)
                shared_param.data = prim_param
            elif as_params:
                assert isinstance(prim_param, nn.Parameter)
                module.register_parameter(param_name, prim_param)
            else:
                setattr(module, param_name, prim_param)
                if (
                    self._use_orig_params
                    and self._training_state == HandleTrainingState.FORWARD
                ):
                    module._parameters[param_name] = prim_param  # type: ignore[assignment]

    def _use_unsharded_grad_views(self) -> None:
        """
        Unflattens the unsharded flattened parameter's gradient by setting the
        original module parameter variables' gradients to be views into it.
        """
        # Expects the gradient to be in `flat_param.grad`
        if self.flat_param.grad is None:
            assert self.flat_param._params is not None  # mypy
            assert self.flat_param._shared_params is not None  # mypy
            for param in chain(
                self.flat_param._params,  # type: ignore[attr-defined]
                self.flat_param._shared_params,  # type: ignore[attr-defined]
            ):
                param.grad = None
            return
        self._check_unsharded(self.flat_param.grad)
        views = self._get_unflat_views(self.flat_param, self.flat_param.grad)
        for i, (view, (param_name, module, _)) in enumerate(
            zip(views, self.flat_param._param_infos)
        ):
            p_assert(
                hasattr(module, param_name),
                f"{self.flat_param._fqns[i]} is missing",
            )
            param = getattr(module, param_name)
            if param.shape != view.shape or param.dtype != view.dtype:
                # NOTE: This is a hack using `.data` to side step the
                # check that parameter/gradient sizes and dtypes match. Here,
                # `param` can have the sharded size, and `grad` can have the
                # unsharded size. Orthogonally, `param` can have the full
                # precision dtype from `reshard()`, and `grad` can have the
                # parameter low precision dtype. Both of these mismatches
                # happen when running in `no_sync()`.
                if param.grad is None:
                    param.grad = torch.empty_like(param)
                param.grad.data = view
            else:
                param.grad = view
        for i, (
            param_name,
            module,
            module_name,
            prim_param_name,
            prim_module,
            _,
        ) in enumerate(self.flat_param._shared_param_infos):
            p_assert(
                hasattr(module, param_name),
                f"{module_name + '.' + param_name if module_name else param_name} is missing",
            )  # did not save FQN info in `_shared_param_infos`
            param = getattr(module, param_name)
            prim_param = getattr(prim_module, prim_param_name)
            if (
                param.shape != prim_param.grad.shape
                or param.dtype != prim_param.grad.dtype
            ):
                # NOTE: This is the same hack to use `.data` to side step the
                # size check.
                if param.grad is None:
                    param.grad = torch.empty_like(param)
                param.grad.data = prim_param.grad
            else:
                param.grad = prim_param.grad

    @contextlib.contextmanager
    def unflatten_as_params(self) -> Generator:
        """
        Assumes the flattened parameter is unsharded. When in the context,
        unflattens the original parameters as ``nn.Parameter`` views into the
        flattened parameter, and after the context, restores the original
        parameters as ``Tensor`` views into the flattened parameter.
        """
        self._use_unsharded_views(as_params=True)
        try:
            yield
        finally:
            self._use_unsharded_views(as_params=False)
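
    # A minimal usage sketch (hypothetical; assumes the flattened parameter is
    # already unsharded, e.g. inside a full-parameter summon):
    #
    #     with handle.unflatten_as_params():
    #         # The original parameters are temporarily registered as
    #         # `nn.Parameter`s, so `module.named_parameters()` sees them.
    #         state = module.state_dict()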

    @torch.no_grad()
    def _use_sharded_views(self) -> None:
        """
        Sets the original module parameter variables' data to be flattened
        views into the sharded flattened parameter.

        The views are kept as flattened to simplify the case where a parameter
        is sharded across ranks. Parameters whose data is not present in the
        sharded flattened parameter have their data set to a size-0 empty
        tensor. We do not delete them in order to preserve expected behaviors
        like model printability. Parameters whose data is present must preserve
        their variables to be passable to an optimizer.
        """
        if not self.uses_sharded_strategy:
            # For `NO_SHARD`, use the *unflattened* unsharded views since we
            # have the unsharded parameter
            self._use_unsharded_views(as_params=True)
            return
        self._check_sharded(self.flat_param)
        start, end = self.flat_param._shard_indices  # type: ignore[attr-defined]
        offset = 0
        assert self.flat_param._params is not None
        for i, (param, (param_name, module, _)) in enumerate(
            zip(self.flat_param._params, self.flat_param._param_infos)
        ):
            setattr(module, param_name, param)
            in_sharded_flat_param = (
                i >= start
                and i <= end
                and self.flat_param._shard_param_offsets  # type: ignore[attr-defined]
            )
            if in_sharded_flat_param:
                param_start, param_end = self.flat_param._shard_param_offsets[i - start]  # type: ignore[attr-defined]
                numel_in_shard = param_end - param_start + 1
                param.data = self.flat_param[offset : offset + numel_in_shard]
                offset += numel_in_shard
            else:
                # Allow the original data to be freed via garbage collection
                param.data = torch.empty(
                    0,
                    dtype=self.flat_param.dtype,  # in case `flat_param` changed dtype
                    device=self.flat_param.device,
                    requires_grad=False,
                )
        assert self.flat_param._shared_params is not None
        for i, (
            param,
            (param_name, module, _, prim_param_name, prim_module, _),
        ) in enumerate(
            zip(self.flat_param._shared_params, self.flat_param._shared_param_infos)
        ):
            setattr(module, param_name, param)
            prim_param = getattr(prim_module, prim_param_name)
            param.data = prim_param  # could be both empty and non-empty
        if self._training_state == HandleTrainingState.BACKWARD_POST:
            assert self.flat_param._tensors is not None  # mypy
            # Clear the saved `Tensor`s since they are unneeded now
            for i in range(len(self.flat_param._tensors)):
                self.flat_param._tensors[i] = None  # type: ignore[index]
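
    # Shard-bookkeeping intuition (hypothetical numbers): if this rank's shard
    # covers parameter indices `_shard_indices == (1, 2)` and
    # `_shard_param_offsets == [(5, 9), (0, 3)]`, then parameter 1 contributes
    # elements 5..9 (5 elements) and parameter 2 contributes elements 0..3
    # (4 elements), so the loop above assigns `flat_param[0:5]` and
    # `flat_param[5:9]` respectively and leaves all other parameters as size-0
    # tensors.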

    @torch.no_grad()
    def _use_sharded_grad_views(self) -> None:
        """
        Sets the original module parameter variables' gradients to be flattened
        views into the sharded flattened parameter's gradient. This is a no-op
        if there is no gradient.

        Parameters whose data is not present in the sharded flattened parameter
        and parameters with ``requires_grad=False`` have their gradients set to
        ``None``. Since the gradient variables do not need to be preserved,
        this method does not manipulate existing ``Tensor`` data directly and
        creates new ``Tensor`` variables instead.
        """
        flat_param = self.flat_param
        self._check_sharded(flat_param)
        grad = self.sharded_grad
        if grad is None:
            assert flat_param._params is not None  # mypy
            assert flat_param._shared_params is not None  # mypy
            for param in chain(flat_param._params, flat_param._shared_params):  # type: ignore[attr-defined]
                param.grad = None
            return
        self._check_sharded(grad)
        start, end = flat_param._shard_indices  # type: ignore[attr-defined]
        offset = 0
        assert flat_param._params is not None
        for i, param in enumerate(flat_param._params):
            in_sharded_flat_param = (
                i >= start
                and i <= end
                and flat_param._shard_param_offsets  # type: ignore[attr-defined]
            )
            if in_sharded_flat_param:
                param_start, param_end = flat_param._shard_param_offsets[i - start]  # type: ignore[attr-defined]
                numel_in_shard = param_end - param_start + 1
                assert flat_param._is_grad_none is not None  # mypy
                if param.requires_grad and not flat_param._is_grad_none[i]:
                    if self._keep_low_precision_grads or param.dtype != grad.dtype:
                        # NOTE: This is a hack using `.data` to side step the
                        # check that parameter/gradient dtypes match. Here,
                        # `param` has full precision; `grad` has low precision.
                        if param.grad is None:
                            # `.grad` must have the same shape as `param`
                            param.grad = torch.empty_like(param)
                        param.grad.data = grad[
                            offset : offset + numel_in_shard
                        ].reshape(param.shape)
                    else:
                        param.grad = grad[offset : offset + numel_in_shard].reshape(
                            param.shape
                        )
                else:
                    param.grad = None
                offset += numel_in_shard
            else:
                param.grad = None
        assert flat_param._shared_params is not None
        for i, (param, (_, _, _, prim_param_name, prim_module, _)) in enumerate(
            zip(flat_param._shared_params, flat_param._shared_param_infos)
        ):
            in_sharded_flat_param = hasattr(prim_module, prim_param_name)
            if in_sharded_flat_param and param.requires_grad:
                prim_param = getattr(prim_module, prim_param_name)
                param.grad = prim_param.grad  # share the same reference
            else:
                param.grad = None

    @torch.no_grad()
    def _writeback_orig_params(self) -> bool:
        """
        Iterates over the original parameters and writes back any parameters
        that changed storages (due to a non-inplace operator) to the handle's
        ``FlatParameter``. This method preserves the ``FlatParameter`` 's
        device even if an original parameter's device changes.

        Raises:
            RuntimeError: If an original parameter or gradient changes storages
                but no longer has the expected flattened shape.

        Returns: ``True`` if some writeback happened, and ``False`` otherwise.
        """
        if self.uses_sharded_strategy and not self.is_sharded(self.flat_param):
            # For `NO_SHARD`, we may still need to writeback
            return False
        flat_param = self.flat_param
        start, end = flat_param._shard_indices  # type: ignore[attr-defined]
        offset = 0
        assert flat_param._params is not None
        wroteback = False
        for i, (param, (param_name, module, _)) in enumerate(
            zip(flat_param._params, flat_param._param_infos)
        ):
            if not hasattr(module, param_name):
                # Do not writeback if original parameters are deregistered
                # (e.g. during model checkpointing)
                continue
            in_sharded_flat_param = (
                i >= start
                and i <= end
                and self.flat_param._shard_param_offsets  # type: ignore[attr-defined]
            )
            if not in_sharded_flat_param:
                continue
            param_start, param_end = flat_param._shard_param_offsets[i - start]  # type: ignore[attr-defined]
            numel_in_shard = param_end - param_start + 1
            # Check for parameter writeback
            param_changed = getattr(module, param_name) is not param
            needs_param_writeback = (
                param_changed  # changed parameter variable itself
                or not _same_storage(param, flat_param)  # changed `.data`
            )
            if param_changed:
                # NOTE: The gradient is not preserved after a parameter change.
                param = getattr(module, param_name)
                flat_param._params[i] = param
            if needs_param_writeback:
                expected_shape = torch.Size([numel_in_shard])
                self._writeback_tensor(
                    param, flat_param, i, expected_shape, offset, True
                )
                wroteback = True
            # Check for gradient writeback
            # NOTE: Since this method is called in the pre-unshard, which is
            # only called during computation in the pre-forward or
            # pre-backward, the sharded gradient should be guaranteed to be in
            # `.grad`, not in `._saved_grad_shard`.
            if param.grad is None and flat_param.grad is not None:
                expected_shape = torch.Size([numel_in_shard])
                self._writeback_tensor(
                    None, flat_param.grad, i, expected_shape, offset, False
                )
            elif param.grad is not None:
                # For `NO_SHARD` + CPU offloading, `_cpu_grad` is always in
                # memory and owns the gradient storage, so it will never
                # require gradient writeback.
                flat_param_grad = (
                    flat_param.grad
                    if self.uses_sharded_strategy or not self._offload_params
                    else flat_param._cpu_grad  # type: ignore[attr-defined]
                )
                needs_grad_writeback = flat_param_grad is None or not _same_storage(
                    param.grad, flat_param_grad
                )
                if needs_grad_writeback:
                    if flat_param_grad is None:
                        flat_param_grad = torch.zeros_like(flat_param)
                    expected_shape = torch.Size([numel_in_shard])
                    self._writeback_tensor(
                        param.grad, flat_param_grad, i, expected_shape, offset, False
                    )
                    flat_param.grad = flat_param_grad
            offset += numel_in_shard
        # TODO (awgu): Handle shared parameters. We need to re-generate the
        # shared parameter data structures in case sharedness changed.
        for i, (
            param_name,
            module,
            _,
            prim_param_name,
            prim_module,
            _,
        ) in enumerate(flat_param._shared_param_infos):
            if getattr(module, param_name) is not getattr(prim_module, prim_param_name):
                raise NotImplementedError(
                    "Changing shared parameters is not supported yet"
                )
        return wroteback
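
    # Writeback intuition (hypothetical user code that could trigger it): if a
    # user swaps a parameter variable between iterations, e.g.
    #
    #     with torch.no_grad():
    #         module.weight = nn.Parameter(module.weight * 2)  # new variable/storage
    #
    # then `getattr(module, param_name) is not param` (and `_same_storage`
    # fails), so the new values are copied back into this rank's shard at the
    # next pre-unshard.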

    def _writeback_tensor(
        self,
        src_tensor: Optional[Tensor],
        dst_tensor: Tensor,
        tensor_index: int,
        expected_shape: torch.Size,
        offset: int,
        is_param: bool,  # else gradient
    ) -> None:
        """
        Writes back ``src_tensor`` to ``dst_tensor`` at offset ``offset``,
        where ``src_tensor`` should have shape ``expected_shape``. ``is_param``
        indicates if the tensor is the parameter (if ``True``) or gradient (if
        ``False``). If ``src_tensor`` is ``None``, then the effect is zeroing
        instead of copying. ``tensor_index`` gives the index of ``src_tensor``
        in the metadata structures.

        Raises:
            RuntimeError: If the ``src_tensor`` does not have the expected
                shape.
        """
        p_assert(
            len(expected_shape) == 1,
            f"Expects a 1D expected shape but got {expected_shape}",
        )
        if self._debug_level == dist.DebugLevel.DETAIL:
            rank = self.rank if hasattr(self, "rank") else dist.get_rank()
            src_shape = src_tensor.shape if src_tensor is not None else None
            src_device = src_tensor.device if src_tensor is not None else None
            warnings.warn(
                f"[Rank {rank}] {'Parameter' if is_param else 'Gradient'} needs "
                f"writeback in {self._training_state}\n"
                f"expected shape={expected_shape} shape={src_shape} "
                f"expected device={dst_tensor.device} device={src_device}"
            )
        if src_tensor is not None and src_tensor.shape != expected_shape:
            # NOTE: Gradient shape mismatch is not possible in practice since
            # the gradient shape is enforced to match that of the parameter and
            # we already check for parameter shape mismatch.
            raise RuntimeError(
                f"Cannot writeback when the {'parameter' if is_param else 'gradient'} "
                f"shape changes\nExpects {expected_shape} but got {src_tensor.shape}"
            )
        if src_tensor is not None:
            dst_tensor[offset : offset + expected_shape.numel()].copy_(src_tensor)
        else:
            dst_tensor[offset : offset + expected_shape.numel()].zero_()
            assert self.flat_param._is_grad_none is not None
            self.flat_param._is_grad_none[tensor_index] = True

    def _clear_grads_if_needed(self):
        """
        When ``use_orig_params=True``, sets the underlying ``flat_param.grad``
        to ``None`` if *all* of the original parameters' ``.grad`` are
        ``None``. This is targeting ``optim.zero_grad(set_to_none=True)``, in
        which case we want to free the gradients as soon after the
        ``zero_grad()`` call as possible.
        """
        if not self._use_orig_params:
            return
        flat_param = self.flat_param
        assert flat_param._params is not None
        if all(param.grad is None for param in flat_param._params):
            flat_param.grad = None

    def _deregister_orig_params(self):
        for (param_name, module, _) in self.flat_param._param_infos:
            if hasattr(module, param_name):
                delattr(module, param_name)
        for (param_name, module, _, _, _, _) in self.flat_param._shared_param_infos:
            if hasattr(module, param_name):
                delattr(module, param_name)

    ###########
    # HELPERS #
    ###########

    def flat_param_to(self, *args, **kwargs):
        """Wraps an in-place call to ``.to()`` for ``self.flat_param``."""
        self.flat_param.data = self.flat_param.to(*args, **kwargs)
        if self._use_orig_params:
            # Refresh the views because their storage may have changed
            if self.is_sharded(self.flat_param):
                self._use_sharded_views()
            else:
                self._use_unsharded_views(as_params=True)

    def _get_modules(self) -> Set[nn.Module]:
        """Returns a :class:`set` of the modules whose parameters are included
        in this handle's flattened parameter."""
        return {pi.module for pi in self.flat_param._param_infos}.union(
            {spi.module for spi in self.flat_param._shared_param_infos}
        )

    def is_sharded(self, tensor: Tensor) -> bool:
        """
        Returns if ``tensor`` is *currently* sharded. For ``NO_SHARD``, we
        choose to have this always return ``False`` for clarity.
        """
        if (
            not hasattr(self.flat_param, "_sharded_size")
            or not self.uses_sharded_strategy
        ):
            # `_sharded_size` is defined iff `handle.shard()` has been called
            return False
        sharded_size = self.flat_param._sharded_size  # type: ignore[attr-defined]
        return tensor.size() == sharded_size

    def parameter_module_names(self) -> Iterator[Tuple[str, str]]:
        shared_param_infos = [
            ParamInfo(param_name, module, module_name)
            for (
                param_name,
                module,
                module_name,
                _,
                _,
                _,
            ) in self.flat_param._shared_param_infos
        ]
        for param_name, _, module_name in chain(
            self.flat_param._param_infos, shared_param_infos
        ):
            yield (param_name, module_name)

    def shared_parameter_module_names(self) -> Iterator[Tuple[str, str]]:
        for param_name, _, module_name in [
            ParamInfo(param_name, module, module_name)
            for (
                param_name,
                module,
                module_name,
                _,
                _,
                _,
            ) in self.flat_param._shared_param_infos
        ]:
            yield (param_name, module_name)

    @property
    def _fqns_in_shard(self) -> List[str]:
        """Returns the FQNs of the parameters present in this rank's shard."""
        fqns_in_shard: List[str] = []
        start, end = self.flat_param._shard_indices  # type: ignore[attr-defined]
        for i in range(len(self.flat_param._fqns)):
            if i >= start and i <= end and self.flat_param._shard_param_offsets:  # type: ignore[attr-defined]
                fqns_in_shard.append(self.flat_param._fqns[i])
        return fqns_in_shard

    @property
    def sharded_grad(self) -> Optional[Tensor]:
        """Returns the handle's sharded gradient."""
        flat_param = self.flat_param
        # Priority for non-`None`: `_cpu_grad` > `_saved_grad_shard` > `grad`
        # - CPU offloading: `_cpu_grad`
        # - No CPU offloading + sharded strategies: `_saved_grad_shard`
        # - No CPU offloading + `NO_SHARD`: `grad`
        if hasattr(flat_param, "_cpu_grad"):
            grad = flat_param._cpu_grad  # type: ignore[attr-defined]
        elif hasattr(flat_param, "_saved_grad_shard"):
            grad = flat_param._saved_grad_shard  # type: ignore[attr-defined]
        else:
            # If in the forward, then there may be an accumulated gradient,
            # which will be in `.grad`
            p_assert(
                flat_param.grad is None
                or not self.uses_sharded_strategy
                or self._training_state == HandleTrainingState.FORWARD,
                "Sharded strategies should use `_cpu_grad` or `_saved_grad_shard` "
                "unless in FORWARD (for the post-forward reshard)",
            )
            grad = flat_param.grad
        return grad
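
    # Reading-order sketch (hypothetical states): with CPU offloading enabled,
    # `_cpu_grad` exists and wins; with a sharded strategy and no offloading,
    # the post-backward path leaves the reduced shard in `_saved_grad_shard`;
    # with `NO_SHARD`, the gradient simply stays in `.grad`. For example,
    #
    #     grad = handle.sharded_grad
    #     if grad is not None:
    #         shard_norm_sq = grad.float().pow(2).sum()
    #
    # works the same way in all three cases.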

    def _reset_is_grad_none(self) -> None:
        """
        Resets the ``_is_grad_none`` mask as needed. This method should only be
        called in the post-backward after gradient computation, in which case
        if a parameter requires gradient, then it will surely receive a
        gradient and we may reset its mask entry to ``False``.
        """
        if not self._use_orig_params:
            return
        p_assert(
            self._training_state == HandleTrainingState.BACKWARD_POST,
            "Expects to only be called in the post-backward after gradient computation",
        )
        flat_param = self.flat_param
        assert flat_param._params is not None  # mypy
        for i, param in enumerate(flat_param._params):
            # As long as the parameter requires gradient, it should receive a
            # meaningful gradient (even if the gradient happens to be zeros)
            if param.requires_grad:
                assert flat_param._is_grad_none is not None  # mypy
                flat_param._is_grad_none[i] = False

    #######################
    # CHECKS & INVARIANTS #
    #######################

    def _check_sharded_strategy(self):
        p_assert(self.uses_sharded_strategy, "Expects sharded strategy")

    def _check_on_compute_device(self, tensor: Tensor):
        p_assert(
            tensor.device == self.device,
            f"Expects tensor to be on the compute device {self.device}",
        )

    def _check_on_cpu(self, tensor: Tensor):
        p_assert(
            tensor.device == torch.device("cpu"),
            f"Expects tensor to be on CPU but got {tensor.device}",
        )

    @staticmethod
    def _check_storage_freed(tensor: Tensor):
        storage_size: int = tensor._typed_storage()._size()
        p_assert(
            storage_size == 0,
            f"Expects storage to be freed but got storage with size {storage_size}",
        )

    @staticmethod
    def _check_storage_allocated(tensor: Tensor):
        storage_size: int = tensor._typed_storage()._size()
        p_assert(storage_size > 0, "Expects storage to be allocated")

    def _check_low_precision_shard(self):
        p_assert(
            self._uses_param_mixed_precision,
            "Not using low precision for parameters",
        )
        p_assert(
            getattr(self.flat_param, "_mp_shard", None) is not None,
            "Expects `_mp_shard` to exist",
        )
        device = self.flat_param._mp_shard.device  # type: ignore[attr-defined]
        p_assert(
            device == self.device,
            f"Expects the low precision shard to be on {self.device} but got {device}",
        )

    def _check_unsharded(self, tensor: Tensor):
        msg_prefix = "Expects tensor to be unsharded "
        p_assert(tensor is not None, msg_prefix + "but got `None`")
        unsharded_size = self.flat_param._unpadded_unsharded_size
        p_assert(
            tensor.size() == unsharded_size,
            msg_prefix + f"with size {unsharded_size} but got {tensor.size()}",
        )

    def _check_sharded(self, tensor: Tensor):
        msg_prefix = "Expects tensor to be sharded "
        p_assert(tensor is not None, msg_prefix + "but got `None`")
        sharded_size = self.flat_param._sharded_size  # type: ignore[attr-defined]
        p_assert(
            tensor.size() == sharded_size,
            msg_prefix + f"with size {sharded_size} but got {tensor.size()}",
        )

    ##############
    # PROPERTIES #
    ##############

    @property
    def uses_sharded_strategy(self) -> bool:
        return self._sharding_strategy != HandleShardingStrategy.NO_SHARD

    @property
    def _uses_param_mixed_precision(self) -> bool:
        return self._fwd_bwd_param_dtype != self._orig_param_dtype

    @property
    def _uses_reduce_mixed_precision(self) -> bool:
        return self._reduce_dtype != self._orig_param_dtype

    @property
    def _force_full_precision(self) -> bool:
        return (
            self._training_state == HandleTrainingState.SUMMON_FULL_PARAMS
            and self._uses_param_mixed_precision
        )


# A handles key represents the group of `FlatParamHandle`s involved in a given
# module's forward. These will be all-gathered together in the pre-forward and
# pre-backward.
_HandlesKey = Tuple[FlatParamHandle, ...]