# Tests for rolling and expanding window operations applied through groupby.
- import numpy as np
- import pytest
- from pandas import (
- DataFrame,
- Index,
- MultiIndex,
- Series,
- Timestamp,
- date_range,
- to_datetime,
- )
- import pandas._testing as tm
- from pandas.api.indexers import BaseIndexer
- from pandas.core.groupby.groupby import get_groupby
@pytest.fixture
def times_frame():
    """Frame for testing times argument in EWM groupby.

    Columns: ``A`` holds the group labels ("a", "b", "c"), ``B`` holds integer
    values and ``C`` holds the datetimes parsed with ``to_datetime``.
    """
    return DataFrame(
        {
            "A": ["a", "b", "c", "a", "b", "c", "a", "b", "c", "a"],
            "B": [0, 0, 0, 1, 1, 1, 2, 2, 2, 3],
            "C": to_datetime(
                [
                    "2020-01-01",
                    "2020-01-01",
                    "2020-01-01",
                    "2020-01-02",
                    "2020-01-10",
                    "2020-01-22",
                    "2020-01-03",
                    "2020-01-23",
                    "2020-01-23",
                    "2020-01-04",
                ]
            ),
        }
    )
@pytest.fixture
def roll_frame():
    """Frame with group column ``A`` (20 ones, 12 twos, 8 threes) and ``B`` = 0..39."""
    return DataFrame({"A": [1] * 20 + [2] * 12 + [3] * 8, "B": np.arange(40)})
- class TestRolling:
def test_groupby_unsupported_argument(self, roll_frame):
    """``groupby`` rejects an unknown keyword argument with ``TypeError``."""
    msg = r"groupby\(\) got an unexpected keyword argument 'foo'"

    with pytest.raises(TypeError, match=msg):
        roll_frame.groupby("A", foo=1)
def test_getitem(self, roll_frame):
    """Selecting column ``B`` before or after ``rolling`` gives the same result.

    The reference value is a per-group ``rolling(2).mean()`` computed through
    ``apply`` on a separately constructed groupby object.
    """
    g = roll_frame.groupby("A")
    g_mutated = get_groupby(roll_frame, by="A")

    expected = g_mutated.B.apply(lambda x: x.rolling(2).mean())

    # column selected after the aggregation
    result = g.rolling(2).mean().B
    tm.assert_series_equal(result, expected)

    # column selected on the rolling object
    result = g.rolling(2).B.mean()
    tm.assert_series_equal(result, expected)

    # column selected on the groupby object
    result = g.B.rolling(2).mean()
    tm.assert_series_equal(result, expected)

    # Series grouped by another Series
    result = roll_frame.B.groupby(roll_frame.A).rolling(2).mean()
    tm.assert_series_equal(result, expected)
def test_getitem_multiple(self, roll_frame):
    """Column ``B`` can be selected repeatedly from one groupby-rolling object."""
    # GH 13174
    g = roll_frame.groupby("A")
    r = g.rolling(2, min_periods=0)
    g_mutated = get_groupby(roll_frame, by="A")
    expected = g_mutated.B.apply(lambda x: x.rolling(2, min_periods=0).count())

    result = r.B.count()
    tm.assert_series_equal(result, expected)

    # same selection again on the same rolling object (cf. the test name)
    result = r.B.count()
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
    "f",
    [
        "sum",
        "mean",
        "min",
        "max",
        "count",
        "kurt",
        "skew",
    ],
)
def test_rolling(self, f, roll_frame):
    """Aggregation ``f`` on a groupby-rolling matches rolling applied per group."""
    g = roll_frame.groupby("A", group_keys=False)
    r = g.rolling(window=4)

    result = getattr(r, f)()
    expected = g.apply(lambda x: getattr(x.rolling(4), f)())
    # groupby.apply doesn't drop the grouped-by column
    expected = expected.drop("A", axis=1)
    # GH 39732
    expected_index = MultiIndex.from_arrays([roll_frame["A"], range(40)])
    expected.index = expected_index
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("f", ["std", "var"])
def test_rolling_ddof(self, f, roll_frame):
    """``std``/``var`` with ``ddof=1`` on a groupby-rolling match the per-group result."""
    g = roll_frame.groupby("A", group_keys=False)
    r = g.rolling(window=4)

    result = getattr(r, f)(ddof=1)
    expected = g.apply(lambda x: getattr(x.rolling(4), f)(ddof=1))
    # groupby.apply doesn't drop the grouped-by column
    expected = expected.drop("A", axis=1)
    # GH 39732
    expected_index = MultiIndex.from_arrays([roll_frame["A"], range(40)])
    expected.index = expected_index
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
    "interpolation", ["linear", "lower", "higher", "midpoint", "nearest"]
)
def test_rolling_quantile(self, interpolation, roll_frame):
    """The 0.4 quantile on a groupby-rolling matches the per-group result."""
    g = roll_frame.groupby("A", group_keys=False)
    r = g.rolling(window=4)

    result = r.quantile(0.4, interpolation=interpolation)
    expected = g.apply(
        lambda x: x.rolling(4).quantile(0.4, interpolation=interpolation)
    )
    # groupby.apply doesn't drop the grouped-by column
    expected = expected.drop("A", axis=1)
    # GH 39732
    expected_index = MultiIndex.from_arrays([roll_frame["A"], range(40)])
    expected.index = expected_index
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("f, expected_val", [["corr", 1], ["cov", 0.5]])
def test_rolling_corr_cov_other_same_size_as_groups(self, f, expected_val):
    """``corr``/``cov`` against an ``other`` frame as long as each group (5 rows)."""
    # GH 42915
    df = DataFrame(
        {"value": range(10), "idx1": [1] * 5 + [2] * 5, "idx2": [1, 2, 3, 4, 5] * 2}
    ).set_index(["idx1", "idx2"])
    other = DataFrame({"value": range(5), "idx2": [1, 2, 3, 4, 5]}).set_index(
        "idx2"
    )

    result = getattr(df.groupby(level=0).rolling(2), f)(other)

    # first row of each group is NaN, the remaining four hold expected_val
    expected_data = ([np.nan] + [expected_val] * 4) * 2
    expected = DataFrame(
        expected_data,
        columns=["value"],
        index=MultiIndex.from_arrays(
            [
                [1] * 5 + [2] * 5,
                [1] * 5 + [2] * 5,
                list(range(1, 6)) * 2,
            ],
            names=["idx1", "idx1", "idx2"],
        ),
    )
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("f", ["corr", "cov"])
def test_rolling_corr_cov_other_diff_size_as_groups(self, f, roll_frame):
    """``corr``/``cov`` of each group against the whole, ungrouped frame."""
    g = roll_frame.groupby("A")
    r = g.rolling(window=4)

    result = getattr(r, f)(roll_frame)

    def func(x):
        return getattr(x.rolling(4), f)(roll_frame)

    expected = g.apply(func)
    # GH 39591: The grouped column should be all np.nan
    # (groupby.apply inserts 0s for cov)
    expected["A"] = np.nan
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("f", ["corr", "cov"])
def test_rolling_corr_cov_pairwise(self, f, roll_frame):
    """Pairwise ``corr``/``cov`` on column ``B`` matches the ``groupby.apply`` result."""
    g = roll_frame.groupby("A")
    r = g.rolling(window=4)

    result = getattr(r.B, f)(pairwise=True)

    def func(x):
        return getattr(x.B.rolling(4), f)(pairwise=True)

    expected = g.apply(func)
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
    "func, expected_values",
    [("cov", [[1.0, 1.0], [1.0, 4.0]]), ("corr", [[1.0, 0.5], [0.5, 1.0]])],
)
def test_rolling_corr_cov_unordered(self, func, expected_values):
    """``cov``/``corr`` when a group's rows are not contiguous in the frame."""
    # GH 43386
    df = DataFrame(
        {
            "a": ["g1", "g2", "g1", "g1"],
            "b": [0, 0, 1, 2],
            "c": [2, 0, 6, 4],
        }
    )
    rol = df.groupby("a").rolling(3)
    result = getattr(rol, func)()

    # only the last g1 row has a full window of 3; every other row is NaN
    expected = DataFrame(
        {
            "b": 4 * [np.nan] + expected_values[0] + 2 * [np.nan],
            "c": 4 * [np.nan] + expected_values[1] + 2 * [np.nan],
        },
        index=MultiIndex.from_tuples(
            [
                ("g1", 0, "b"),
                ("g1", 0, "c"),
                ("g1", 2, "b"),
                ("g1", 2, "c"),
                ("g1", 3, "b"),
                ("g1", 3, "c"),
                ("g2", 1, "b"),
                ("g2", 1, "c"),
            ],
            names=["a", None, None],
        ),
    )
    tm.assert_frame_equal(result, expected)
