123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- import warnings
- import numpy as np
- import pytest
- from pandas import (
- DataFrame,
- Index,
- MultiIndex,
- Series,
- Timestamp,
- concat,
- date_range,
- isna,
- notna,
- )
- import pandas._testing as tm
- from pandas.tseries import offsets
def f(x):
    """
    Return the mean of the finite entries of ``x``.

    Entries that fail ``np.isfinite`` are masked out before averaging.
    ``RuntimeWarning`` messages about empty slices are suppressed because
    the tests deliberately exercise this helper with 0-length input.
    """
    with warnings.catch_warnings():
        # Mute only the "empty slice" / "0 for slice" RuntimeWarnings.
        warnings.filterwarnings(
            "ignore",
            message=".*(empty slice|0 for slice).*",
            category=RuntimeWarning,
        )
        finite_mask = np.isfinite(x)
        finite_values = x[finite_mask]
        return finite_values.mean()
@pytest.mark.parametrize("bad_raw", [None, 1, 0])
def test_rolling_apply_invalid_raw(bad_raw):
    """Passing ``None``, ``1`` or ``0`` as ``raw`` must raise ``ValueError``."""
    expected_msg = "raw parameter must be `True` or `False`"
    roller = Series(range(3)).rolling(1)
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(len, raw=bad_raw)
def test_rolling_apply_out_of_bounds(engine_and_raw):
    """Rolling ``apply`` with a window longer than the data."""
    # gh-1850
    engine, raw = engine_and_raw
    apply_kwargs = {"engine": engine, "raw": raw}
    ser = Series([1, 2, 3, 4])

    # Window of 10 over 4 points with default min_periods: everything is NaN.
    all_missing = ser.rolling(10).apply(np.sum, **apply_kwargs)
    assert all_missing.isna().all()

    # With min_periods=1 the partial windows produce a running sum.
    running_sum = ser.rolling(10, min_periods=1).apply(np.sum, **apply_kwargs)
    tm.assert_almost_equal(running_sum, Series([1, 3, 6, 10], dtype=float))
@pytest.mark.parametrize("window", [2, "2s"])
def test_rolling_apply_with_pandas_objects(window):
    """With ``raw=False`` the applied function receives indexed pandas objects."""
    # 5071
    second_index = date_range("20130101", periods=5, freq="s")
    df = DataFrame(
        {"A": np.random.randn(5), "B": np.random.randint(0, 10, size=5)},
        index=second_index,
    )

    # we have an equal spaced timeseries index
    # so simulate removing the first period
    def last_unless_first_window(x):
        starts_at_origin = x.index[0] == df.index[0]
        return np.nan if starts_at_origin else x.iloc[-1]

    result = df.rolling(window).apply(last_unless_first_window, raw=False)
    expected = df.iloc[2:].reindex_like(df)
    tm.assert_frame_equal(result, expected)

    # The helper reads ``x.index``; with raw=True this is expected to fail
    # with AttributeError.
    with tm.external_error_raised(AttributeError):
        df.rolling(window).apply(last_unless_first_window, raw=True)
def test_rolling_apply(engine_and_raw, step):
    """Rolling ``apply`` on an empty Series and on an all-``None`` Series."""
    engine, raw = engine_and_raw
    apply_kwargs = {"engine": engine, "raw": raw}

    # An empty float Series must come back unchanged.
    empty = Series([], dtype="float64")
    rolled_empty = empty.rolling(10, step=step).apply(
        lambda x: x.mean(), **apply_kwargs
    )
    tm.assert_series_equal(rolled_empty, empty)

    # gh-8080
    all_none = Series([None, None, None])
    window_lengths = Series([1.0, 2.0, 2.0])[::step]
    # Exercise both a lambda wrapper and the builtin ``len`` itself.
    for counter in (lambda x: len(x), len):
        result = all_none.rolling(2, min_periods=0, step=step).apply(
            counter, **apply_kwargs
        )
        tm.assert_series_equal(result, window_lengths)
def test_all_apply(engine_and_raw):
    """A ``"1s"`` offset window must match a fixed window of 1 on 1s data."""
    engine, raw = engine_and_raw

    unindexed = DataFrame(
        {"A": date_range("20130101", periods=5, freq="s"), "B": range(5)}
    )
    df = unindexed.set_index("A") * 2

    offset_roller = df.rolling(window="1s")
    fixed_roller = df.rolling(window=1)

    result = offset_roller.apply(lambda x: 1, engine=engine, raw=raw)
    expected = fixed_roller.apply(lambda x: 1, engine=engine, raw=raw)
    tm.assert_frame_equal(result, expected)
