utils.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from functools import update_wrapper
  2. from numbers import Number
  3. import torch
  4. import torch.nn.functional as F
  5. from typing import Dict, Any
  6. from torch.overrides import is_tensor_like
  7. euler_constant = 0.57721566490153286060 # Euler Mascheroni Constant
  8. __all__ = ["broadcast_all", "logits_to_probs", "clamp_probs", "probs_to_logits", "lazy_property",
  9. "tril_matrix_to_vec", "vec_to_tril_matrix"]
  10. def broadcast_all(*values):
  11. r"""
  12. Given a list of values (possibly containing numbers), returns a list where each
  13. value is broadcasted based on the following rules:
  14. - `torch.*Tensor` instances are broadcasted as per :ref:`_broadcasting-semantics`.
  15. - numbers.Number instances (scalars) are upcast to tensors having
  16. the same size and type as the first tensor passed to `values`. If all the
  17. values are scalars, then they are upcasted to scalar Tensors.
  18. Args:
  19. values (list of `numbers.Number`, `torch.*Tensor` or objects implementing __torch_function__)
  20. Raises:
  21. ValueError: if any of the values is not a `numbers.Number` instance,
  22. a `torch.*Tensor` instance, or an instance implementing __torch_function__
  23. """
  24. if not all(is_tensor_like(v) or isinstance(v, Number)
  25. for v in values):
  26. raise ValueError('Input arguments must all be instances of numbers.Number, '
  27. 'torch.Tensor or objects implementing __torch_function__.')
  28. if not all(is_tensor_like(v) for v in values):
  29. options: Dict[str, Any] = dict(dtype=torch.get_default_dtype())
  30. for value in values:
  31. if isinstance(value, torch.Tensor):
  32. options = dict(dtype=value.dtype, device=value.device)
  33. break
  34. new_values = [v if is_tensor_like(v) else torch.tensor(v, **options)
  35. for v in values]
  36. return torch.broadcast_tensors(*new_values)
  37. return torch.broadcast_tensors(*values)
  38. def _standard_normal(shape, dtype, device):
  39. if torch._C._get_tracing_state():
  40. # [JIT WORKAROUND] lack of support for .normal_()
  41. return torch.normal(torch.zeros(shape, dtype=dtype, device=device),
  42. torch.ones(shape, dtype=dtype, device=device))
  43. return torch.empty(shape, dtype=dtype, device=device).normal_()
  44. def _sum_rightmost(value, dim):
  45. r"""
  46. Sum out ``dim`` many rightmost dimensions of a given tensor.
  47. Args:
  48. value (Tensor): A tensor of ``.dim()`` at least ``dim``.
  49. dim (int): The number of rightmost dims to sum out.
  50. """
  51. if dim == 0:
  52. return value
  53. required_shape = value.shape[:-dim] + (-1,)
  54. return value.reshape(required_shape).sum(-1)
  55. def logits_to_probs(logits, is_binary=False):
  56. r"""
  57. Converts a tensor of logits into probabilities. Note that for the
  58. binary case, each value denotes log odds, whereas for the
  59. multi-dimensional case, the values along the last dimension denote
  60. the log probabilities (possibly unnormalized) of the events.
  61. """
  62. if is_binary:
  63. return torch.sigmoid(logits)
  64. return F.softmax(logits, dim=-1)
  65. def clamp_probs(probs):
  66. eps = torch.finfo(probs.dtype).eps
  67. return probs.clamp(min=eps, max=1 - eps)
  68. def probs_to_logits(probs, is_binary=False):
  69. r"""
  70. Converts a tensor of probabilities into logits. For the binary case,
  71. this denotes the probability of occurrence of the event indexed by `1`.
  72. For the multi-dimensional case, the values along the last dimension
  73. denote the probabilities of occurrence of each of the events.
  74. """
  75. ps_clamped = clamp_probs(probs)
  76. if is_binary:
  77. return torch.log(ps_clamped) - torch.log1p(-ps_clamped)
  78. return torch.log(ps_clamped)
  79. class lazy_property:
  80. r"""
  81. Used as a decorator for lazy loading of class attributes. This uses a
  82. non-data descriptor that calls the wrapped method to compute the property on
  83. first call; thereafter replacing the wrapped method into an instance
  84. attribute.
  85. """
  86. def __init__(self, wrapped):
  87. self.wrapped = wrapped
  88. update_wrapper(self, wrapped)
  89. def __get__(self, instance, obj_type=None):
  90. if instance is None:
  91. return _lazy_property_and_property(self.wrapped)
  92. with torch.enable_grad():
  93. value = self.wrapped(instance)
  94. setattr(instance, self.wrapped.__name__, value)
  95. return value
  96. class _lazy_property_and_property(lazy_property, property):
  97. """We want lazy properties to look like multiple things.
  98. * property when Sphinx autodoc looks
  99. * lazy_property when Distribution validate_args looks
  100. """
  101. def __init__(self, wrapped):
  102. return property.__init__(self, wrapped)
  103. def tril_matrix_to_vec(mat, diag=0):
  104. r"""
  105. Convert a `D x D` matrix or a batch of matrices into a (batched) vector
  106. which comprises of lower triangular elements from the matrix in row order.
  107. """
  108. n = mat.shape[-1]
  109. if not torch._C._get_tracing_state() and (diag < -n or diag >= n):
  110. raise ValueError(f'diag ({diag}) provided is outside [{-n}, {n-1}].')
  111. arange = torch.arange(n, device=mat.device)
  112. tril_mask = arange < arange.view(-1, 1) + (diag + 1)
  113. vec = mat[..., tril_mask]
  114. return vec
  115. def vec_to_tril_matrix(vec, diag=0):
  116. r"""
  117. Convert a vector or a batch of vectors into a batched `D x D`
  118. lower triangular matrix containing elements from the vector in row order.
  119. """
  120. # +ve root of D**2 + (1+2*diag)*D - |diag| * (diag+1) - 2*vec.shape[-1] = 0
  121. n = (-(1 + 2 * diag) + ((1 + 2 * diag)**2 + 8 * vec.shape[-1] + 4 * abs(diag) * (diag + 1))**0.5) / 2
  122. eps = torch.finfo(vec.dtype).eps
  123. if not torch._C._get_tracing_state() and (round(n) - n > eps):
  124. raise ValueError(f'The size of last dimension is {vec.shape[-1]} which cannot be expressed as ' +
  125. 'the lower triangular part of a square D x D matrix.')
  126. n = torch.round(n).long() if isinstance(n, torch.Tensor) else round(n)
  127. mat = vec.new_zeros(vec.shape[:-1] + torch.Size((n, n)))
  128. arange = torch.arange(n, device=vec.device)
  129. tril_mask = arange < arange.view(-1, 1) + (diag + 1)
  130. mat[..., tril_mask] = vec
  131. return mat