binomial.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import torch
  2. from torch.distributions import constraints
  3. from torch.distributions.distribution import Distribution
  4. from torch.distributions.utils import broadcast_all, probs_to_logits, lazy_property, logits_to_probs
  5. __all__ = ['Binomial']
  6. def _clamp_by_zero(x):
  7. # works like clamp(x, min=0) but has grad at 0 is 0.5
  8. return (x.clamp(min=0) + x - x.clamp(max=0)) / 2
  9. class Binomial(Distribution):
  10. r"""
  11. Creates a Binomial distribution parameterized by :attr:`total_count` and
  12. either :attr:`probs` or :attr:`logits` (but not both). :attr:`total_count` must be
  13. broadcastable with :attr:`probs`/:attr:`logits`.
  14. Example::
  15. >>> # xdoctest: +IGNORE_WANT("non-deterinistic")
  16. >>> m = Binomial(100, torch.tensor([0 , .2, .8, 1]))
  17. >>> x = m.sample()
  18. tensor([ 0., 22., 71., 100.])
  19. >>> m = Binomial(torch.tensor([[5.], [10.]]), torch.tensor([0.5, 0.8]))
  20. >>> x = m.sample()
  21. tensor([[ 4., 5.],
  22. [ 7., 6.]])
  23. Args:
  24. total_count (int or Tensor): number of Bernoulli trials
  25. probs (Tensor): Event probabilities
  26. logits (Tensor): Event log-odds
  27. """
  28. arg_constraints = {'total_count': constraints.nonnegative_integer,
  29. 'probs': constraints.unit_interval,
  30. 'logits': constraints.real}
  31. has_enumerate_support = True
  32. def __init__(self, total_count=1, probs=None, logits=None, validate_args=None):
  33. if (probs is None) == (logits is None):
  34. raise ValueError("Either `probs` or `logits` must be specified, but not both.")
  35. if probs is not None:
  36. self.total_count, self.probs, = broadcast_all(total_count, probs)
  37. self.total_count = self.total_count.type_as(self.probs)
  38. else:
  39. self.total_count, self.logits, = broadcast_all(total_count, logits)
  40. self.total_count = self.total_count.type_as(self.logits)
  41. self._param = self.probs if probs is not None else self.logits
  42. batch_shape = self._param.size()
  43. super().__init__(batch_shape, validate_args=validate_args)
  44. def expand(self, batch_shape, _instance=None):
  45. new = self._get_checked_instance(Binomial, _instance)
  46. batch_shape = torch.Size(batch_shape)
  47. new.total_count = self.total_count.expand(batch_shape)
  48. if 'probs' in self.__dict__:
  49. new.probs = self.probs.expand(batch_shape)
  50. new._param = new.probs
  51. if 'logits' in self.__dict__:
  52. new.logits = self.logits.expand(batch_shape)
  53. new._param = new.logits
  54. super(Binomial, new).__init__(batch_shape, validate_args=False)
  55. new._validate_args = self._validate_args
  56. return new
  57. def _new(self, *args, **kwargs):
  58. return self._param.new(*args, **kwargs)
  59. @constraints.dependent_property(is_discrete=True, event_dim=0)
  60. def support(self):
  61. return constraints.integer_interval(0, self.total_count)
  62. @property
  63. def mean(self):
  64. return self.total_count * self.probs
  65. @property
  66. def mode(self):
  67. return ((self.total_count + 1) * self.probs).floor().clamp(max=self.total_count)
  68. @property
  69. def variance(self):
  70. return self.total_count * self.probs * (1 - self.probs)
  71. @lazy_property
  72. def logits(self):
  73. return probs_to_logits(self.probs, is_binary=True)
  74. @lazy_property
  75. def probs(self):
  76. return logits_to_probs(self.logits, is_binary=True)
  77. @property
  78. def param_shape(self):
  79. return self._param.size()
  80. def sample(self, sample_shape=torch.Size()):
  81. shape = self._extended_shape(sample_shape)
  82. with torch.no_grad():
  83. return torch.binomial(self.total_count.expand(shape), self.probs.expand(shape))
  84. def log_prob(self, value):
  85. if self._validate_args:
  86. self._validate_sample(value)
  87. log_factorial_n = torch.lgamma(self.total_count + 1)
  88. log_factorial_k = torch.lgamma(value + 1)
  89. log_factorial_nmk = torch.lgamma(self.total_count - value + 1)
  90. # k * log(p) + (n - k) * log(1 - p) = k * (log(p) - log(1 - p)) + n * log(1 - p)
  91. # (case logit < 0) = k * logit - n * log1p(e^logit)
  92. # (case logit > 0) = k * logit - n * (log(p) - log(1 - p)) + n * log(p)
  93. # = k * logit - n * logit - n * log1p(e^-logit)
  94. # (merge two cases) = k * logit - n * max(logit, 0) - n * log1p(e^-|logit|)
  95. normalize_term = (self.total_count * _clamp_by_zero(self.logits)
  96. + self.total_count * torch.log1p(torch.exp(-torch.abs(self.logits)))
  97. - log_factorial_n)
  98. return value * self.logits - log_factorial_k - log_factorial_nmk - normalize_term
  99. def entropy(self):
  100. total_count = int(self.total_count.max())
  101. if not self.total_count.min() == total_count:
  102. raise NotImplementedError("Inhomogeneous total count not supported by `entropy`.")
  103. log_prob = self.log_prob(self.enumerate_support(False))
  104. return -(torch.exp(log_prob) * log_prob).sum(0)
  105. def enumerate_support(self, expand=True):
  106. total_count = int(self.total_count.max())
  107. if not self.total_count.min() == total_count:
  108. raise NotImplementedError("Inhomogeneous total count not supported by `enumerate_support`.")
  109. values = torch.arange(1 + total_count, dtype=self._param.dtype, device=self._param.device)
  110. values = values.view((-1,) + (1,) * len(self._batch_shape))
  111. if expand:
  112. values = values.expand((-1,) + self._batch_shape)
  113. return values