def test_ragged_apply(engine_and_raw):
    """Offset windows on an unevenly spaced index, applying a constant function."""
    engine, raw = engine_and_raw

    stamps = (
        "20130101 09:00:00",
        "20130101 09:00:02",
        "20130101 09:00:03",
        "20130101 09:00:05",
        "20130101 09:00:06",
    )
    df = DataFrame({"B": range(5)})
    df.index = [Timestamp(stamp) for stamp in stamps]

    def constant_one(x):
        return 1

    # With min_periods=1 every row should evaluate to 1.0, whatever the window.
    expected = df.copy()
    expected["B"] = 1.0

    for window in ("1s", "2s", "5s"):
        result = df.rolling(window=window, min_periods=1).apply(
            constant_one, engine=engine, raw=raw
        )
        tm.assert_frame_equal(result, expected)
def test_invalid_engine():
    """An unrecognised ``engine`` name must raise ``ValueError``."""
    expected_msg = "engine must be either 'numba' or 'cython'"
    roller = Series(range(1)).rolling(1)
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(lambda x: x, engine="foo")
def test_invalid_engine_kwargs_cython():
    """``engine_kwargs`` combined with the cython engine must raise."""
    expected_msg = "cython engine does not accept engine_kwargs"
    roller = Series(range(1)).rolling(1)
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(lambda x: x, engine="cython", engine_kwargs={"nopython": False})
def test_invalid_raw_numba():
    """``raw=False`` combined with the numba engine must raise ``ValueError``."""
    expected_msg = "raw must be `True` when using the numba engine"
    roller = Series(range(1)).rolling(1)
    with pytest.raises(ValueError, match=expected_msg):
        roller.apply(lambda x: x, raw=False, engine="numba")
@pytest.mark.parametrize("args_kwargs", [[None, {"par": 10}], [(10,), None]])
def test_rolling_apply_args_kwargs(args_kwargs):
    """Extra ``args`` / ``kwargs`` are forwarded to the applied function."""
    # GH 33433
    args, kwargs = args_kwargs

    def add_then_sum(x, par):
        return np.sum(x + par)

    df = DataFrame({"gr": [1, 1], "a": [1, 2]})

    # Plain DataFrame rolling.
    result = df.rolling(1).apply(add_then_sum, args=args, kwargs=kwargs)
    expected = DataFrame([[11.0, 11.0], [11.0, 12.0]], columns=Index(["gr", "a"]))
    tm.assert_frame_equal(result, expected)

    # Rolling on a groupby selection.
    grouped_roller = df.groupby("gr")["a"].rolling(1)
    result = grouped_roller.apply(add_then_sum, args=args, kwargs=kwargs)
    expected = Series(
        [11.0, 12.0],
        index=MultiIndex.from_tuples([(1, 0), (1, 1)], names=["gr", None]),
        name="a",
    )
    tm.assert_series_equal(result, expected)
def test_nans(raw):
    """
    Check NaN handling and ``min_periods`` thresholds of rolling ``apply``.

    A 50-point random Series has its first and last ten entries set to NaN,
    leaving 30 valid observations at positions 10..39.
    """
    obj = Series(np.random.randn(50))
    # np.nan rather than the np.NaN alias, which NumPy 2.0 removed.
    obj[:10] = np.nan
    obj[-10:] = np.nan

    # The full-length window holds exactly the 30 valid points.
    result = obj.rolling(50, min_periods=30).apply(f, raw=raw)
    tm.assert_almost_equal(result.iloc[-1], np.mean(obj[10:-10]))

    # min_periods is working correctly
    result = obj.rolling(20, min_periods=15).apply(f, raw=raw)
    # Window ending at 23 holds 14 valid points; at 24 it holds 15.
    assert isna(result.iloc[23])
    assert not isna(result.iloc[24])

    # Window ending at 44 still holds 15 valid points; at 45 only 14.
    assert not isna(result.iloc[-6])
    assert isna(result.iloc[-5])

    # No NaNs in the data: the fifth row is the first to reach min_periods=5.
    obj2 = Series(np.random.randn(20))
    result = obj2.rolling(10, min_periods=5).apply(f, raw=raw)
    assert isna(result.iloc[3])
    assert notna(result.iloc[4])

    # min_periods=0 and min_periods=1 must give the same result.
    result0 = obj.rolling(20, min_periods=0).apply(f, raw=raw)
    result1 = obj.rolling(20, min_periods=1).apply(f, raw=raw)
    tm.assert_almost_equal(result0, result1)
def test_center(raw):
    """
    Check ``center=True`` against a trailing window on NaN-padded data.

    The centred result must equal the trailing-window result computed on the
    Series extended by nine NaNs, with the first nine rows dropped and the
    index reset.
    """
    obj = Series(np.random.randn(50))
    # np.nan rather than the np.NaN alias, which NumPy 2.0 removed.
    obj[:10] = np.nan
    obj[-10:] = np.nan

    result = obj.rolling(20, min_periods=15, center=True).apply(f, raw=raw)
    expected = (
        concat([obj, Series([np.nan] * 9)])
        .rolling(20, min_periods=15)
        .apply(f, raw=raw)
        .iloc[9:]
        .reset_index(drop=True)
    )
    tm.assert_series_equal(result, expected)