def test_rolling_apply(self, raw, roll_frame):
    """``rolling.apply`` with a summing function matches the per-group result.

    ``raw`` is forwarded unchanged to both ``apply`` calls.
    """
    g = roll_frame.groupby("A", group_keys=False)
    r = g.rolling(window=4)

    # reduction
    result = r.apply(lambda x: x.sum(), raw=raw)
    expected = g.apply(lambda x: x.rolling(4).apply(lambda y: y.sum(), raw=raw))
    # groupby.apply doesn't drop the grouped-by column
    expected = expected.drop("A", axis=1)
    # GH 39732
    expected_index = MultiIndex.from_arrays([roll_frame["A"], range(40)])
    expected.index = expected_index
    tm.assert_frame_equal(result, expected)
def test_rolling_apply_mutability(self):
    """A rolling sum gives the same result before and after ``g.sum()`` is called."""
    # GH 14013
    df = DataFrame({"A": ["foo"] * 3 + ["bar"] * 3, "B": [1] * 6})
    g = df.groupby("A")

    mi = MultiIndex.from_tuples(
        [("bar", 3), ("bar", 4), ("bar", 5), ("foo", 0), ("foo", 1), ("foo", 2)]
    )

    mi.names = ["A", None]
    # Grouped column should not be a part of the output
    expected = DataFrame([np.nan, 2.0, 2.0] * 2, columns=["B"], index=mi)

    result = g.rolling(window=2).sum()
    tm.assert_frame_equal(result, expected)

    # Call an arbitrary function on the groupby
    g.sum()

    # Make sure nothing has been mutated
    result = g.rolling(window=2).sum()
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("expected_value,raw_value", [[1.0, True], [0.0, False]])
def test_groupby_rolling(self, expected_value, raw_value):
    """The applied function receives an ``ndarray`` only when ``raw`` is true."""
    # GH 31754

    def isnumpyarray(x):
        # 1 when the window is handed over as an ndarray, else 0
        return int(isinstance(x, np.ndarray))

    df = DataFrame({"id": [1, 1, 1], "value": [1, 2, 3]})
    result = df.groupby("id").value.rolling(1).apply(isnumpyarray, raw=raw_value)
    expected = Series(
        [expected_value] * 3,
        index=MultiIndex.from_tuples(((1, 0), (1, 1), (1, 2)), names=["id", None]),
        name="value",
    )
    tm.assert_series_equal(result, expected)
