import warnings

import torch
import torch.distributed.algorithms.model_averaging.averagers as averagers


class PostLocalSGDOptimizer(torch.optim.Optimizer):
    r"""
    Wraps an arbitrary :class:`torch.optim.Optimizer` and runs `post-local SGD <https://arxiv.org/abs/1808.07217>`_.
    This optimizer runs the local optimizer at every step.
    After the warm-up stage, it averages parameters periodically after the local optimizer is applied.

    Args:
        optim: The local optimizer.
        averager: A model averager instance to run the post-localSGD algorithm.

    Example::

        >>> # xdoctest: +SKIP("undefined variables")
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.distributed.algorithms.model_averaging.averagers as averagers
        >>> import torch.nn as nn
        >>> from torch.distributed.optim import PostLocalSGDOptimizer
        >>> from torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook import (
        >>>     PostLocalSGDState,
        >>>     post_localSGD_hook,
        >>> )
        >>>
        >>> model = nn.parallel.DistributedDataParallel(
        >>>     module, device_ids=[rank], output_device=rank
        >>> )
        >>>
        >>> # Register a post-localSGD communication hook.
        >>> state = PostLocalSGDState(process_group=None, subgroup=None, start_localSGD_iter=100)
        >>> model.register_comm_hook(state, post_localSGD_hook)
        >>>
        >>> # Create a post-localSGD optimizer that wraps a local optimizer.
        >>> # Note that ``warmup_steps`` used in ``PostLocalSGDOptimizer`` must be the same as
        >>> # ``start_localSGD_iter`` used in ``PostLocalSGDState``.
        >>> local_optim = torch.optim.SGD(params=model.parameters(), lr=0.01)
        >>> opt = PostLocalSGDOptimizer(
        >>>     optim=local_optim,
        >>>     averager=averagers.PeriodicModelAverager(period=4, warmup_steps=100)
        >>> )
        >>>
        >>> # In the first 100 steps, DDP runs global gradient averaging at every step.
        >>> # After 100 steps, DDP runs gradient averaging within each subgroup (intra-node by default),
        >>> # and the post-localSGD optimizer runs global model averaging every 4 steps
        >>> # after applying the local optimizer.
        >>> for step in range(0, 200):
        >>>     opt.zero_grad()
        >>>     loss = loss_fn(output, labels)
        >>>     loss.backward()
        >>>     opt.step()
    """

    def __init__(self, optim: torch.optim.Optimizer, averager: averagers.ModelAverager):
        self.optim = optim
        # Alias the wrapped optimizer's parameter groups so that both objects
        # always see the same ``param_groups`` list.
        self.param_groups = self.optim.param_groups
        self.averager = averager

    @property
    def state(self):
        return self.optim.state

    def __repr__(self):
        return self.optim.__repr__()

    def state_dict(self):
        r"""
        This is the same as :class:`torch.optim.Optimizer` :meth:`state_dict`,
        but adds an extra entry that records the model averager's step in the checkpoint,
        so that reloading does not restart the warm-up stage unnecessarily.
        """
        optim_state_dict = self.optim.state_dict()
        optim_state_dict["step"] = self.averager.step
        return optim_state_dict

    def load_state_dict(self, state_dict):
        r"""
        This is the same as :class:`torch.optim.Optimizer` :meth:`load_state_dict`,
        but also restores the model averager's step value to the one
        saved in the provided ``state_dict``.

        If there is no ``"step"`` entry in ``state_dict``,
        it will emit a warning and initialize the model averager's step to 0.
        """
        self.optim.load_state_dict(state_dict)
        if "step" in state_dict:
            self.averager.step = state_dict["step"]
        else:
            warnings.warn(
                "Loaded state dict does not contain a step counter for an averager. "
                "Setting step counter to 0."
            )
            self.averager.step = 0
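
    # A minimal checkpoint round-trip sketch (not part of the class itself):
    # it assumes ``opt`` is an already constructed ``PostLocalSGDOptimizer``
    # and ``"ckpt.pt"`` is a hypothetical path. Because ``state_dict()``
    # records ``averager.step``, reloading resumes periodic model averaging
    # without repeating the warm-up stage:
    #
    #     torch.save(opt.state_dict(), "ckpt.pt")
    #     # ... later, possibly in a restarted training job ...
    #     opt.load_state_dict(torch.load("ckpt.pt"))  # restores averager.step as well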

    def step(self):
        r"""
        Performs a single optimization step (parameter update),
        and then runs the model averager on the updated parameters.
        """
        self.optim.step()
        # The averager keeps its own step counter and only averages parameters
        # periodically once the warm-up stage has passed.
        self.averager.average_parameters(params=self.param_groups)

    def zero_grad(self, set_to_none: bool = True):  # type: ignore[override]
        self.optim.zero_grad(set_to_none=set_to_none)

    def add_param_group(self, param_group):
        self.optim.add_param_group(param_group)