# functional_sgd.py

from typing import Dict, List, Optional

import torch
import torch.optim._functional as F
from torch import Tensor

__all__: List[str] = []

# Define a TorchScript compatible functional SGD optimizer
# that we use in a functional way.
# Instead of reading `param.grad` when updating parameters, we explicitly
# let the distributed optimizer pass gradients to the `step` function.
# This way we can separate the gradients from the parameters and allow
# multithreaded trainers to update the parameters without data races from
# accumulating into the same `.grad`.
# NOTE: This should only be used by distributed optimizer internals
# and is not meant to be exposed to the user.
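# In other words, rather than the usual eager-mode pattern of calling
# `loss.backward()` followed by `optimizer.step()` (which reads each
# `param.grad`), the caller computes or receives the gradients itself and
# hands them over directly via `step(gradients)` or `step_param(param, grad)`.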
@torch.jit.script
class _FunctionalSGD:
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False,
        maximize: bool = False,
        foreach: bool = False,
        _allow_empty_param_list: bool = False,
    ):
        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "dampening": dampening,
            "weight_decay": weight_decay,
        }
        self.nesterov = nesterov
        self.maximize = maximize
        self.foreach = foreach
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})

        if len(params) == 0 and not _allow_empty_param_list:
            raise ValueError("optimizer got an empty parameter list")
        # NOTE: we only have one param_group and don't allow the user to add
        # additional param groups, as that is not a common use case.
        self.param_group = {"params": params}

    def step_param(self, param: Tensor, grad: Optional[Tensor]):
        """Similar to self.step, but operates on a single parameter and
        its gradient.
        """
        # TODO: Once the step_param interface is robust, refactor step to call
        # step_param on each param.
        weight_decay = self.defaults["weight_decay"]
        momentum = self.defaults["momentum"]
        dampening = self.defaults["dampening"]
        lr = self.defaults["lr"]
        params = [param]
        momentum_buffer_list: List[Optional[Tensor]] = []
        grads = []
        has_sparse_grad = False
        if grad is not None:
            grads.append(grad)
            if grad.is_sparse:
                has_sparse_grad = True
            if param not in self.state:
                self.state[param] = {}
            state = self.state[param]
            if "momentum_buffer" not in state:
                momentum_buffer_list.append(None)
            else:
                momentum_buffer_list.append(state["momentum_buffer"])

        with torch.no_grad():
            F.sgd(
                params,
                grads,
                momentum_buffer_list,
                weight_decay=weight_decay,
                momentum=momentum,
                lr=lr,
                dampening=dampening,
                nesterov=self.nesterov,
                maximize=self.maximize,
                has_sparse_grad=has_sparse_grad,
                foreach=self.foreach,
            )
        # update momentum_buffer in state
        state = self.state[param]
        momentum_buffer = momentum_buffer_list[0]
        if momentum_buffer is not None:
            state["momentum_buffer"] = momentum_buffer
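
    # `step` applies a single SGD update to every parameter in the (single)
    # param group. `gradients` must be aligned with
    # `self.param_group["params"]`; entries may be None for parameters that
    # did not receive a gradient.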
    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group["params"]
        params_with_grad = []
        grads = []
        momentum_buffer_list: List[Optional[Tensor]] = []
        lr = self.defaults["lr"]
        weight_decay = self.defaults["weight_decay"]
        momentum = self.defaults["momentum"]
        dampening = self.defaults["dampening"]
        if len(params) != len(gradients):
            raise ValueError(
                "the gradients passed in do not match the number of parameters! "
                + f"Params length: {len(params)}. "
                + f"Gradients length: {len(gradients)}"
            )
        has_sparse_grad = False
        for param, gradient in zip(params, gradients):
            if gradient is not None:
                params_with_grad.append(param)
                grads.append(gradient)
                if gradient.is_sparse:
                    has_sparse_grad = True

                if param not in self.state:
                    self.state[param] = {}

                state = self.state[param]
                if "momentum_buffer" not in state:
                    momentum_buffer_list.append(None)
                else:
                    momentum_buffer_list.append(state["momentum_buffer"])

        with torch.no_grad():
            F.sgd(
                params_with_grad,
                grads,
                momentum_buffer_list,
                weight_decay=weight_decay,
                momentum=momentum,
                lr=lr,
                dampening=dampening,
                nesterov=self.nesterov,
                maximize=self.maximize,
                has_sparse_grad=has_sparse_grad,
                foreach=self.foreach,
            )

        # update momentum_buffers in state
        for i, p in enumerate(params_with_grad):
            state = self.state[p]
            momentum_buffer = momentum_buffer_list[i]
            if momentum_buffer is not None:
                state["momentum_buffer"] = momentum_buffer
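

if __name__ == "__main__":
    # Illustrative usage sketch (for demonstration only; the optimizer is
    # meant to be driven by distributed optimizer internals). It shows how
    # gradients are handed to the optimizer explicitly instead of being read
    # from `param.grad`.
    example_params = [
        torch.randn(4, requires_grad=True),
        torch.randn(2, requires_grad=True),
    ]
    optim = _FunctionalSGD(example_params, lr=0.1, momentum=0.9)

    # Compute gradients out of band (here via autograd.grad, so `.grad` is
    # never populated) and pass them to `step` as an explicit list.
    loss = (example_params[0] ** 2).sum() + (example_params[1] ** 2).sum()
    example_grads = torch.autograd.grad(loss, example_params)
    optim.step(list(example_grads))

    # Per-parameter variant used by the distributed optimizer internals.
    for p, g in zip(example_params, example_grads):
        optim.step_param(p, g)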