def test_groupby_rolling_center_center(self):
    """Centered rolling mean (window 3) per group, for Series and DataFrame input."""
    # GH 35552
    # every group holds a single row, so no window of 3 is ever complete
    series = Series(range(1, 6))
    result = series.groupby(series).rolling(center=True, window=3).mean()
    expected = Series(
        [np.nan] * 5,
        index=MultiIndex.from_tuples(((1, 0), (2, 1), (3, 2), (4, 3), (5, 4))),
    )
    tm.assert_series_equal(result, expected)

    series = Series(range(1, 5))
    result = series.groupby(series).rolling(center=True, window=3).mean()
    expected = Series(
        [np.nan] * 4,
        index=MultiIndex.from_tuples(((1, 0), (2, 1), (3, 2), (4, 3))),
    )
    tm.assert_series_equal(result, expected)

    # groups of 5 and 6 rows: only the first and last row of each group are NaN
    df = DataFrame({"a": ["a"] * 5 + ["b"] * 6, "b": range(11)})
    result = df.groupby("a").rolling(center=True, window=3).mean()
    expected = DataFrame(
        [np.nan, 1, 2, 3, np.nan, np.nan, 6, 7, 8, 9, np.nan],
        index=MultiIndex.from_tuples(
            (
                ("a", 0),
                ("a", 1),
                ("a", 2),
                ("a", 3),
                ("a", 4),
                ("b", 5),
                ("b", 6),
                ("b", 7),
                ("b", 8),
                ("b", 9),
                ("b", 10),
            ),
            names=["a", None],
        ),
        columns=["b"],
    )
    tm.assert_frame_equal(result, expected)

    # two groups of 5 rows each
    df = DataFrame({"a": ["a"] * 5 + ["b"] * 5, "b": range(10)})
    result = df.groupby("a").rolling(center=True, window=3).mean()
    expected = DataFrame(
        [np.nan, 1, 2, 3, np.nan, np.nan, 6, 7, 8, np.nan],
        index=MultiIndex.from_tuples(
            (
                ("a", 0),
                ("a", 1),
                ("a", 2),
                ("a", 3),
                ("a", 4),
                ("b", 5),
                ("b", 6),
                ("b", 7),
                ("b", 8),
                ("b", 9),
            ),
            names=["a", None],
        ),
        columns=["b"],
    )
    tm.assert_frame_equal(result, expected)
def test_groupby_rolling_center_on(self):
    """Centered rolling mean with ``on="Date"`` puts the dates in the result index."""
    # GH 37141
    df = DataFrame(
        data={
            "Date": date_range("2020-01-01", "2020-01-10"),
            "gb": ["group_1"] * 6 + ["group_2"] * 4,
            "value": range(10),
        }
    )
    result = (
        df.groupby("gb")
        .rolling(6, on="Date", center=True, min_periods=1)
        .value.mean()
    )
    expected = Series(
        [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 7.0, 7.5, 7.5, 7.5],
        name="value",
        index=MultiIndex.from_tuples(
            (
                ("group_1", Timestamp("2020-01-01")),
                ("group_1", Timestamp("2020-01-02")),
                ("group_1", Timestamp("2020-01-03")),
                ("group_1", Timestamp("2020-01-04")),
                ("group_1", Timestamp("2020-01-05")),
                ("group_1", Timestamp("2020-01-06")),
                ("group_2", Timestamp("2020-01-07")),
                ("group_2", Timestamp("2020-01-08")),
                ("group_2", Timestamp("2020-01-09")),
                ("group_2", Timestamp("2020-01-10")),
            ),
            names=["gb", "Date"],
        ),
    )
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("min_periods", [5, 4, 3])
def test_groupby_rolling_center_min_periods(self, min_periods):
    """Centered rolling mean (window 5) with different ``min_periods`` values."""
    # GH 36040
    df = DataFrame({"group": ["A"] * 10 + ["B"] * 10, "data": range(20)})

    window_size = 5
    result = (
        df.groupby("group")
        .rolling(window_size, center=True, min_periods=min_periods)
        .mean()
    )
    result = result.reset_index()[["group", "data"]]

    grp_A_mean = [1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 7.5, 8.0]
    grp_B_mean = [x + 10.0 for x in grp_A_mean]

    # number of NaN rows expected at each end of a group
    num_nans = max(0, min_periods - 3)  # For window_size of 5
    nans = [np.nan] * num_nans
    grp_A_expected = nans + grp_A_mean[num_nans : 10 - num_nans] + nans
    grp_B_expected = nans + grp_B_mean[num_nans : 10 - num_nans] + nans

    expected = DataFrame(
        {"group": ["A"] * 10 + ["B"] * 10, "data": grp_A_expected + grp_B_expected}
    )

    tm.assert_frame_equal(result, expected)
