123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- from functools import partial
- import numpy as np
- import pytest
- from pandas import (
- DataFrame,
- Series,
- concat,
- isna,
- notna,
- )
- import pandas._testing as tm
- from pandas.tseries import offsets
def scoreatpercentile(a, per):
    """
    Reference quantile with linear interpolation between order statistics.

    Parameters
    ----------
    a : array-like
        Observations; sorted along axis 0 before the quantile is taken.
    per : float
        Quantile to compute, expressed as a fraction in ``[0, 1]``.

    Returns
    -------
    The interpolated value at quantile ``per`` (an element, or a row when
    ``a`` has more than one dimension).

    Raises
    ------
    ValueError
        If ``per`` lies outside ``[0, 1]`` (including NaN) or ``a`` is empty.
    """
    # Out-of-range fractions previously produced either the maximum or an
    # extrapolated value without any error; reject them up front instead.
    if not 0.0 <= per <= 1.0:
        raise ValueError(f"per must be within [0, 1], got {per!r}")

    values = np.sort(a, axis=0)
    last = values.shape[0] - 1
    if last < 0:
        raise ValueError("cannot compute a quantile of empty input")

    # Position of the order statistic at or just below the requested quantile.
    idx = int(per * last)
    if idx == last:
        # Nothing above to interpolate towards (also covers single-element input).
        return values[-1]

    # Quantiles that the two neighbouring order statistics correspond to.
    qlow = idx / last
    qhig = (idx + 1) / last
    vlow = values[idx]
    vhig = values[idx + 1]
    return vlow + (vhig - vlow) * (per - qlow) / (qhig - qlow)
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_series(series, q, step):
    """Last rolling quantile of a Series matches the reference implementation."""
    window = 50
    result = series.rolling(window, step=step).quantile(q)
    assert isinstance(result, Series)

    # Last position reached when walking the series in strides of ``step``
    # (a falsy step is treated as 1), plus one so it can serve as a slice stop.
    stop = range(0, len(series), step or 1)[-1] + 1
    expected = scoreatpercentile(series[stop - window : stop], per=q)
    tm.assert_almost_equal(result.iloc[-1], expected)
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_frame(raw, frame, q, step):
    """Last rolling-quantile row of a DataFrame matches the reference, column by column."""

    def reference(column):
        return scoreatpercentile(column, per=q)

    window = 50
    result = frame.rolling(window, step=step).quantile(q)
    assert isinstance(result, DataFrame)

    # Last position reached when walking the frame in strides of ``step``
    # (a falsy step is treated as 1), plus one so it can serve as a slice stop.
    stop = range(0, len(frame), step or 1)[-1] + 1
    last_window = frame.iloc[stop - window : stop, :]
    expected = last_window.apply(reference, axis=0, raw=raw)
    tm.assert_series_equal(result.iloc[-1, :], expected, check_names=False)
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_time_rule_series(series, q):
    """
    Rolling quantile on a business-day-resampled Series.

    Every other observation is resampled to business-day frequency, a rolling
    quantile is taken over ``win`` rows, and the final value is compared with
    the reference quantile of the original (un-resampled) observations that
    fall inside the last window's date range.
    """
    compare_func = partial(scoreatpercentile, per=q)
    win = 25
    ser = series[::2].resample("B").mean()
    series_result = ser.rolling(window=win, min_periods=10).quantile(q)

    last_date = series_result.index[-1]
    # The last window covers ``win`` business days ending on ``last_date``.
    prev_date = last_date - (win - 1) * offsets.BDay()

    trunc_series = series[::2].truncate(prev_date, last_date)
    # Use .iloc for the positional lookup: integer keys passed to ``[]`` on a
    # datetime-indexed Series are no longer guaranteed to mean positions.
    tm.assert_almost_equal(series_result.iloc[-1], compare_func(trunc_series))
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_time_rule_frame(raw, frame, q):
    """
    Rolling quantile on a business-day-resampled DataFrame.

    The final row of the rolling result is compared with the reference
    quantile of the original every-other-row observations that fall inside
    the last window's date range.
    """

    def reference(column):
        return scoreatpercentile(column, per=q)

    win = 25
    every_other = frame[::2]
    resampled = every_other.resample("B").mean()
    frame_result = resampled.rolling(window=win, min_periods=10).quantile(q)

    last_date = frame_result.index[-1]
    # 24 business days back from the last date, i.e. a span of ``win`` days.
    prev_date = last_date - 24 * offsets.BDay()

    trunc_frame = frame[::2].truncate(prev_date, last_date)
    expected = trunc_frame.apply(reference, raw=raw)
    tm.assert_series_equal(frame_result.xs(last_date), expected, check_names=False)
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_nans(q):
    """Rolling quantile skips NaNs and honours ``min_periods``."""
    compare_func = partial(scoreatpercentile, per=q)

    # Seeded generator keeps the test data reproducible between runs.
    rng = np.random.default_rng(2)

    # 50 observations with the first and last 10 missing (valid: 10..39).
    obj = Series(rng.standard_normal(50))
    obj[:10] = np.nan
    obj[-10:] = np.nan

    result = obj.rolling(50, min_periods=30).quantile(q)
    tm.assert_almost_equal(result.iloc[-1], compare_func(obj[10:-10]))

    # min_periods is working correctly
    result = obj.rolling(20, min_periods=15).quantile(q)
    # Window ending at 23 holds 14 valid values (10..23); at 24 it holds 15.
    assert isna(result.iloc[23])
    assert not isna(result.iloc[24])

    # Window ending at 44 holds 15 valid values (25..39); at 45 it holds 14.
    assert not isna(result.iloc[-6])
    assert isna(result.iloc[-5])

    # Without NaNs the first non-missing result appears once 5 values are seen.
    obj2 = Series(rng.standard_normal(20))
    result = obj2.rolling(10, min_periods=5).quantile(q)
    assert isna(result.iloc[3])
    assert notna(result.iloc[4])

    # min_periods=0 and min_periods=1 are expected to give the same result.
    result0 = obj.rolling(20, min_periods=0).quantile(q)
    result1 = obj.rolling(20, min_periods=1).quantile(q)
    tm.assert_almost_equal(result0, result1)
