test_rolling_quantile.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. from functools import partial
  2. import numpy as np
  3. import pytest
  4. from pandas import (
  5. DataFrame,
  6. Series,
  7. concat,
  8. isna,
  9. notna,
  10. )
  11. import pandas._testing as tm
  12. from pandas.tseries import offsets
  13. def scoreatpercentile(a, per):
  14. values = np.sort(a, axis=0)
  15. idx = int(per / 1.0 * (values.shape[0] - 1))
  16. if idx == values.shape[0] - 1:
  17. retval = values[-1]
  18. else:
  19. qlow = idx / (values.shape[0] - 1)
  20. qhig = (idx + 1) / (values.shape[0] - 1)
  21. vlow = values[idx]
  22. vhig = values[idx + 1]
  23. retval = vlow + (vhig - vlow) * (per - qlow) / (qhig - qlow)
  24. return retval
  25. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  26. def test_series(series, q, step):
  27. compare_func = partial(scoreatpercentile, per=q)
  28. result = series.rolling(50, step=step).quantile(q)
  29. assert isinstance(result, Series)
  30. end = range(0, len(series), step or 1)[-1] + 1
  31. tm.assert_almost_equal(result.iloc[-1], compare_func(series[end - 50 : end]))
  32. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  33. def test_frame(raw, frame, q, step):
  34. compare_func = partial(scoreatpercentile, per=q)
  35. result = frame.rolling(50, step=step).quantile(q)
  36. assert isinstance(result, DataFrame)
  37. end = range(0, len(frame), step or 1)[-1] + 1
  38. tm.assert_series_equal(
  39. result.iloc[-1, :],
  40. frame.iloc[end - 50 : end, :].apply(compare_func, axis=0, raw=raw),
  41. check_names=False,
  42. )
  43. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  44. def test_time_rule_series(series, q):
  45. compare_func = partial(scoreatpercentile, per=q)
  46. win = 25
  47. ser = series[::2].resample("B").mean()
  48. series_result = ser.rolling(window=win, min_periods=10).quantile(q)
  49. last_date = series_result.index[-1]
  50. prev_date = last_date - 24 * offsets.BDay()
  51. trunc_series = series[::2].truncate(prev_date, last_date)
  52. tm.assert_almost_equal(series_result[-1], compare_func(trunc_series))
  53. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  54. def test_time_rule_frame(raw, frame, q):
  55. compare_func = partial(scoreatpercentile, per=q)
  56. win = 25
  57. frm = frame[::2].resample("B").mean()
  58. frame_result = frm.rolling(window=win, min_periods=10).quantile(q)
  59. last_date = frame_result.index[-1]
  60. prev_date = last_date - 24 * offsets.BDay()
  61. trunc_frame = frame[::2].truncate(prev_date, last_date)
  62. tm.assert_series_equal(
  63. frame_result.xs(last_date),
  64. trunc_frame.apply(compare_func, raw=raw),
  65. check_names=False,
  66. )
  67. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  68. def test_nans(q):
  69. compare_func = partial(scoreatpercentile, per=q)
  70. obj = Series(np.random.randn(50))
  71. obj[:10] = np.NaN
  72. obj[-10:] = np.NaN
  73. result = obj.rolling(50, min_periods=30).quantile(q)
  74. tm.assert_almost_equal(result.iloc[-1], compare_func(obj[10:-10]))
  75. # min_periods is working correctly
  76. result = obj.rolling(20, min_periods=15).quantile(q)
  77. assert isna(result.iloc[23])
  78. assert not isna(result.iloc[24])
  79. assert not isna(result.iloc[-6])
  80. assert isna(result.iloc[-5])
  81. obj2 = Series(np.random.randn(20))
  82. result = obj2.rolling(10, min_periods=5).quantile(q)
  83. assert isna(result.iloc[3])
  84. assert notna(result.iloc[4])
  85. result0 = obj.rolling(20, min_periods=0).quantile(q)
  86. result1 = obj.rolling(20, min_periods=1).quantile(q)
  87. tm.assert_almost_equal(result0, result1)
  88. @pytest.mark.parametrize("minp", [0, 99, 100])
  89. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  90. def test_min_periods(series, minp, q, step):
  91. result = series.rolling(len(series) + 1, min_periods=minp, step=step).quantile(q)
  92. expected = series.rolling(len(series), min_periods=minp, step=step).quantile(q)
  93. nan_mask = isna(result)
  94. tm.assert_series_equal(nan_mask, isna(expected))
  95. nan_mask = ~nan_mask
  96. tm.assert_almost_equal(result[nan_mask], expected[nan_mask])
  97. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  98. def test_center(q):
  99. obj = Series(np.random.randn(50))
  100. obj[:10] = np.NaN
  101. obj[-10:] = np.NaN
  102. result = obj.rolling(20, center=True).quantile(q)
  103. expected = (
  104. concat([obj, Series([np.NaN] * 9)])
  105. .rolling(20)
  106. .quantile(q)
  107. .iloc[9:]
  108. .reset_index(drop=True)
  109. )
  110. tm.assert_series_equal(result, expected)
  111. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  112. def test_center_reindex_series(series, q):
  113. # shifter index
  114. s = [f"x{x:d}" for x in range(12)]
  115. series_xp = (
  116. series.reindex(list(series.index) + s)
  117. .rolling(window=25)
  118. .quantile(q)
  119. .shift(-12)
  120. .reindex(series.index)
  121. )
  122. series_rs = series.rolling(window=25, center=True).quantile(q)
  123. tm.assert_series_equal(series_xp, series_rs)
  124. @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
  125. def test_center_reindex_frame(frame, q):
  126. # shifter index
  127. s = [f"x{x:d}" for x in range(12)]
  128. frame_xp = (
  129. frame.reindex(list(frame.index) + s)
  130. .rolling(window=25)
  131. .quantile(q)
  132. .shift(-12)
  133. .reindex(frame.index)
  134. )
  135. frame_rs = frame.rolling(window=25, center=True).quantile(q)
  136. tm.assert_frame_equal(frame_xp, frame_rs)