def test_groupby_subselect_rolling(self):
    """Rolling max after selecting ``[["b"]]`` (frame) or ``["b"]`` (series)."""
    # GH 35486
    df = DataFrame(
        {"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0], "c": [10, 20, 30, 20]}
    )

    # list selection keeps a DataFrame
    result = df.groupby("a")[["b"]].rolling(2).max()
    expected = DataFrame(
        [np.nan, np.nan, 2.0, np.nan],
        columns=["b"],
        index=MultiIndex.from_tuples(
            ((1, 0), (2, 1), (2, 3), (3, 2)), names=["a", None]
        ),
    )
    tm.assert_frame_equal(result, expected)

    # scalar selection gives a Series named after the column
    result = df.groupby("a")["b"].rolling(2).max()
    expected = Series(
        [np.nan, np.nan, 2.0, np.nan],
        index=MultiIndex.from_tuples(
            ((1, 0), (2, 1), (2, 3), (3, 2)), names=["a", None]
        ),
        name="b",
    )
    tm.assert_series_equal(result, expected)
def test_groupby_rolling_custom_indexer(self):
    """A ``BaseIndexer`` subclass gives the same sums as an integer window of 3."""
    # GH 35557

    class SimpleIndexer(BaseIndexer):
        def get_window_bounds(
            self,
            num_values=0,
            min_periods=None,
            center=None,
            closed=None,
            step=None,
        ):
            # window i ends at i + 1 and starts window_size rows earlier;
            # negative starts are overwritten with the value computed below
            min_periods = self.window_size if min_periods is None else 0
            end = np.arange(num_values, dtype=np.int64) + 1
            start = end.copy() - self.window_size
            start[start < 0] = min_periods
            return start, end

    df = DataFrame(
        {"a": [1.0, 2.0, 3.0, 4.0, 5.0] * 3}, index=[0] * 5 + [1] * 5 + [2] * 5
    )
    result = (
        df.groupby(df.index)
        .rolling(SimpleIndexer(window_size=3), min_periods=1)
        .sum()
    )
    expected = df.groupby(df.index).rolling(window=3, min_periods=1).sum()
    tm.assert_frame_equal(result, expected)
def test_groupby_rolling_subset_with_closed(self):
    """Time-based rolling sum with ``closed="left"`` when all dates are identical."""
    # GH 35549
    df = DataFrame(
        {
            "column1": range(6),
            "column2": range(6),
            "group": 3 * ["A", "B"],
            "date": [Timestamp("2019-01-01")] * 6,
        }
    )
    result = (
        df.groupby("group").rolling("1D", on="date", closed="left")["column1"].sum()
    )
    expected = Series(
        [np.nan, 0.0, 2.0, np.nan, 1.0, 4.0],
        index=MultiIndex.from_tuples(
            [("A", Timestamp("2019-01-01"))] * 3
            + [("B", Timestamp("2019-01-01"))] * 3,
            names=["group", "date"],
        ),
        name="column1",
    )
    tm.assert_series_equal(result, expected)
def test_groupby_subset_rolling_subset_with_closed(self):
    """Same ``closed="left"`` rolling sum, with columns selected on the groupby first."""
    # GH 35549
    df = DataFrame(
        {
            "column1": range(6),
            "column2": range(6),
            "group": 3 * ["A", "B"],
            "date": [Timestamp("2019-01-01")] * 6,
        }
    )

    result = (
        df.groupby("group")[["column1", "date"]]
        .rolling("1D", on="date", closed="left")["column1"]
        .sum()
    )
    expected = Series(
        [np.nan, 0.0, 2.0, np.nan, 1.0, 4.0],
        index=MultiIndex.from_tuples(
            [("A", Timestamp("2019-01-01"))] * 3
            + [("B", Timestamp("2019-01-01"))] * 3,
            names=["group", "date"],
        ),
        name="column1",
    )
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("func", ["max", "min"])
def test_groupby_rolling_index_changed(self, func):
    """Grouping a MultiIndexed Series by its values prepends one index level."""
    # GH: #36018 nlevels of MultiIndex changed
    ds = Series(
        [1, 2, 2],
        index=MultiIndex.from_tuples(
            [("a", "x"), ("a", "y"), ("c", "z")], names=["1", "2"]
        ),
        name="a",
    )

    result = getattr(ds.groupby(ds).rolling(2), func)()
    expected = Series(
        [np.nan, np.nan, 2.0],
        index=MultiIndex.from_tuples(
            [(1, "a", "x"), (2, "a", "y"), (2, "c", "z")], names=["a", "1", "2"]
        ),
        name="a",
    )
    tm.assert_series_equal(result, expected)
def test_groupby_rolling_empty_frame(self):
    """Groupby-rolling on an empty frame, grouped by one and by two columns."""
    # GH 36197
    expected = DataFrame({"s1": []})
    result = expected.groupby("s1").rolling(window=1).sum()
    # GH 32262
    expected = expected.drop(columns="s1")
    # GH-38057 from_tuples gives empty object dtype, we now get float/int levels
    # expected.index = MultiIndex.from_tuples([], names=["s1", None])
    expected.index = MultiIndex.from_product(
        [Index([], dtype="float64"), Index([], dtype="int64")], names=["s1", None]
    )
    tm.assert_frame_equal(result, expected)

    expected = DataFrame({"s1": [], "s2": []})
    result = expected.groupby(["s1", "s2"]).rolling(window=1).sum()
    # GH 32262
    expected = expected.drop(columns=["s1", "s2"])
    expected.index = MultiIndex.from_product(
        [
            Index([], dtype="float64"),
            Index([], dtype="float64"),
            Index([], dtype="int64"),
        ],
        names=["s1", "s2", None],
    )
    tm.assert_frame_equal(result, expected)
def test_groupby_rolling_string_index(self):
    """Time-based groupby-rolling ``apply`` on a frame with a string index."""
    # GH: 36727
    df = DataFrame(
        [
            ["A", "group_1", Timestamp(2019, 1, 1, 9)],
            ["B", "group_1", Timestamp(2019, 1, 2, 9)],
            ["Z", "group_2", Timestamp(2019, 1, 3, 9)],
            ["H", "group_1", Timestamp(2019, 1, 6, 9)],
            ["E", "group_2", Timestamp(2019, 1, 20, 9)],
        ],
        columns=["index", "group", "eventTime"],
    ).set_index("index")

    groups = df.groupby("group")
    df["count_to_date"] = groups.cumcount()
    rolling_groups = groups.rolling("10d", on="eventTime")
    # the applied function returns the number of rows in each window
    result = rolling_groups.apply(lambda x: x.shape[0])
    expected = DataFrame(
        [
            ["A", "group_1", Timestamp(2019, 1, 1, 9), 1.0],
            ["B", "group_1", Timestamp(2019, 1, 2, 9), 2.0],
            ["H", "group_1", Timestamp(2019, 1, 6, 9), 3.0],
            ["Z", "group_2", Timestamp(2019, 1, 3, 9), 1.0],
            ["E", "group_2", Timestamp(2019, 1, 20, 9), 1.0],
        ],
        columns=["index", "group", "eventTime", "count_to_date"],
    ).set_index(["group", "index"])
    tm.assert_frame_equal(result, expected)
def test_groupby_rolling_no_sort(self):
    """With ``sort=False`` the groups keep their order of appearance (2, then 1)."""
    # GH 36889
    result = (
        DataFrame({"foo": [2, 1], "bar": [2, 1]})
        .groupby("foo", sort=False)
        .rolling(1)
        .min()
    )
    expected = DataFrame(
        np.array([[2.0, 2.0], [1.0, 1.0]]),
        columns=["foo", "bar"],
        index=MultiIndex.from_tuples([(2, 0), (1, 1)], names=["foo", None]),
    )
    # GH 32262
    expected = expected.drop(columns="foo")
    tm.assert_frame_equal(result, expected)
def test_groupby_rolling_count_closed_on(self):
    """``count`` over a "3d" window with ``on="date"`` and ``closed="left"``."""
    # GH 35869
    df = DataFrame(
        {
            "column1": range(6),
            "column2": range(6),
            "group": 3 * ["A", "B"],
            "date": date_range(end="20190101", periods=6),
        }
    )
    result = (
        df.groupby("group")
        .rolling("3d", on="date", closed="left")["column1"]
        .count()
    )
    expected = Series(
        [np.nan, 1.0, 1.0, np.nan, 1.0, 1.0],
        name="column1",
        index=MultiIndex.from_tuples(
            [
                ("A", Timestamp("2018-12-27")),
                ("A", Timestamp("2018-12-29")),
                ("A", Timestamp("2018-12-31")),
                ("B", Timestamp("2018-12-28")),
                ("B", Timestamp("2018-12-30")),
                ("B", Timestamp("2019-01-01")),
            ],
            names=["group", "date"],
        ),
    )
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
    ("func", "kwargs"),
    [("rolling", {"window": 2, "min_periods": 1}), ("expanding", {})],
)
def test_groupby_rolling_sem(self, func, kwargs):
    """``sem`` through groupby ``rolling`` or ``expanding``, selected by ``func``."""
    # GH: 26476
    df = DataFrame(
        [["a", 1], ["a", 2], ["b", 1], ["b", 2], ["b", 3]], columns=["a", "b"]
    )
    result = getattr(df.groupby("a"), func)(**kwargs).sem()
    expected = DataFrame(
        {"a": [np.nan] * 5, "b": [np.nan, 0.70711, np.nan, 0.70711, 0.70711]},
        index=MultiIndex.from_tuples(
            [("a", 0), ("a", 1), ("b", 2), ("b", 3), ("b", 4)], names=["a", None]
        ),
    )
    # GH 32262
    expected = expected.drop(columns="a")
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
    ("rollings", "key"), [({"on": "a"}, "a"), ({"on": None}, "index")]
)
def test_groupby_rolling_nans_in_index(self, rollings, key):
    """A missing datetime in the ``on`` column or in the index raises ``ValueError``."""
    # GH: 34617
    df = DataFrame(
        {
            "a": to_datetime(["2020-06-01 12:00", "2020-06-01 14:00", np.nan]),
            "b": [1, 2, 3],
            "c": [1, 1, 1],
        }
    )
    if key == "index":
        # move the datetimes into the index for the on=None case
        df = df.set_index("a")
    with pytest.raises(ValueError, match=f"{key} values must not have NaT"):
        df.groupby("c").rolling("60min", **rollings)
