from typing import Optional

import torch.distributed as dist

from .planner import SavePlanner
from .default_planner import DefaultSavePlanner
from .storage import (
    StorageWriter,
)
from .metadata import Metadata, STATE_DICT_TYPE
from .utils import _DistWrapper

__all__ = ["save_state_dict"]
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
- """
- Saves a distributed model in SPMD style.
- This function is different from ``torch.save()`` as it handles
- ``ShardedTensor`` by having each rank only save their local shards.
- .. warning::
- There is no guarantees of Backwards Compatibility across PyTorch versions
- for saved state_dicts.
- .. warning::
- If using the `process_group` argument, make sure that only its ranks
- call `save_state_dict` and that all data in state_dict belong to it.
- .. note::
- This function can be used to save a state_dict with an intialized process
- group by passing ``no_dist=True``. This can be used to produce a checkpoint
- that can consumed by load_state_dict is a SPMD fashion.
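
        A minimal sketch of this mode (illustrative only; the checkpoint
        directory and ``my_model`` are assumptions):

        >>> # xdoctest: +SKIP
        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> torch.distributed.checkpoint.save_state_dict(
        >>>     state_dict=my_model.state_dict(),
        >>>     storage_writer=fs_storage_writer,
        >>>     no_dist=True,
        >>> )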

    Args:
        state_dict (Dict[str, Any]): A state_dict.
        storage_writer (StorageWriter):
            Instance of StorageWriter used to perform writes.
        process_group (ProcessGroup):
            ProcessGroup to be used for cross-rank synchronization.
        coordinator_rank (int): Rank to use to coordinate the checkpoint.
            rank0 is used by default.
        no_dist (bool): If ``True``, the checkpoint will not be saved in
            SPMD style. (Default: ``False``)
        planner (SavePlanner): Instance of SavePlanner used to plan how the
            state_dict is written. If ``None``, ``DefaultSavePlanner`` is
            used. (Default: ``None``)

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()
        >>> model_state_dict = my_model.state_dict()
        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> torch.distributed.checkpoint.save_state_dict(
        >>>     state_dict=model_state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        save_state_dict uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes
        place. In this case, the device used is given by
        ``torch.cuda.current_device()`` and it is the user's responsibility
        to ensure that this is set so that each rank has an individual GPU,
        via ``torch.cuda.set_device()``.
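
        A minimal sketch of selecting the device before saving (illustrative
        only; assumes one GPU per rank and a ``LOCAL_RANK`` environment
        variable set by the launcher, e.g. torchrun):

        >>> # xdoctest: +SKIP
        >>> import os
        >>> torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
        >>> torch.distributed.checkpoint.save_state_dict(
        >>>     state_dict=model_state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )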
- """
    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None

    global_metadata = None

    def local_step():
        # Every rank plans the writes for its local shards and lets the
        # storage layer adjust that plan.
        assert planner is not None
        planner.set_up_planner(state_dict, distW.is_coordinator)
        storage_writer.set_up_storage_writer(distW.is_coordinator)
        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    def global_step(all_local_plans):
        # Runs on the coordinator rank only: merge the per-rank plans and
        # build the checkpoint metadata.
        nonlocal global_metadata

        assert planner is not None
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    central_plan = distW.reduce_scatter("plan", local_step, global_step)

    def write_data():
        # Every rank writes the data assigned to it by the central plan.
        assert planner is not None
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    def finish_checkpoint(all_results):
        # Runs on the coordinator rank only: commit the checkpoint once all
        # ranks have finished writing.
        assert global_metadata is not None
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    return distW.all_reduce("write", write_data, finish_checkpoint)