import abc
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

import torch.nn as nn
from torch.distributed._shard.sharder import Sharder
from torch.distributed._shard.sharding_spec import ShardingSpec


@dataclass
class ShardingPlan:
    """
    Representation of a sharding plan, describing how to shard a module
    across hosts.

    `plan` is used to shard module parameters according to the spec
    provided. `output_plan` and `return_local_tensor` are optional; they
    specify the output layout of a module with a spec, and when to convert
    back to data parallel fashion.

    Args:
        plan (Dict[str, Union[:class:`torch.distributed._shard.sharding_spec.ShardingSpec`,
              :class:`torch.distributed._shard.sharder.Sharder`]]):
            a dict describing how to shard a module. There are currently
            two ways to shard a module:

            1. directly shard a module parameter by a `ShardingSpec`,
               keyed by the name of a parameter to a `ShardingSpec`.
            2. shard a submodule by applying a `Sharder` on it, keyed by
               the name of a module to a `Sharder` object.
        output_plan (Dict[str, :class:`torch.distributed._shard.sharding_spec.ShardingSpec`], optional):
            a dict specifying the layout of a module's output which
            produces a ShardedTensor, keyed by the name of a module to a
            `ShardingSpec` ("" as a key means the root module).
            Default: `None`
        return_local_tensor (List[str], optional): a list of module names;
            each element enables that module's sharded output to be
            returned as a Tensor built from its local shards, so that
            further processing can continue in a data parallel fashion
            ("" in the list means the root module).
            Default: `None`

    Example:
        Suppose we want to shard a module with two linear layers and then
        run it with DDP, and we also want to convert the output of the
        second linear layer back to DDP. We can do it as follows:

        >>> # xdoctest: +REQUIRES(module:torch._C._distributed_c10d)
        >>> class MyModule(nn.Module):
        >>>     def __init__(self):
        >>>         super().__init__()
        >>>         self.fc1 = nn.Linear(16, 32)
        >>>         self.gelu = nn.GELU()
        >>>         self.fc2 = nn.Linear(32, 16)
        >>>         self.relu = nn.ReLU()
        >>>
        >>>     def forward(self, input):
        >>>         return self.relu(self.fc2(self.gelu(self.fc1(input))))

        >>> # xdoctest: +SKIP("Undefined spec1, spec2, output_spec")
        >>> sharding_plan = ShardingPlan(
        >>>     plan={
        >>>         "fc1.weight": spec1,
        >>>         "fc2.weight": spec2
        >>>     },
        >>>     output_plan={
        >>>         "fc2": output_spec
        >>>     },
        >>>     return_local_tensor=["fc2"]
        >>> )
    """

    # Parameter name -> ShardingSpec, or submodule name -> Sharder.
    plan: Dict[str, Union[ShardingSpec, Sharder]]
    # Module name -> ShardingSpec for that module's output ("" is the root).
    output_plan: Optional[Dict[str, ShardingSpec]] = None
    # Names of modules whose sharded output is returned as a local Tensor.
    return_local_tensor: Optional[List[str]] = None


class ShardingPlanner(abc.ABC):
    """
    Default ShardingPlanner interface. It can be extended to implement
    advanced sharding strategies.
    """

    @abc.abstractmethod
    def build_plan(self, module: nn.Module) -> ShardingPlan:
        """
        Given a nn.Module, define how to shard the module across ranks and
        return a ShardingPlan.

        Args:
            module (:class:`torch.nn.Module`):
                The module to apply sharding to.

        Returns:
            A :class:`torch.distributed._shard.sharding_plan.ShardingPlan`
            object that represents how to shard the module.
        """