@pytest.mark.parametrize("group_keys", [True, False])
def test_groupby_rolling_group_keys(self, group_keys):
    """The result index is the same for ``group_keys=True`` and ``False``."""
    # GH 37641
    # GH 38523: GH 37641 actually was not a bug.
    # group_keys only applies to groupby.apply directly
    arrays = [["val1", "val1", "val2"], ["val1", "val1", "val2"]]
    index = MultiIndex.from_arrays(arrays, names=("idx1", "idx2"))

    s = Series([1, 2, 3], index=index)
    result = s.groupby(["idx1", "idx2"], group_keys=group_keys).rolling(1).mean()
    expected = Series(
        [1.0, 2.0, 3.0],
        index=MultiIndex.from_tuples(
            [
                ("val1", "val1", "val1", "val1"),
                ("val1", "val1", "val1", "val1"),
                ("val2", "val2", "val2", "val2"),
            ],
            names=["idx1", "idx2", "idx1", "idx2"],
        ),
    )
    tm.assert_series_equal(result, expected)
def test_groupby_rolling_index_level_and_column_label(self):
    """Grouping by an index level plus a column leaves only ``B`` as a column."""
    # The groupby keys should not appear as a resulting column
    arrays = [["val1", "val1", "val2"], ["val1", "val1", "val2"]]
    index = MultiIndex.from_arrays(arrays, names=("idx1", "idx2"))

    df = DataFrame({"A": [1, 1, 2], "B": range(3)}, index=index)
    result = df.groupby(["idx1", "A"]).rolling(1).mean()
    expected = DataFrame(
        {"B": [0.0, 1.0, 2.0]},
        index=MultiIndex.from_tuples(
            [
                ("val1", 1, "val1", "val1"),
                ("val1", 1, "val1", "val1"),
                ("val2", 2, "val2", "val2"),
            ],
            names=["idx1", "A", "idx1", "idx2"],
        ),
    )
    tm.assert_frame_equal(result, expected)
def test_groupby_rolling_resulting_multiindex(self):
    """Grouping by one column yields a 2-level MultiIndex on the result."""
    # a few different cases checking the created MultiIndex of the result
    # https://github.com/pandas-dev/pandas/pull/38057

    # grouping by 1 columns -> 2-level MI as result
    df = DataFrame({"a": np.arange(8.0), "b": [1, 2] * 4})
    result = df.groupby("b").rolling(3).mean()
    expected_index = MultiIndex.from_tuples(
        [(1, 0), (1, 2), (1, 4), (1, 6), (2, 1), (2, 3), (2, 5), (2, 7)],
        names=["b", None],
    )
    tm.assert_index_equal(result.index, expected_index)
def test_groupby_rolling_resulting_multiindex2(self):
    """Grouping by two columns yields a 3-level MultiIndex on the result."""
    # grouping by 2 columns -> 3-level MI as result
    df = DataFrame({"a": np.arange(12.0), "b": [1, 2] * 6, "c": [1, 2, 3, 4] * 3})
    result = df.groupby(["b", "c"]).rolling(2).sum()
    expected_index = MultiIndex.from_tuples(
        [
            (1, 1, 0),
            (1, 1, 4),
            (1, 1, 8),
            (1, 3, 2),
            (1, 3, 6),
            (1, 3, 10),
            (2, 2, 1),
            (2, 2, 5),
            (2, 2, 9),
            (2, 4, 3),
            (2, 4, 7),
            (2, 4, 11),
        ],
        names=["b", "c", None],
    )
    tm.assert_index_equal(result.index, expected_index)
