continuous_bernoulli.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. from numbers import Number
  2. import math
  3. import torch
  4. from torch.distributions import constraints
  5. from torch.distributions.exp_family import ExponentialFamily
  6. from torch.distributions.utils import broadcast_all, probs_to_logits, logits_to_probs, lazy_property, clamp_probs
  7. from torch.nn.functional import binary_cross_entropy_with_logits
  8. __all__ = ['ContinuousBernoulli']
  9. class ContinuousBernoulli(ExponentialFamily):
  10. r"""
  11. Creates a continuous Bernoulli distribution parameterized by :attr:`probs`
  12. or :attr:`logits` (but not both).
  13. The distribution is supported in [0, 1] and parameterized by 'probs' (in
  14. (0,1)) or 'logits' (real-valued). Note that, unlike the Bernoulli, 'probs'
  15. does not correspond to a probability and 'logits' does not correspond to
  16. log-odds, but the same names are used due to the similarity with the
  17. Bernoulli. See [1] for more details.
  18. Example::
  19. >>> # xdoctest: +IGNORE_WANT("non-deterinistic")
  20. >>> m = ContinuousBernoulli(torch.tensor([0.3]))
  21. >>> m.sample()
  22. tensor([ 0.2538])
  23. Args:
  24. probs (Number, Tensor): (0,1) valued parameters
  25. logits (Number, Tensor): real valued parameters whose sigmoid matches 'probs'
  26. [1] The continuous Bernoulli: fixing a pervasive error in variational
  27. autoencoders, Loaiza-Ganem G and Cunningham JP, NeurIPS 2019.
  28. https://arxiv.org/abs/1907.06845
  29. """
  30. arg_constraints = {'probs': constraints.unit_interval,
  31. 'logits': constraints.real}
  32. support = constraints.unit_interval
  33. _mean_carrier_measure = 0
  34. has_rsample = True
  35. def __init__(self, probs=None, logits=None, lims=(0.499, 0.501), validate_args=None):
  36. if (probs is None) == (logits is None):
  37. raise ValueError("Either `probs` or `logits` must be specified, but not both.")
  38. if probs is not None:
  39. is_scalar = isinstance(probs, Number)
  40. self.probs, = broadcast_all(probs)
  41. # validate 'probs' here if necessary as it is later clamped for numerical stability
  42. # close to 0 and 1, later on; otherwise the clamped 'probs' would always pass
  43. if validate_args is not None:
  44. if not self.arg_constraints['probs'].check(getattr(self, 'probs')).all():
  45. raise ValueError("The parameter {} has invalid values".format('probs'))
  46. self.probs = clamp_probs(self.probs)
  47. else:
  48. is_scalar = isinstance(logits, Number)
  49. self.logits, = broadcast_all(logits)
  50. self._param = self.probs if probs is not None else self.logits
  51. if is_scalar:
  52. batch_shape = torch.Size()
  53. else:
  54. batch_shape = self._param.size()
  55. self._lims = lims
  56. super().__init__(batch_shape, validate_args=validate_args)
  57. def expand(self, batch_shape, _instance=None):
  58. new = self._get_checked_instance(ContinuousBernoulli, _instance)
  59. new._lims = self._lims
  60. batch_shape = torch.Size(batch_shape)
  61. if 'probs' in self.__dict__:
  62. new.probs = self.probs.expand(batch_shape)
  63. new._param = new.probs
  64. if 'logits' in self.__dict__:
  65. new.logits = self.logits.expand(batch_shape)
  66. new._param = new.logits
  67. super(ContinuousBernoulli, new).__init__(batch_shape, validate_args=False)
  68. new._validate_args = self._validate_args
  69. return new
  70. def _new(self, *args, **kwargs):
  71. return self._param.new(*args, **kwargs)
  72. def _outside_unstable_region(self):
  73. return torch.max(torch.le(self.probs, self._lims[0]),
  74. torch.gt(self.probs, self._lims[1]))
  75. def _cut_probs(self):
  76. return torch.where(self._outside_unstable_region(),
  77. self.probs,
  78. self._lims[0] * torch.ones_like(self.probs))
  79. def _cont_bern_log_norm(self):
  80. '''computes the log normalizing constant as a function of the 'probs' parameter'''
  81. cut_probs = self._cut_probs()
  82. cut_probs_below_half = torch.where(torch.le(cut_probs, 0.5),
  83. cut_probs,
  84. torch.zeros_like(cut_probs))
  85. cut_probs_above_half = torch.where(torch.ge(cut_probs, 0.5),
  86. cut_probs,
  87. torch.ones_like(cut_probs))
  88. log_norm = torch.log(torch.abs(torch.log1p(-cut_probs) - torch.log(cut_probs))) - torch.where(
  89. torch.le(cut_probs, 0.5),
  90. torch.log1p(-2.0 * cut_probs_below_half),
  91. torch.log(2.0 * cut_probs_above_half - 1.0))
  92. x = torch.pow(self.probs - 0.5, 2)
  93. taylor = math.log(2.0) + (4.0 / 3.0 + 104.0 / 45.0 * x) * x
  94. return torch.where(self._outside_unstable_region(), log_norm, taylor)
  95. @property
  96. def mean(self):
  97. cut_probs = self._cut_probs()
  98. mus = cut_probs / (2.0 * cut_probs - 1.0) + 1.0 / (torch.log1p(-cut_probs) - torch.log(cut_probs))
  99. x = self.probs - 0.5
  100. taylor = 0.5 + (1.0 / 3.0 + 16.0 / 45.0 * torch.pow(x, 2)) * x
  101. return torch.where(self._outside_unstable_region(), mus, taylor)
  102. @property
  103. def stddev(self):
  104. return torch.sqrt(self.variance)
  105. @property
  106. def variance(self):
  107. cut_probs = self._cut_probs()
  108. vars = cut_probs * (cut_probs - 1.0) / torch.pow(1.0 - 2.0 * cut_probs, 2) + 1.0 / torch.pow(
  109. torch.log1p(-cut_probs) - torch.log(cut_probs), 2)
  110. x = torch.pow(self.probs - 0.5, 2)
  111. taylor = 1.0 / 12.0 - (1.0 / 15.0 - 128. / 945.0 * x) * x
  112. return torch.where(self._outside_unstable_region(), vars, taylor)
  113. @lazy_property
  114. def logits(self):
  115. return probs_to_logits(self.probs, is_binary=True)
  116. @lazy_property
  117. def probs(self):
  118. return clamp_probs(logits_to_probs(self.logits, is_binary=True))
  119. @property
  120. def param_shape(self):
  121. return self._param.size()
  122. def sample(self, sample_shape=torch.Size()):
  123. shape = self._extended_shape(sample_shape)
  124. u = torch.rand(shape, dtype=self.probs.dtype, device=self.probs.device)
  125. with torch.no_grad():
  126. return self.icdf(u)
  127. def rsample(self, sample_shape=torch.Size()):
  128. shape = self._extended_shape(sample_shape)
  129. u = torch.rand(shape, dtype=self.probs.dtype, device=self.probs.device)
  130. return self.icdf(u)
  131. def log_prob(self, value):
  132. if self._validate_args:
  133. self._validate_sample(value)
  134. logits, value = broadcast_all(self.logits, value)
  135. return -binary_cross_entropy_with_logits(logits, value, reduction='none') + self._cont_bern_log_norm()
  136. def cdf(self, value):
  137. if self._validate_args:
  138. self._validate_sample(value)
  139. cut_probs = self._cut_probs()
  140. cdfs = (torch.pow(cut_probs, value) * torch.pow(1.0 - cut_probs, 1.0 - value)
  141. + cut_probs - 1.0) / (2.0 * cut_probs - 1.0)
  142. unbounded_cdfs = torch.where(self._outside_unstable_region(), cdfs, value)
  143. return torch.where(
  144. torch.le(value, 0.0),
  145. torch.zeros_like(value),
  146. torch.where(torch.ge(value, 1.0), torch.ones_like(value), unbounded_cdfs))
  147. def icdf(self, value):
  148. cut_probs = self._cut_probs()
  149. return torch.where(
  150. self._outside_unstable_region(),
  151. (torch.log1p(-cut_probs + value * (2.0 * cut_probs - 1.0))
  152. - torch.log1p(-cut_probs)) / (torch.log(cut_probs) - torch.log1p(-cut_probs)),
  153. value)
  154. def entropy(self):
  155. log_probs0 = torch.log1p(-self.probs)
  156. log_probs1 = torch.log(self.probs)
  157. return self.mean * (log_probs0 - log_probs1) - self._cont_bern_log_norm() - log_probs0
  158. @property
  159. def _natural_params(self):
  160. return (self.logits, )
  161. def _log_normalizer(self, x):
  162. """computes the log normalizing constant as a function of the natural parameter"""
  163. out_unst_reg = torch.max(torch.le(x, self._lims[0] - 0.5),
  164. torch.gt(x, self._lims[1] - 0.5))
  165. cut_nat_params = torch.where(out_unst_reg,
  166. x,
  167. (self._lims[0] - 0.5) * torch.ones_like(x))
  168. log_norm = torch.log(torch.abs(torch.exp(cut_nat_params) - 1.0)) - torch.log(torch.abs(cut_nat_params))
  169. taylor = 0.5 * x + torch.pow(x, 2) / 24.0 - torch.pow(x, 4) / 2880.0
  170. return torch.where(out_unst_reg, log_norm, taylor)