123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- from typing import Dict, List, Optional
- import torch
- import torch.optim._functional as F
- from torch import Tensor
- __all__: List[str] = []
- # Define a TorchScript-compatible functional SGD optimizer,
- # i.e. one that is used in a functional way.
- # Instead of reading `param.grad` when updating parameters,
- # we explicitly allow the distributed optimizer to pass gradients to
- # the `step` function. In this way, we can separate the gradients
- # from the parameters and allow a multithreaded trainer to update the
- # parameters without data races from accumulating into the same .grad.
- # NOTE: This should only be used by distributed optimizer internals
- # and is not meant to be exposed to the user.
@torch.jit.script
class _FunctionalSGD:
    # Functional SGD: gradients are handed to `step` / `step_param` explicitly
    # instead of being read from `param.grad`.
    #
    # Instance attributes set in __init__:
    #   defaults    -- float hyperparameters: lr, momentum, dampening, weight_decay
    #   nesterov, maximize, foreach -- flags forwarded unchanged to F.sgd
    #   param_group -- the single parameter group, {"params": params}
    #   state       -- per-parameter dict; gains a "momentum_buffer" entry once
    #                  F.sgd has produced a buffer for that parameter
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False,
        maximize: bool = False,
        foreach: bool = False,
        _allow_empty_param_list: bool = False,
    ):
        """Store hyperparameters and the parameter list.

        Raises:
            ValueError: if ``params`` is empty and ``_allow_empty_param_list``
                is False.
        """
        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "dampening": dampening,
            "weight_decay": weight_decay,
        }
        self.nesterov = nesterov
        self.maximize = maximize
        self.foreach = foreach
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})

        # Explicit length check (not truthiness): this class is compiled by
        # TorchScript, so keep the comparison in its original form.
        if len(params) == 0 and not _allow_empty_param_list:
            raise ValueError("optimizer got an empty parameter list")

        # NOTE: we only have one param_group and don't allow user to add additional
        # param group as it's not a common use case.
        self.param_group = {"params": params}

    def step_param(self, param: Tensor, grad: Optional[Tensor]):
        """Similar to self.step, but operates on a single parameter and
        its gradient.

        If ``grad`` is None the call is a no-op: ``param`` is left untouched
        and no state entry is created for it.
        """
        # TODO: Once step_param interface is robust, refactor step to call
        # step param on each param.
        weight_decay = self.defaults["weight_decay"]
        momentum = self.defaults["momentum"]
        dampening = self.defaults["dampening"]
        lr = self.defaults["lr"]
        params = [param]
        momentum_buffer_list: List[Optional[Tensor]] = []
        grads: List[Tensor] = []

        # Everything below lives inside the None check. Previously F.sgd and
        # the state write-back ran even without a gradient, which paired
        # params=[param] with empty grads / momentum_buffer_list and then
        # indexed momentum_buffer_list[0] -- a guaranteed failure.
        if grad is not None:
            grads.append(grad)
            has_sparse_grad = False
            if grad.is_sparse:
                has_sparse_grad = True

            if param not in self.state:
                self.state[param] = {}
            state = self.state[param]
            if "momentum_buffer" in state:
                momentum_buffer_list.append(state["momentum_buffer"])
            else:
                # No buffer yet; F.sgd may fill this slot in.
                momentum_buffer_list.append(None)

            with torch.no_grad():
                F.sgd(
                    params,
                    grads,
                    momentum_buffer_list,
                    weight_decay=weight_decay,
                    momentum=momentum,
                    lr=lr,
                    dampening=dampening,
                    nesterov=self.nesterov,
                    maximize=self.maximize,
                    has_sparse_grad=has_sparse_grad,
                    foreach=self.foreach,
                )

            # update momentum_buffer in state
            momentum_buffer = momentum_buffer_list[0]
            if momentum_buffer is not None:
                state["momentum_buffer"] = momentum_buffer

    def step(self, gradients: List[Optional[Tensor]]):
        """Apply one SGD update to every parameter that has a gradient.

        ``gradients`` must line up one-to-one with the parameters given at
        construction; a ``None`` entry means "skip this parameter".

        Raises:
            ValueError: if ``gradients`` and the parameter list differ in length.
        """
        params = self.param_group["params"]
        params_with_grad: List[Tensor] = []
        grads: List[Tensor] = []
        momentum_buffer_list: List[Optional[Tensor]] = []
        lr = self.defaults["lr"]
        weight_decay = self.defaults["weight_decay"]
        momentum = self.defaults["momentum"]
        dampening = self.defaults["dampening"]

        if len(params) != len(gradients):
            raise ValueError(
                "the gradients passed in does not equal to the size of the parameters! "
                + f"Params length: {len(params)}. "
                + f"Gradients length: {len(gradients)}"
            )

        has_sparse_grad = False
        for param, gradient in zip(params, gradients):
            if gradient is not None:
                params_with_grad.append(param)
                grads.append(gradient)
                if gradient.is_sparse:
                    has_sparse_grad = True

                if param not in self.state:
                    self.state[param] = {}
                state = self.state[param]
                if "momentum_buffer" in state:
                    momentum_buffer_list.append(state["momentum_buffer"])
                else:
                    # No buffer yet; F.sgd may fill this slot in.
                    momentum_buffer_list.append(None)

        with torch.no_grad():
            F.sgd(
                params_with_grad,
                grads,
                momentum_buffer_list,
                weight_decay=weight_decay,
                momentum=momentum,
                lr=lr,
                dampening=dampening,
                nesterov=self.nesterov,
                maximize=self.maximize,
                has_sparse_grad=has_sparse_grad,
                foreach=self.foreach,
            )

        # update momentum_buffers in state
        for i, p in enumerate(params_with_grad):
            state = self.state[p]
            momentum_buffer = momentum_buffer_list[i]
            if momentum_buffer is not None:
                state["momentum_buffer"] = momentum_buffer
|