def test_groupby_rolling_resulting_multiindex3(self):
    """Grouping a 2-level-indexed frame by one column yields a 3-level MultiIndex."""
    # grouping with 1 level on dataframe with 2-level MI -> 3-level MI as result
    df = DataFrame({"a": np.arange(8.0), "b": [1, 2] * 4, "c": [1, 2, 3, 4] * 2})
    df = df.set_index("c", append=True)
    result = df.groupby("b").rolling(3).mean()
    expected_index = MultiIndex.from_tuples(
        [
            (1, 0, 1),
            (1, 2, 3),
            (1, 4, 1),
            (1, 6, 3),
            (2, 1, 2),
            (2, 3, 4),
            (2, 5, 2),
            (2, 7, 4),
        ],
        names=["b", None, "c"],
    )
    tm.assert_index_equal(result.index, expected_index, exact="equiv")
def test_groupby_rolling_object_doesnt_affect_groupby_apply(self, roll_frame):
    """Creating a rolling object leaves the index from ``groupby.apply`` unchanged."""
    # GH 39732
    g = roll_frame.groupby("A", group_keys=False)
    expected = g.apply(lambda x: x.rolling(4).sum()).index
    # build the rolling object and discard it; only its creation matters here
    _ = g.rolling(window=4)
    result = g.apply(lambda x: x.rolling(4).sum()).index
    tm.assert_index_equal(result, expected)
@pytest.mark.parametrize(
    ("window", "min_periods", "closed", "expected"),
    [
        (2, 0, "left", [None, 0.0, 1.0, 1.0, None, 0.0, 1.0, 1.0]),
        (2, 2, "left", [None, None, 1.0, 1.0, None, None, 1.0, 1.0]),
        (4, 4, "left", [None, None, None, None, None, None, None, None]),
        (4, 4, "right", [None, None, None, 5.0, None, None, None, 5.0]),
    ],
)
def test_groupby_rolling_var(self, window, min_periods, closed, expected):
    """``var(0)`` for several window, ``min_periods`` and ``closed`` combinations."""
    df = DataFrame([1, 2, 3, 4, 5, 6, 7, 8])
    result = (
        df.groupby([1, 2, 1, 2, 1, 2, 1, 2])
        .rolling(window=window, min_periods=min_periods, closed=closed)
        .var(0)
    )
    # None entries become NaN in the float64 array
    expected_result = DataFrame(
        np.array(expected, dtype="float64"),
        index=MultiIndex(
            levels=[np.array([1, 2]), [0, 1, 2, 3, 4, 5, 6, 7]],
            codes=[[0, 0, 0, 0, 1, 1, 1, 1], [0, 2, 4, 6, 1, 3, 5, 7]],
        ),
    )
    tm.assert_frame_equal(result, expected_result)
@pytest.mark.parametrize(
    "columns", [MultiIndex.from_tuples([("A", ""), ("B", "C")]), ["A", "B"]]
)
def test_by_column_not_in_values(self, columns):
    """The grouping column is absent from the result and ``g.obj`` is unchanged."""
    # GH 32262
    df = DataFrame([[1, 0]] * 20 + [[2, 0]] * 12 + [[3, 0]] * 8, columns=columns)
    g = df.groupby("A")
    # snapshot taken before rolling so the comparison below can detect mutation
    original_obj = g.obj.copy(deep=True)
    r = g.rolling(4)
    result = r.sum()
    assert "A" not in result.columns
    tm.assert_frame_equal(g.obj, original_obj)
def test_groupby_level(self):
    """Grouping by ``level=0`` repeats that level at the front of the result index."""
    # GH 38523, 38787
    arrays = [
        ["Falcon", "Falcon", "Parrot", "Parrot"],
        ["Captive", "Wild", "Captive", "Wild"],
    ]
    index = MultiIndex.from_arrays(arrays, names=("Animal", "Type"))
    df = DataFrame({"Max Speed": [390.0, 350.0, 30.0, 20.0]}, index=index)
    result = df.groupby(level=0)["Max Speed"].rolling(2).sum()
    expected = Series(
        [np.nan, 740.0, np.nan, 50.0],
        index=MultiIndex.from_tuples(
            [
                ("Falcon", "Falcon", "Captive"),
                ("Falcon", "Falcon", "Wild"),
                ("Parrot", "Parrot", "Captive"),
                ("Parrot", "Parrot", "Wild"),
            ],
            names=["Animal", "Animal", "Type"],
        ),
        name="Max Speed",
    )
    tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
    "by, expected_data",
    [
        [["id"], {"num": [100.0, 150.0, 150.0, 200.0]}],
        [
            ["id", "index"],
            {
                "date": [
                    Timestamp("2018-01-01"),
                    Timestamp("2018-01-02"),
                    Timestamp("2018-01-01"),
                    Timestamp("2018-01-02"),
                ],
                "num": [100.0, 200.0, 150.0, 250.0],
            },
        ],
    ],
)
def test_as_index_false(self, by, expected_data):
    """With ``as_index=False`` the group keys are columns and the index is kept."""
    # GH 39433
    data = [
        ["A", "2018-01-01", 100.0],
        ["A", "2018-01-02", 200.0],
        ["B", "2018-01-01", 150.0],
        ["B", "2018-01-02", 250.0],
    ]
    df = DataFrame(data, columns=["id", "date", "num"])
    df["date"] = to_datetime(df["date"])
    df = df.set_index(["date"])

    # look each name in `by` up as an attribute of the frame
    gp_by = [getattr(df, attr) for attr in by]
    result = (
        df.groupby(gp_by, as_index=False).rolling(window=2, min_periods=1).mean()
    )

    expected = {"id": ["A", "A", "B", "B"]}
    expected.update(expected_data)
    expected = DataFrame(
        expected,
        index=df.index,
    )
    tm.assert_frame_equal(result, expected)
