test_moments_consistency_ewm.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import numpy as np
  2. import pytest
  3. from pandas import (
  4. DataFrame,
  5. Series,
  6. concat,
  7. )
  8. import pandas._testing as tm
  9. def create_mock_weights(obj, com, adjust, ignore_na):
  10. if isinstance(obj, DataFrame):
  11. if not len(obj.columns):
  12. return DataFrame(index=obj.index, columns=obj.columns)
  13. w = concat(
  14. [
  15. create_mock_series_weights(
  16. obj.iloc[:, i], com=com, adjust=adjust, ignore_na=ignore_na
  17. )
  18. for i in range(len(obj.columns))
  19. ],
  20. axis=1,
  21. )
  22. w.index = obj.index
  23. w.columns = obj.columns
  24. return w
  25. else:
  26. return create_mock_series_weights(obj, com, adjust, ignore_na)
  27. def create_mock_series_weights(s, com, adjust, ignore_na):
  28. w = Series(np.nan, index=s.index, name=s.name)
  29. alpha = 1.0 / (1.0 + com)
  30. if adjust:
  31. count = 0
  32. for i in range(len(s)):
  33. if s.iat[i] == s.iat[i]:
  34. w.iat[i] = pow(1.0 / (1.0 - alpha), count)
  35. count += 1
  36. elif not ignore_na:
  37. count += 1
  38. else:
  39. sum_wts = 0.0
  40. prev_i = -1
  41. count = 0
  42. for i in range(len(s)):
  43. if s.iat[i] == s.iat[i]:
  44. if prev_i == -1:
  45. w.iat[i] = 1.0
  46. else:
  47. w.iat[i] = alpha * sum_wts / pow(1.0 - alpha, count - prev_i)
  48. sum_wts += w.iat[i]
  49. prev_i = count
  50. count += 1
  51. elif not ignore_na:
  52. count += 1
  53. return w
  54. def test_ewm_consistency_mean(all_data, adjust, ignore_na, min_periods):
  55. com = 3.0
  56. result = all_data.ewm(
  57. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  58. ).mean()
  59. weights = create_mock_weights(all_data, com=com, adjust=adjust, ignore_na=ignore_na)
  60. expected = (
  61. all_data.multiply(weights)
  62. .cumsum()
  63. .divide(weights.cumsum())
  64. .fillna(method="ffill")
  65. )
  66. expected[
  67. all_data.expanding().count() < (max(min_periods, 1) if min_periods else 1)
  68. ] = np.nan
  69. tm.assert_equal(result, expected.astype("float64"))
  70. def test_ewm_consistency_consistent(consistent_data, adjust, ignore_na, min_periods):
  71. com = 3.0
  72. count_x = consistent_data.expanding().count()
  73. mean_x = consistent_data.ewm(
  74. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  75. ).mean()
  76. # check that correlation of a series with itself is either 1 or NaN
  77. corr_x_x = consistent_data.ewm(
  78. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  79. ).corr(consistent_data)
  80. exp = (
  81. consistent_data.max()
  82. if isinstance(consistent_data, Series)
  83. else consistent_data.max().max()
  84. )
  85. # check mean of constant series
  86. expected = consistent_data * np.nan
  87. expected[count_x >= max(min_periods, 1)] = exp
  88. tm.assert_equal(mean_x, expected)
  89. # check correlation of constant series with itself is NaN
  90. expected[:] = np.nan
  91. tm.assert_equal(corr_x_x, expected)
  92. def test_ewm_consistency_var_debiasing_factors(
  93. all_data, adjust, ignore_na, min_periods
  94. ):
  95. com = 3.0
  96. # check variance debiasing factors
  97. var_unbiased_x = all_data.ewm(
  98. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  99. ).var(bias=False)
  100. var_biased_x = all_data.ewm(
  101. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  102. ).var(bias=True)
  103. weights = create_mock_weights(all_data, com=com, adjust=adjust, ignore_na=ignore_na)
  104. cum_sum = weights.cumsum().fillna(method="ffill")
  105. cum_sum_sq = (weights * weights).cumsum().fillna(method="ffill")
  106. numerator = cum_sum * cum_sum
  107. denominator = numerator - cum_sum_sq
  108. denominator[denominator <= 0.0] = np.nan
  109. var_debiasing_factors_x = numerator / denominator
  110. tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)
  111. @pytest.mark.parametrize("bias", [True, False])
  112. def test_moments_consistency_var(all_data, adjust, ignore_na, min_periods, bias):
  113. com = 3.0
  114. mean_x = all_data.ewm(
  115. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  116. ).mean()
  117. var_x = all_data.ewm(
  118. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  119. ).var(bias=bias)
  120. assert not (var_x < 0).any().any()
  121. if bias:
  122. # check that biased var(x) == mean(x^2) - mean(x)^2
  123. mean_x2 = (
  124. (all_data * all_data)
  125. .ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
  126. .mean()
  127. )
  128. tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))
  129. @pytest.mark.parametrize("bias", [True, False])
  130. def test_moments_consistency_var_constant(
  131. consistent_data, adjust, ignore_na, min_periods, bias
  132. ):
  133. com = 3.0
  134. count_x = consistent_data.expanding(min_periods=min_periods).count()
  135. var_x = consistent_data.ewm(
  136. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  137. ).var(bias=bias)
  138. # check that variance of constant series is identically 0
  139. assert not (var_x > 0).any().any()
  140. expected = consistent_data * np.nan
  141. expected[count_x >= max(min_periods, 1)] = 0.0
  142. if not bias:
  143. expected[count_x < 2] = np.nan
  144. tm.assert_equal(var_x, expected)
  145. @pytest.mark.parametrize("bias", [True, False])
  146. def test_ewm_consistency_std(all_data, adjust, ignore_na, min_periods, bias):
  147. com = 3.0
  148. var_x = all_data.ewm(
  149. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  150. ).var(bias=bias)
  151. assert not (var_x < 0).any().any()
  152. std_x = all_data.ewm(
  153. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  154. ).std(bias=bias)
  155. assert not (std_x < 0).any().any()
  156. # check that var(x) == std(x)^2
  157. tm.assert_equal(var_x, std_x * std_x)
  158. cov_x_x = all_data.ewm(
  159. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  160. ).cov(all_data, bias=bias)
  161. assert not (cov_x_x < 0).any().any()
  162. # check that var(x) == cov(x, x)
  163. tm.assert_equal(var_x, cov_x_x)
  164. @pytest.mark.parametrize("bias", [True, False])
  165. def test_ewm_consistency_series_cov_corr(
  166. series_data, adjust, ignore_na, min_periods, bias
  167. ):
  168. com = 3.0
  169. var_x_plus_y = (
  170. (series_data + series_data)
  171. .ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
  172. .var(bias=bias)
  173. )
  174. var_x = series_data.ewm(
  175. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  176. ).var(bias=bias)
  177. var_y = series_data.ewm(
  178. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  179. ).var(bias=bias)
  180. cov_x_y = series_data.ewm(
  181. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  182. ).cov(series_data, bias=bias)
  183. # check that cov(x, y) == (var(x+y) - var(x) -
  184. # var(y)) / 2
  185. tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))
  186. # check that corr(x, y) == cov(x, y) / (std(x) *
  187. # std(y))
  188. corr_x_y = series_data.ewm(
  189. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  190. ).corr(series_data)
  191. std_x = series_data.ewm(
  192. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  193. ).std(bias=bias)
  194. std_y = series_data.ewm(
  195. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  196. ).std(bias=bias)
  197. tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))
  198. if bias:
  199. # check that biased cov(x, y) == mean(x*y) -
  200. # mean(x)*mean(y)
  201. mean_x = series_data.ewm(
  202. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  203. ).mean()
  204. mean_y = series_data.ewm(
  205. com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
  206. ).mean()
  207. mean_x_times_y = (
  208. (series_data * series_data)
  209. .ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
  210. .mean()
  211. )
  212. tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))