@pytest.mark.parametrize("minp", [0, 99, 100])
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_min_periods(series, minp, q, step):
    """A window one longer than the series gives the same result as a full-length window."""
    n_obs = len(series)

    def rolling_quantile(window):
        return series.rolling(window, min_periods=minp, step=step).quantile(q)

    result = rolling_quantile(n_obs + 1)
    expected = rolling_quantile(n_obs)

    # Missing values must sit at the same positions in both results ...
    missing = isna(result)
    tm.assert_series_equal(missing, isna(expected))

    # ... and the non-missing values must agree.
    present = ~missing
    tm.assert_almost_equal(result[present], expected[present])
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_center(q):
    """
    ``center=True`` matches a trailing window evaluated on padded data.

    The expected result is built by appending 9 NaNs to the data, taking the
    ordinary (trailing) rolling quantile, dropping the first 9 rows and
    resetting the index.
    """
    # Seeded generator keeps the test data reproducible between runs.
    obj = Series(np.random.default_rng(2).standard_normal(50))
    obj[:10] = np.nan
    obj[-10:] = np.nan

    result = obj.rolling(20, center=True).quantile(q)
    expected = (
        concat([obj, Series([np.nan] * 9)])
        .rolling(20)
        .quantile(q)
        .iloc[9:]
        .reset_index(drop=True)
    )
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_center_reindex_series(series, q):
    """
    Centered rolling quantile on a Series equals a shifted trailing one.

    The expected value is obtained by extending the index with 12 extra
    labels, taking the trailing rolling quantile, shifting it back by 12
    rows and restricting it to the original index.
    """
    series_rs = series.rolling(window=25, center=True).quantile(q)

    # Extra labels appended to the index so the shifted result has room.
    s = [f"x{x:d}" for x in range(12)]
    padded = series.reindex(list(series.index) + s)
    trailing = padded.rolling(window=25).quantile(q)
    series_xp = trailing.shift(-12).reindex(series.index)

    tm.assert_series_equal(series_xp, series_rs)
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_center_reindex_frame(frame, q):
    """
    Centered rolling quantile on a DataFrame equals a shifted trailing one.

    The expected value is obtained by extending the index with 12 extra
    labels, taking the trailing rolling quantile, shifting it back by 12
    rows and restricting it to the original index.
    """
    frame_rs = frame.rolling(window=25, center=True).quantile(q)

    # Extra labels appended to the index so the shifted result has room.
    s = [f"x{x:d}" for x in range(12)]
    padded = frame.reindex(list(frame.index) + s)
    trailing = padded.rolling(window=25).quantile(q)
    frame_xp = trailing.shift(-12).reindex(frame.index)

    tm.assert_frame_equal(frame_xp, frame_rs)
|