test_moments_consistency_expanding.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import numpy as np
  2. import pytest
  3. from pandas import Series
  4. import pandas._testing as tm
  5. def no_nans(x):
  6. return x.notna().all().all()
  7. def all_na(x):
  8. return x.isnull().all().all()
  9. @pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum])
  10. def test_expanding_apply_consistency_sum_nans(request, all_data, min_periods, f):
  11. if f is np.sum:
  12. if not no_nans(all_data) and not (
  13. all_na(all_data) and not all_data.empty and min_periods > 0
  14. ):
  15. request.node.add_marker(
  16. pytest.mark.xfail(reason="np.sum has different behavior with NaNs")
  17. )
  18. expanding_f_result = all_data.expanding(min_periods=min_periods).sum()
  19. expanding_apply_f_result = all_data.expanding(min_periods=min_periods).apply(
  20. func=f, raw=True
  21. )
  22. tm.assert_equal(expanding_f_result, expanding_apply_f_result)
  23. @pytest.mark.parametrize("ddof", [0, 1])
  24. def test_moments_consistency_var(all_data, min_periods, ddof):
  25. var_x = all_data.expanding(min_periods=min_periods).var(ddof=ddof)
  26. assert not (var_x < 0).any().any()
  27. if ddof == 0:
  28. # check that biased var(x) == mean(x^2) - mean(x)^2
  29. mean_x2 = (all_data * all_data).expanding(min_periods=min_periods).mean()
  30. mean_x = all_data.expanding(min_periods=min_periods).mean()
  31. tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))
  32. @pytest.mark.parametrize("ddof", [0, 1])
  33. def test_moments_consistency_var_constant(consistent_data, min_periods, ddof):
  34. count_x = consistent_data.expanding(min_periods=min_periods).count()
  35. var_x = consistent_data.expanding(min_periods=min_periods).var(ddof=ddof)
  36. # check that variance of constant series is identically 0
  37. assert not (var_x > 0).any().any()
  38. expected = consistent_data * np.nan
  39. expected[count_x >= max(min_periods, 1)] = 0.0
  40. if ddof == 1:
  41. expected[count_x < 2] = np.nan
  42. tm.assert_equal(var_x, expected)
  43. @pytest.mark.parametrize("ddof", [0, 1])
  44. def test_expanding_consistency_var_std_cov(all_data, min_periods, ddof):
  45. var_x = all_data.expanding(min_periods=min_periods).var(ddof=ddof)
  46. assert not (var_x < 0).any().any()
  47. std_x = all_data.expanding(min_periods=min_periods).std(ddof=ddof)
  48. assert not (std_x < 0).any().any()
  49. # check that var(x) == std(x)^2
  50. tm.assert_equal(var_x, std_x * std_x)
  51. cov_x_x = all_data.expanding(min_periods=min_periods).cov(all_data, ddof=ddof)
  52. assert not (cov_x_x < 0).any().any()
  53. # check that var(x) == cov(x, x)
  54. tm.assert_equal(var_x, cov_x_x)
  55. @pytest.mark.parametrize("ddof", [0, 1])
  56. def test_expanding_consistency_series_cov_corr(series_data, min_periods, ddof):
  57. var_x_plus_y = (
  58. (series_data + series_data).expanding(min_periods=min_periods).var(ddof=ddof)
  59. )
  60. var_x = series_data.expanding(min_periods=min_periods).var(ddof=ddof)
  61. var_y = series_data.expanding(min_periods=min_periods).var(ddof=ddof)
  62. cov_x_y = series_data.expanding(min_periods=min_periods).cov(series_data, ddof=ddof)
  63. # check that cov(x, y) == (var(x+y) - var(x) -
  64. # var(y)) / 2
  65. tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))
  66. # check that corr(x, y) == cov(x, y) / (std(x) *
  67. # std(y))
  68. corr_x_y = series_data.expanding(min_periods=min_periods).corr(series_data)
  69. std_x = series_data.expanding(min_periods=min_periods).std(ddof=ddof)
  70. std_y = series_data.expanding(min_periods=min_periods).std(ddof=ddof)
  71. tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))
  72. if ddof == 0:
  73. # check that biased cov(x, y) == mean(x*y) -
  74. # mean(x)*mean(y)
  75. mean_x = series_data.expanding(min_periods=min_periods).mean()
  76. mean_y = series_data.expanding(min_periods=min_periods).mean()
  77. mean_x_times_y = (
  78. (series_data * series_data).expanding(min_periods=min_periods).mean()
  79. )
  80. tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))
  81. def test_expanding_consistency_mean(all_data, min_periods):
  82. result = all_data.expanding(min_periods=min_periods).mean()
  83. expected = (
  84. all_data.expanding(min_periods=min_periods).sum()
  85. / all_data.expanding(min_periods=min_periods).count()
  86. )
  87. tm.assert_equal(result, expected.astype("float64"))
  88. def test_expanding_consistency_constant(consistent_data, min_periods):
  89. count_x = consistent_data.expanding().count()
  90. mean_x = consistent_data.expanding(min_periods=min_periods).mean()
  91. # check that correlation of a series with itself is either 1 or NaN
  92. corr_x_x = consistent_data.expanding(min_periods=min_periods).corr(consistent_data)
  93. exp = (
  94. consistent_data.max()
  95. if isinstance(consistent_data, Series)
  96. else consistent_data.max().max()
  97. )
  98. # check mean of constant series
  99. expected = consistent_data * np.nan
  100. expected[count_x >= max(min_periods, 1)] = exp
  101. tm.assert_equal(mean_x, expected)
  102. # check correlation of constant series with itself is NaN
  103. expected[:] = np.nan
  104. tm.assert_equal(corr_x_x, expected)
  105. def test_expanding_consistency_var_debiasing_factors(all_data, min_periods):
  106. # check variance debiasing factors
  107. var_unbiased_x = all_data.expanding(min_periods=min_periods).var()
  108. var_biased_x = all_data.expanding(min_periods=min_periods).var(ddof=0)
  109. var_debiasing_factors_x = all_data.expanding().count() / (
  110. all_data.expanding().count() - 1.0
  111. ).replace(0.0, np.nan)
  112. tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)