test_moments_consistency_rolling.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. import numpy as np
  2. import pytest
  3. from pandas import Series
  4. import pandas._testing as tm
  5. def no_nans(x):
  6. return x.notna().all().all()
  7. def all_na(x):
  8. return x.isnull().all().all()
  9. @pytest.fixture(params=[(1, 0), (5, 1)])
  10. def rolling_consistency_cases(request):
  11. """window, min_periods"""
  12. return request.param
  13. @pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum])
  14. def test_rolling_apply_consistency_sum(
  15. request, all_data, rolling_consistency_cases, center, f
  16. ):
  17. window, min_periods = rolling_consistency_cases
  18. if f is np.sum:
  19. if not no_nans(all_data) and not (
  20. all_na(all_data) and not all_data.empty and min_periods > 0
  21. ):
  22. request.node.add_marker(
  23. pytest.mark.xfail(reason="np.sum has different behavior with NaNs")
  24. )
  25. rolling_f_result = all_data.rolling(
  26. window=window, min_periods=min_periods, center=center
  27. ).sum()
  28. rolling_apply_f_result = all_data.rolling(
  29. window=window, min_periods=min_periods, center=center
  30. ).apply(func=f, raw=True)
  31. tm.assert_equal(rolling_f_result, rolling_apply_f_result)
  32. @pytest.mark.parametrize("ddof", [0, 1])
  33. def test_moments_consistency_var(all_data, rolling_consistency_cases, center, ddof):
  34. window, min_periods = rolling_consistency_cases
  35. var_x = all_data.rolling(window=window, min_periods=min_periods, center=center).var(
  36. ddof=ddof
  37. )
  38. assert not (var_x < 0).any().any()
  39. if ddof == 0:
  40. # check that biased var(x) == mean(x^2) - mean(x)^2
  41. mean_x = all_data.rolling(
  42. window=window, min_periods=min_periods, center=center
  43. ).mean()
  44. mean_x2 = (
  45. (all_data * all_data)
  46. .rolling(window=window, min_periods=min_periods, center=center)
  47. .mean()
  48. )
  49. tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))
  50. @pytest.mark.parametrize("ddof", [0, 1])
  51. def test_moments_consistency_var_constant(
  52. consistent_data, rolling_consistency_cases, center, ddof
  53. ):
  54. window, min_periods = rolling_consistency_cases
  55. count_x = consistent_data.rolling(
  56. window=window, min_periods=min_periods, center=center
  57. ).count()
  58. var_x = consistent_data.rolling(
  59. window=window, min_periods=min_periods, center=center
  60. ).var(ddof=ddof)
  61. # check that variance of constant series is identically 0
  62. assert not (var_x > 0).any().any()
  63. expected = consistent_data * np.nan
  64. expected[count_x >= max(min_periods, 1)] = 0.0
  65. if ddof == 1:
  66. expected[count_x < 2] = np.nan
  67. tm.assert_equal(var_x, expected)
  68. @pytest.mark.parametrize("ddof", [0, 1])
  69. def test_rolling_consistency_var_std_cov(
  70. all_data, rolling_consistency_cases, center, ddof
  71. ):
  72. window, min_periods = rolling_consistency_cases
  73. var_x = all_data.rolling(window=window, min_periods=min_periods, center=center).var(
  74. ddof=ddof
  75. )
  76. assert not (var_x < 0).any().any()
  77. std_x = all_data.rolling(window=window, min_periods=min_periods, center=center).std(
  78. ddof=ddof
  79. )
  80. assert not (std_x < 0).any().any()
  81. # check that var(x) == std(x)^2
  82. tm.assert_equal(var_x, std_x * std_x)
  83. cov_x_x = all_data.rolling(
  84. window=window, min_periods=min_periods, center=center
  85. ).cov(all_data, ddof=ddof)
  86. assert not (cov_x_x < 0).any().any()
  87. # check that var(x) == cov(x, x)
  88. tm.assert_equal(var_x, cov_x_x)
  89. @pytest.mark.parametrize("ddof", [0, 1])
  90. def test_rolling_consistency_series_cov_corr(
  91. series_data, rolling_consistency_cases, center, ddof
  92. ):
  93. window, min_periods = rolling_consistency_cases
  94. var_x_plus_y = (
  95. (series_data + series_data)
  96. .rolling(window=window, min_periods=min_periods, center=center)
  97. .var(ddof=ddof)
  98. )
  99. var_x = series_data.rolling(
  100. window=window, min_periods=min_periods, center=center
  101. ).var(ddof=ddof)
  102. var_y = series_data.rolling(
  103. window=window, min_periods=min_periods, center=center
  104. ).var(ddof=ddof)
  105. cov_x_y = series_data.rolling(
  106. window=window, min_periods=min_periods, center=center
  107. ).cov(series_data, ddof=ddof)
  108. # check that cov(x, y) == (var(x+y) - var(x) -
  109. # var(y)) / 2
  110. tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))
  111. # check that corr(x, y) == cov(x, y) / (std(x) *
  112. # std(y))
  113. corr_x_y = series_data.rolling(
  114. window=window, min_periods=min_periods, center=center
  115. ).corr(series_data)
  116. std_x = series_data.rolling(
  117. window=window, min_periods=min_periods, center=center
  118. ).std(ddof=ddof)
  119. std_y = series_data.rolling(
  120. window=window, min_periods=min_periods, center=center
  121. ).std(ddof=ddof)
  122. tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))
  123. if ddof == 0:
  124. # check that biased cov(x, y) == mean(x*y) -
  125. # mean(x)*mean(y)
  126. mean_x = series_data.rolling(
  127. window=window, min_periods=min_periods, center=center
  128. ).mean()
  129. mean_y = series_data.rolling(
  130. window=window, min_periods=min_periods, center=center
  131. ).mean()
  132. mean_x_times_y = (
  133. (series_data * series_data)
  134. .rolling(window=window, min_periods=min_periods, center=center)
  135. .mean()
  136. )
  137. tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))
  138. def test_rolling_consistency_mean(all_data, rolling_consistency_cases, center):
  139. window, min_periods = rolling_consistency_cases
  140. result = all_data.rolling(
  141. window=window, min_periods=min_periods, center=center
  142. ).mean()
  143. expected = (
  144. all_data.rolling(window=window, min_periods=min_periods, center=center)
  145. .sum()
  146. .divide(
  147. all_data.rolling(
  148. window=window, min_periods=min_periods, center=center
  149. ).count()
  150. )
  151. )
  152. tm.assert_equal(result, expected.astype("float64"))
  153. def test_rolling_consistency_constant(
  154. consistent_data, rolling_consistency_cases, center
  155. ):
  156. window, min_periods = rolling_consistency_cases
  157. count_x = consistent_data.rolling(
  158. window=window, min_periods=min_periods, center=center
  159. ).count()
  160. mean_x = consistent_data.rolling(
  161. window=window, min_periods=min_periods, center=center
  162. ).mean()
  163. # check that correlation of a series with itself is either 1 or NaN
  164. corr_x_x = consistent_data.rolling(
  165. window=window, min_periods=min_periods, center=center
  166. ).corr(consistent_data)
  167. exp = (
  168. consistent_data.max()
  169. if isinstance(consistent_data, Series)
  170. else consistent_data.max().max()
  171. )
  172. # check mean of constant series
  173. expected = consistent_data * np.nan
  174. expected[count_x >= max(min_periods, 1)] = exp
  175. tm.assert_equal(mean_x, expected)
  176. # check correlation of constant series with itself is NaN
  177. expected[:] = np.nan
  178. tm.assert_equal(corr_x_x, expected)
  179. def test_rolling_consistency_var_debiasing_factors(
  180. all_data, rolling_consistency_cases, center
  181. ):
  182. window, min_periods = rolling_consistency_cases
  183. # check variance debiasing factors
  184. var_unbiased_x = all_data.rolling(
  185. window=window, min_periods=min_periods, center=center
  186. ).var()
  187. var_biased_x = all_data.rolling(
  188. window=window, min_periods=min_periods, center=center
  189. ).var(ddof=0)
  190. var_debiasing_factors_x = (
  191. all_data.rolling(window=window, min_periods=min_periods, center=center)
  192. .count()
  193. .divide(
  194. (
  195. all_data.rolling(
  196. window=window, min_periods=min_periods, center=center
  197. ).count()
  198. - 1.0
  199. ).replace(0.0, np.nan)
  200. )
  201. )
  202. tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)