def test_nan_and_zero_endpoints(self, any_int_numpy_dtype):
    """Rolling mean of a constant 5e25 column with a NaN first and a 0 last value.

    The expected result equals the input array itself.
    """
    # https://github.com/twosigma/pandas/issues/53
    typ = np.dtype(any_int_numpy_dtype).type
    size = 1000
    # group key: 0 for every row except the last, which is 1
    idx = np.repeat(typ(0), size)
    idx[-1] = 1

    val = 5e25
    arr = np.repeat(val, size)
    arr[0] = np.nan
    arr[-1] = 0

    df = DataFrame(
        {
            "index": idx,
            "adl2": arr,
        }
    ).set_index("index")
    result = df.groupby("index")["adl2"].rolling(window=10, min_periods=1).mean()
    expected = Series(
        arr,
        name="adl2",
        index=MultiIndex.from_arrays(
            [
                Index([0] * 999 + [1], dtype=typ, name="index"),
                Index([0] * 999 + [1], dtype=typ, name="index"),
            ],
        ),
    )
    tm.assert_series_equal(result, expected)
def test_groupby_rolling_non_monotonic(self):
    """A shuffled ``on`` column inside a single group raises ``ValueError``."""
    # GH 43909

    shuffled = [3, 0, 1, 2]
    sec = 1_000
    df = DataFrame(
        [{"t": Timestamp(2 * x * sec), "x": x + 1, "c": 42} for x in shuffled]
    )
    with pytest.raises(ValueError, match=r".* must be monotonic"):
        df.groupby("c").rolling(on="t", window="3s")
def test_groupby_monotonic(self):
    """A "180D" rolling sum with ``on="date"`` matches the per-group ``apply`` result.

    The frame is sorted by date before grouping.
    """
    # GH 15130
    # we don't need to validate monotonicity when grouping

    # GH 43909 we should raise an error here to match
    # behaviour of non-groupby rolling.

    data = [
        ["David", "1/1/2015", 100],
        ["David", "1/5/2015", 500],
        ["David", "5/30/2015", 50],
        ["David", "7/25/2015", 50],
        ["Ryan", "1/4/2014", 100],
        ["Ryan", "1/19/2015", 500],
        ["Ryan", "3/31/2016", 50],
        ["Joe", "7/1/2015", 100],
        ["Joe", "9/9/2015", 500],
        ["Joe", "10/15/2015", 50],
    ]

    df = DataFrame(data=data, columns=["name", "date", "amount"])
    df["date"] = to_datetime(df["date"])
    df = df.sort_values("date")

    expected = (
        df.set_index("date")
        .groupby("name")
        .apply(lambda x: x.rolling("180D")["amount"].sum())
    )
    result = df.groupby("name").rolling("180D", on="date")["amount"].sum()
    tm.assert_series_equal(result, expected)
def test_datelike_on_monotonic_within_each_group(self):
    """Time-based rolling on a datetime column built by repeating one date range.

    The result must match rolling applied per group via ``groupby.apply``.
    """
    # GH 13966 (similar to #15130, closed by #15175)

    # superseded by 43909
    # GH 46061: OK if the on is monotonic relative to each group

    dates = date_range(start="2016-01-01 09:30:00", periods=20, freq="s")
    df = DataFrame(
        {
            "A": [1] * 20 + [2] * 12 + [3] * 8,
            "B": np.concatenate((dates, dates)),
            "C": np.arange(40),
        }
    )

    expected = (
        df.set_index("B").groupby("A").apply(lambda x: x.rolling("4s")["C"].mean())
    )
    result = df.groupby("A").rolling("4s", on="B").C.mean()
    tm.assert_series_equal(result, expected)
def test_datelike_on_not_monotonic_within_each_group(self):
    """Dates out of order inside each group (2020, 2021, 2019) raise ``ValueError``."""
    # GH 46061
    df = DataFrame(
        {
            "A": [1] * 3 + [2] * 3,
            "B": [Timestamp(year, 1, 1) for year in [2020, 2021, 2019]] * 2,
            "C": range(6),
        }
    )
    with pytest.raises(ValueError, match="Each group within B must be monotonic."):
        df.groupby("A").rolling("365D", on="B")
- class TestExpanding:
@pytest.fixture
def frame(self):
    """Frame with group column ``A`` (20 ones, 12 twos, 8 threes) and ``B`` = 0..39."""
    return DataFrame({"A": [1] * 20 + [2] * 12 + [3] * 8, "B": np.arange(40)})
@pytest.mark.parametrize(
    "f", ["sum", "mean", "min", "max", "count", "kurt", "skew"]
)
def test_expanding(self, f, frame):
    """Aggregation ``f`` on a groupby-expanding matches expanding applied per group."""
    g = frame.groupby("A", group_keys=False)
    r = g.expanding()

    result = getattr(r, f)()
    expected = g.apply(lambda x: getattr(x.expanding(), f)())
    # groupby.apply doesn't drop the grouped-by column
    expected = expected.drop("A", axis=1)
    # GH 39732
    expected_index = MultiIndex.from_arrays([frame["A"], range(40)])
    expected.index = expected_index
    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("f", ["std", "var"])
def test_expanding_ddof(self, f, frame):
    """``std``/``var`` with ``ddof=0`` on a groupby-expanding match the per-group result."""
    g = frame.groupby("A", group_keys=False)
    r = g.expanding()

    result = getattr(r, f)(ddof=0)
    expected = g.apply(lambda x: getattr(x.expanding(), f)(ddof=0))
    # groupby.apply doesn't drop the grouped-by column
    expected = expected.drop("A", axis=1)
    # GH 39732
    expected_index = MultiIndex.from_arrays([frame["A"], range(40)])
    expected.index = expected_index
    tm.assert_frame_equal(result, expected)