def test_series(raw, series):
    """Rolling ``apply`` on a Series returns a Series; check its last value."""
    rolled = series.rolling(50).apply(f, raw=raw)
    assert isinstance(rolled, Series)

    # The last value must equal the mean of the final 50 observations.
    last_window_mean = np.mean(series[-50:])
    tm.assert_almost_equal(rolled.iloc[-1], last_window_mean)
def test_frame(raw, frame):
    """Rolling ``apply`` on a DataFrame returns a DataFrame; check its last row."""
    rolled = frame.rolling(50).apply(f, raw=raw)
    assert isinstance(rolled, DataFrame)

    # The last row must equal the column means of the final 50 rows.
    last_row = rolled.iloc[-1, :]
    column_means = frame.iloc[-50:, :].apply(np.mean, axis=0, raw=raw)
    tm.assert_series_equal(last_row, column_means, check_names=False)
def test_time_rule_series(raw, series):
    """
    Rolling ``apply`` on a business-day resampled Series.

    Every other observation is resampled to business-day frequency, and the
    last rolling value is compared with the mean of the un-resampled points
    inside the matching 25-business-day span.
    """
    # NOTE(review): ``series`` is a fixture defined outside this file; it is
    # presumably date-indexed, since it is resampled - confirm in conftest.
    win = 25
    minp = 10
    ser = series[::2].resample("B").mean()
    series_result = ser.rolling(window=win, min_periods=minp).apply(f, raw=raw)

    last_date = series_result.index[-1]
    prev_date = last_date - 24 * offsets.BDay()
    trunc_series = series[::2].truncate(prev_date, last_date)

    # Use .iloc for the positional lookup: integer keys on a non-integer
    # index fall back to positional access, which pandas 2.1 deprecated.
    tm.assert_almost_equal(series_result.iloc[-1], np.mean(trunc_series))
def test_time_rule_frame(raw, frame):
    """Rolling ``apply`` on a business-day resampled DataFrame."""
    win = 25
    minp = 10

    every_other = frame[::2]
    frm = every_other.resample("B").mean()
    frame_result = frm.rolling(window=win, min_periods=minp).apply(f, raw=raw)

    # The window ending on the last date spans 24 business days back.
    last_date = frame_result.index[-1]
    prev_date = last_date - 24 * offsets.BDay()
    trunc_frame = every_other.truncate(prev_date, last_date)

    last_row = frame_result.xs(last_date)
    column_means = trunc_frame.apply(np.mean, raw=raw)
    tm.assert_series_equal(last_row, column_means, check_names=False)
@pytest.mark.parametrize("minp", [0, 99, 100])
def test_min_periods(raw, series, minp, step):
    """A window one longer than the data must match a window of exactly its length."""
    n_obs = len(series)
    oversized = series.rolling(n_obs + 1, min_periods=minp, step=step).apply(
        f, raw=raw
    )
    exact = series.rolling(n_obs, min_periods=minp, step=step).apply(f, raw=raw)

    # NaNs must sit in the same places ...
    missing = isna(oversized)
    tm.assert_series_equal(missing, isna(exact))

    # ... and the remaining values must agree.
    present = ~missing
    tm.assert_almost_equal(oversized[present], exact[present])
def test_center_reindex_series(raw, series):
    """``center=True`` must match a padded trailing window shifted back by 12."""
    # shifter index
    s = [f"x{x:d}" for x in range(12)]
    minp = 10

    # Expected: append 12 extra labels, roll trailing, shift back, realign.
    padded = series.reindex(list(series.index) + s)
    trailing = padded.rolling(window=25, min_periods=minp).apply(f, raw=raw)
    series_xp = trailing.shift(-12).reindex(series.index)

    centered_roller = series.rolling(window=25, min_periods=minp, center=True)
    series_rs = centered_roller.apply(f, raw=raw)

    tm.assert_series_equal(series_xp, series_rs)
def test_center_reindex_frame(raw, frame):
    """``center=True`` must match a padded trailing window shifted back by 12."""
    # shifter index
    s = [f"x{x:d}" for x in range(12)]
    minp = 10

    # Expected: append 12 extra labels, roll trailing, shift back, realign.
    padded = frame.reindex(list(frame.index) + s)
    trailing = padded.rolling(window=25, min_periods=minp).apply(f, raw=raw)
    frame_xp = trailing.shift(-12).reindex(frame.index)

    centered_roller = frame.rolling(window=25, min_periods=minp, center=True)
    frame_rs = centered_roller.apply(f, raw=raw)

    tm.assert_frame_equal(frame_xp, frame_rs)
def test_axis1(raw):
    """Rolling ``apply`` along ``axis=1`` on a single-column frame."""
    # GH 45912
    df = DataFrame([1, 2])
    column_roller = df.rolling(window=1, axis=1)
    result = column_roller.apply(np.sum, raw=raw)
    tm.assert_frame_equal(result, DataFrame([1.0, 2.0]))
|