- @pytest.mark.parametrize(
- "interpolation", ["linear", "lower", "higher", "midpoint", "nearest"]
- )
- def test_expanding_quantile(self, interpolation, frame):
- g = frame.groupby("A", group_keys=False)
- r = g.expanding()
- result = r.quantile(0.4, interpolation=interpolation)
- expected = g.apply(
- lambda x: x.expanding().quantile(0.4, interpolation=interpolation)
- )
- # groupby.apply doesn't drop the grouped-by column
- expected = expected.drop("A", axis=1)
- # GH 39732
- expected_index = MultiIndex.from_arrays([frame["A"], range(40)])
- expected.index = expected_index
- tm.assert_frame_equal(result, expected)
- @pytest.mark.parametrize("f", ["corr", "cov"])
- def test_expanding_corr_cov(self, f, frame):
- g = frame.groupby("A")
- r = g.expanding()
- result = getattr(r, f)(frame)
- def func_0(x):
- return getattr(x.expanding(), f)(frame)
- expected = g.apply(func_0)
- # GH 39591: groupby.apply returns 1 instead of nan for windows
- # with all nan values
- null_idx = list(range(20, 61)) + list(range(72, 113))
- expected.iloc[null_idx, 1] = np.nan
- # GH 39591: The grouped column should be all np.nan
- # (groupby.apply inserts 0s for cov)
- expected["A"] = np.nan
- tm.assert_frame_equal(result, expected)
- result = getattr(r.B, f)(pairwise=True)
- def func_1(x):
- return getattr(x.B.expanding(), f)(pairwise=True)
- expected = g.apply(func_1)
- tm.assert_series_equal(result, expected)
- def test_expanding_apply(self, raw, frame):
- g = frame.groupby("A", group_keys=False)
- r = g.expanding()
- # reduction
- result = r.apply(lambda x: x.sum(), raw=raw)
- expected = g.apply(lambda x: x.expanding().apply(lambda y: y.sum(), raw=raw))
- # groupby.apply doesn't drop the grouped-by column
- expected = expected.drop("A", axis=1)
- # GH 39732
- expected_index = MultiIndex.from_arrays([frame["A"], range(40)])
- expected.index = expected_index
- tm.assert_frame_equal(result, expected)
class TestEWM:
    """Tests for exponentially weighted windows on grouped frames."""

    @pytest.mark.parametrize(
        "method, expected_data",
        [
            ["mean", [0.0, 0.6666666666666666, 1.4285714285714286, 2.2666666666666666]],
            ["std", [np.nan, 0.707107, 0.963624, 1.177164]],
            ["var", [np.nan, 0.5, 0.9285714285714286, 1.3857142857142857]],
        ],
    )
    def test_methods(self, method, expected_data):
        """Single-column reductions on one group, checked against fixed values."""
        # GH 16037
        df = DataFrame({"A": ["a"] * 4, "B": range(4)})
        ewm = df.groupby("A").ewm(com=1.0)
        result = getattr(ewm, method)()

        index = MultiIndex.from_tuples(
            [("a", pos) for pos in range(4)],
            names=["A", None],
        )
        expected = DataFrame({"B": expected_data}, index=index)
        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize(
        "method, expected_data",
        [["corr", [np.nan, 1.0, 1.0, 1]], ["cov", [np.nan, 0.5, 0.928571, 1.385714]]],
    )
    def test_pairwise_methods(self, method, expected_data):
        """Pairwise reductions, checked against fixed values and ``apply``."""
        # GH 16037
        df = DataFrame({"A": ["a"] * 4, "B": range(4)})
        ewm = df.groupby("A").ewm(com=1.0)
        result = getattr(ewm, method)()

        index = MultiIndex.from_tuples(
            [("a", pos, "B") for pos in range(4)],
            names=["A", None, None],
        )
        expected = DataFrame({"B": expected_data}, index=index)
        tm.assert_frame_equal(result, expected)

        def per_group(x):
            return getattr(x.ewm(com=1.0), method)()

        expected = df.groupby("A").apply(per_group)
        tm.assert_frame_equal(result, expected)

    def test_times(self, times_frame):
        """``mean`` with ``halflife`` and ``times`` taken from popped column ``C``."""
        # GH 40951
        halflife = "23 days"
        # GH#42738
        times = times_frame.pop("C")
        result = times_frame.groupby("A").ewm(halflife=halflife, times=times).mean()

        values = [
            0.0,
            0.507534,
            1.020088,
            1.537661,
            0.0,
            0.567395,
            1.221209,
            0.0,
            0.653141,
            1.195003,
        ]
        keys = ["a"] * 4 + ["b"] * 3 + ["c"] * 3
        positions = [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
        index = MultiIndex.from_tuples(
            list(zip(keys, positions)),
            names=["A", None],
        )
        expected = DataFrame({"B": values}, index=index)
        tm.assert_frame_equal(result, expected)

    def test_times_array(self, times_frame):
        """Passing ``times`` as a Series or as its ``.values`` gives equal results."""
        # GH 40951
        halflife = "23 days"
        times = times_frame.pop("C")
        gb = times_frame.groupby("A")

        result = gb.ewm(halflife=halflife, times=times).mean()
        expected = gb.ewm(halflife=halflife, times=times.values).mean()
        tm.assert_frame_equal(result, expected)

    def test_dont_mutate_obj_after_slicing(self):
        """Column selection on a grouped rolling object leaves the parent usable."""
        # GH 43355
        ids = ["a", "a", "b", "b", "b"]

        def hourly_stamps():
            # A fresh index per use, as in the frame and both expectations.
            return date_range("2021-9-1", periods=5, freq="H")

        df = DataFrame({"id": ids, "timestamp": hourly_stamps(), "y": range(5)})
        grp = df.groupby("id").rolling("1H", on="timestamp")

        result = grp.count()
        expected_df = DataFrame(
            {"timestamp": hourly_stamps(), "y": [1.0] * 5},
            index=MultiIndex.from_arrays(
                [ids, list(range(5))], names=["id", None]
            ),
        )
        tm.assert_frame_equal(result, expected_df)

        result = grp["y"].count()
        expected_series = Series(
            [1.0] * 5,
            index=MultiIndex.from_arrays(
                [ids, hourly_stamps()],
                names=["id", "timestamp"],
            ),
            name="y",
        )
        tm.assert_series_equal(result, expected_series)

        # This is the key test
        result = grp.count()
        tm.assert_frame_equal(result, expected_df)
|