12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744 |
- """
- Provide a generic structure to support window functions,
- similar to how we have a Groupby object.
- """
- from __future__ import annotations
- import copy
- from datetime import timedelta
- from functools import partial
- import inspect
- from textwrap import dedent
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Hashable,
- Iterator,
- Sized,
- cast,
- )
- import numpy as np
- from pandas._libs.tslibs import (
- BaseOffset,
- to_offset,
- )
- import pandas._libs.window.aggregations as window_aggregations
- from pandas._typing import (
- ArrayLike,
- Axis,
- NDFrameT,
- QuantileInterpolation,
- WindowingRankType,
- )
- from pandas.compat._optional import import_optional_dependency
- from pandas.errors import DataError
- from pandas.util._decorators import doc
- from pandas.core.dtypes.common import (
- ensure_float64,
- is_bool,
- is_integer,
- is_list_like,
- is_numeric_dtype,
- is_scalar,
- needs_i8_conversion,
- )
- from pandas.core.dtypes.generic import (
- ABCDataFrame,
- ABCSeries,
- )
- from pandas.core.dtypes.missing import notna
- from pandas.core._numba import executor
- from pandas.core.algorithms import factorize
- from pandas.core.apply import ResamplerWindowApply
- from pandas.core.arrays import ExtensionArray
- from pandas.core.base import SelectionMixin
- import pandas.core.common as com
- from pandas.core.indexers.objects import (
- BaseIndexer,
- FixedWindowIndexer,
- GroupbyIndexer,
- VariableWindowIndexer,
- )
- from pandas.core.indexes.api import (
- DatetimeIndex,
- Index,
- MultiIndex,
- PeriodIndex,
- TimedeltaIndex,
- )
- from pandas.core.reshape.concat import concat
- from pandas.core.util.numba_ import (
- get_jit_arguments,
- maybe_use_numba,
- )
- from pandas.core.window.common import (
- flex_binary_moment,
- zsqrt,
- )
- from pandas.core.window.doc import (
- _shared_docs,
- create_section_header,
- kwargs_numeric_only,
- kwargs_scipy,
- numba_notes,
- template_header,
- template_returns,
- template_see_also,
- window_agg_numba_parameters,
- window_apply_parameters,
- )
- from pandas.core.window.numba_ import (
- generate_manual_numpy_nan_agg_with_axis,
- generate_numba_apply_func,
- generate_numba_table_func,
- )
- if TYPE_CHECKING:
- from pandas import (
- DataFrame,
- Series,
- )
- from pandas.core.generic import NDFrame
- from pandas.core.groupby.ops import BaseGrouper
class BaseWindow(SelectionMixin):
    """Provides utilities for performing windowing operations."""

    # attribute names copied onto new instances created by _gotitem
    _attributes: list[str] = []
    exclusions: frozenset[Hashable] = frozenset()
    # the Index the window bounds are computed against (set in __init__)
    _on: Index
    def __init__(
        self,
        obj: NDFrame,
        window=None,
        min_periods: int | None = None,
        center: bool | None = False,
        win_type: str | None = None,
        axis: Axis = 0,
        on: str | Index | None = None,
        closed: str | None = None,
        step: int | None = None,
        method: str = "single",
        *,
        selection=None,
    ) -> None:
        """
        Store the window parameters, resolve the ``on`` index and validate.

        Parameters are stored as-is; ``self._on`` is derived from ``on``
        (or the object's index/columns) and ``_validate`` raises on invalid
        combinations.
        """
        self.obj = obj
        self.on = on
        self.closed = closed
        self.step = step
        self.window = window
        self.min_periods = min_periods
        self.center = center
        self.win_type = win_type
        self.axis = obj._get_axis_number(axis) if axis is not None else None
        self.method = method
        # populated later when a frequency-based window is resolved
        self._win_freq_i8: int | None = None
        if self.on is None:
            if self.axis == 0:
                self._on = self.obj.index
            else:
                # i.e. self.axis == 1
                self._on = self.obj.columns
        elif isinstance(self.on, Index):
            self._on = self.on
        elif isinstance(self.obj, ABCDataFrame) and self.on in self.obj.columns:
            # "on" names a DataFrame column: window along that column's values
            self._on = Index(self.obj[self.on])
        else:
            raise ValueError(
                f"invalid on specified as {self.on}, "
                "must be a column (of DataFrame), an Index or None"
            )
        self._selection = selection
        self._validate()
- def _validate(self) -> None:
- if self.center is not None and not is_bool(self.center):
- raise ValueError("center must be a boolean")
- if self.min_periods is not None:
- if not is_integer(self.min_periods):
- raise ValueError("min_periods must be an integer")
- if self.min_periods < 0:
- raise ValueError("min_periods must be >= 0")
- if is_integer(self.window) and self.min_periods > self.window:
- raise ValueError(
- f"min_periods {self.min_periods} must be <= window {self.window}"
- )
- if self.closed is not None and self.closed not in [
- "right",
- "both",
- "left",
- "neither",
- ]:
- raise ValueError("closed must be 'right', 'left', 'both' or 'neither'")
- if not isinstance(self.obj, (ABCSeries, ABCDataFrame)):
- raise TypeError(f"invalid type: {type(self)}")
- if isinstance(self.window, BaseIndexer):
- # Validate that the passed BaseIndexer subclass has
- # a get_window_bounds with the correct signature.
- get_window_bounds_signature = inspect.signature(
- self.window.get_window_bounds
- ).parameters.keys()
- expected_signature = inspect.signature(
- BaseIndexer().get_window_bounds
- ).parameters.keys()
- if get_window_bounds_signature != expected_signature:
- raise ValueError(
- f"{type(self.window).__name__} does not implement "
- f"the correct signature for get_window_bounds"
- )
- if self.method not in ["table", "single"]:
- raise ValueError("method must be 'table' or 'single")
- if self.step is not None:
- if not is_integer(self.step):
- raise ValueError("step must be an integer")
- if self.step < 0:
- raise ValueError("step must be >= 0")
- def _check_window_bounds(
- self, start: np.ndarray, end: np.ndarray, num_vals: int
- ) -> None:
- if len(start) != len(end):
- raise ValueError(
- f"start ({len(start)}) and end ({len(end)}) bounds must be the "
- f"same length"
- )
- if len(start) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
- raise ValueError(
- f"start and end bounds ({len(start)}) must be the same length "
- f"as the object ({num_vals}) divided by the step ({self.step}) "
- f"if given and rounded up"
- )
- def _slice_axis_for_step(self, index: Index, result: Sized | None = None) -> Index:
- """
- Slices the index for a given result and the preset step.
- """
- return (
- index
- if result is None or len(result) == len(index)
- else index[:: self.step]
- )
- def _validate_numeric_only(self, name: str, numeric_only: bool) -> None:
- """
- Validate numeric_only argument, raising if invalid for the input.
- Parameters
- ----------
- name : str
- Name of the operator (kernel).
- numeric_only : bool
- Value passed by user.
- """
- if (
- self._selected_obj.ndim == 1
- and numeric_only
- and not is_numeric_dtype(self._selected_obj.dtype)
- ):
- raise NotImplementedError(
- f"{type(self).__name__}.{name} does not implement numeric_only"
- )
- def _make_numeric_only(self, obj: NDFrameT) -> NDFrameT:
- """Subset DataFrame to numeric columns.
- Parameters
- ----------
- obj : DataFrame
- Returns
- -------
- obj subset to numeric-only columns.
- """
- result = obj.select_dtypes(include=["number"], exclude=["timedelta"])
- return result
    def _create_data(self, obj: NDFrameT, numeric_only: bool = False) -> NDFrameT:
        """
        Split data into blocks & return conformed data.
        """
        # filter out the on from the object
        if self.on is not None and not isinstance(self.on, Index) and obj.ndim == 2:
            obj = obj.reindex(columns=obj.columns.difference([self.on]), copy=False)
        if obj.ndim > 1 and (numeric_only or self.axis == 1):
            # GH: 20649 in case of mixed dtype and axis=1 we have to convert everything
            # to float to calculate the complete row at once. We exclude all non-numeric
            # dtypes.
            obj = self._make_numeric_only(obj)
            if self.axis == 1:
                obj = obj.astype("float64", copy=False)
                # consolidate so the row-wise view is a single contiguous block
                obj._mgr = obj._mgr.consolidate()
        return obj
    def _gotitem(self, key, ndim, subset=None):
        """
        Sub-classes to define. Return a sliced object.

        Parameters
        ----------
        key : str / list of selections
        ndim : {1, 2}
            requested ndim of result
        subset : object, default None
            subset to act on
        """
        # create a new object to prevent aliasing
        if subset is None:
            subset = self.obj

        # we need to make a shallow copy of ourselves
        # with the same groupby
        kwargs = {attr: getattr(self, attr) for attr in self._attributes}

        # record the selection only when the key actually addresses data
        # in the subset (column(s) of a frame, or the series' own name)
        selection = None
        if subset.ndim == 2 and (
            (is_scalar(key) and key in subset) or is_list_like(key)
        ):
            selection = key
        elif subset.ndim == 1 and is_scalar(key) and key == subset.name:
            selection = key

        new_win = type(self)(subset, selection=selection, **kwargs)
        return new_win
- def __getattr__(self, attr: str):
- if attr in self._internal_names_set:
- return object.__getattribute__(self, attr)
- if attr in self.obj:
- return self[attr]
- raise AttributeError(
- f"'{type(self).__name__}' object has no attribute '{attr}'"
- )
    def _dir_additions(self):
        # tab-completion additions are delegated to the wrapped pandas object
        return self.obj._dir_additions()
- def __repr__(self) -> str:
- """
- Provide a nice str repr of our rolling object.
- """
- attrs_list = (
- f"{attr_name}={getattr(self, attr_name)}"
- for attr_name in self._attributes
- if getattr(self, attr_name, None) is not None and attr_name[0] != "_"
- )
- attrs = ",".join(attrs_list)
- return f"{type(self).__name__} [{attrs}]"
    def __iter__(self) -> Iterator:
        """Yield each window as a pandas object sliced from the data."""
        obj = self._selected_obj.set_axis(self._on)
        obj = self._create_data(obj)
        indexer = self._get_window_indexer()

        start, end = indexer.get_window_bounds(
            num_values=len(obj),
            min_periods=self.min_periods,
            center=self.center,
            closed=self.closed,
            step=self.step,
        )
        self._check_window_bounds(start, end, len(obj))

        for s, e in zip(start, end):
            # each window is the positional half-open slice [s, e)
            result = obj.iloc[slice(s, e)]
            yield result
- def _prep_values(self, values: ArrayLike) -> np.ndarray:
- """Convert input to numpy arrays for Cython routines"""
- if needs_i8_conversion(values.dtype):
- raise NotImplementedError(
- f"ops for {type(self).__name__} for this "
- f"dtype {values.dtype} are not implemented"
- )
- # GH #12373 : rolling functions error on float32 data
- # make sure the data is coerced to float64
- try:
- if isinstance(values, ExtensionArray):
- values = values.to_numpy(np.float64, na_value=np.nan)
- else:
- values = ensure_float64(values)
- except (ValueError, TypeError) as err:
- raise TypeError(f"cannot handle this type -> {values.dtype}") from err
- # Convert inf to nan for C funcs
- inf = np.isinf(values)
- if inf.any():
- values = np.where(inf, np.nan, values)
- return values
    def _insert_on_column(self, result: DataFrame, obj: DataFrame) -> None:
        """Re-insert the ``on`` column into ``result`` (mutates it in place)."""
        # if we have an 'on' column we want to put it back into
        # the results in the same location
        from pandas import Series

        if self.on is not None and not self._on.equals(obj.index):
            name = self._on.name
            extra_col = Series(self._on, index=self.obj.index, name=name, copy=False)
            if name in result.columns:
                # TODO: sure we want to overwrite results?
                result[name] = extra_col
            elif name in result.index.names:
                # already represented in the index; nothing to insert
                pass
            elif name in self._selected_obj.columns:
                # insert in the same location as we had in _selected_obj
                old_cols = self._selected_obj.columns
                new_cols = result.columns
                old_loc = old_cols.get_loc(name)
                # count how many of the columns preceding it survived
                overlap = new_cols.intersection(old_cols[:old_loc])
                new_loc = len(overlap)
                result.insert(new_loc, name, extra_col)
            else:
                # insert at the end
                result[name] = extra_col
- @property
- def _index_array(self):
- # TODO: why do we get here with e.g. MultiIndex?
- if needs_i8_conversion(self._on.dtype):
- idx = cast("PeriodIndex | DatetimeIndex | TimedeltaIndex", self._on)
- return idx.asi8
- return None
- def _resolve_output(self, out: DataFrame, obj: DataFrame) -> DataFrame:
- """Validate and finalize result."""
- if out.shape[1] == 0 and obj.shape[1] > 0:
- raise DataError("No numeric types to aggregate")
- if out.shape[1] == 0:
- return obj.astype("float64")
- self._insert_on_column(out, obj)
- return out
- def _get_window_indexer(self) -> BaseIndexer:
- """
- Return an indexer class that will compute the window start and end bounds
- """
- if isinstance(self.window, BaseIndexer):
- return self.window
- if self._win_freq_i8 is not None:
- return VariableWindowIndexer(
- index_array=self._index_array,
- window_size=self._win_freq_i8,
- center=self.center,
- )
- return FixedWindowIndexer(window_size=self.window)
    def _apply_series(
        self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
    ) -> Series:
        """
        Series version of _apply_blockwise
        """
        obj = self._create_data(self._selected_obj)

        if name == "count":
            # GH 12541: Special case for count where we support date-like types
            obj = notna(obj).astype(int)

        try:
            values = self._prep_values(obj._values)
        except (TypeError, NotImplementedError) as err:
            raise DataError("No numeric types to aggregate") from err

        result = homogeneous_func(values)
        # step > 1 shortens the result; slice the index to match
        index = self._slice_axis_for_step(obj.index, result)
        return obj._constructor(result, index=index, name=obj.name)
    def _apply_blockwise(
        self,
        homogeneous_func: Callable[..., ArrayLike],
        name: str,
        numeric_only: bool = False,
    ) -> DataFrame | Series:
        """
        Apply the given function to the DataFrame broken down into homogeneous
        sub-frames.
        """
        self._validate_numeric_only(name, numeric_only)
        if self._selected_obj.ndim == 1:
            # 1-D input: delegate to the Series path
            return self._apply_series(homogeneous_func, name)

        obj = self._create_data(self._selected_obj, numeric_only)
        if name == "count":
            # GH 12541: Special case for count where we support date-like types
            obj = notna(obj).astype(int)
            obj._mgr = obj._mgr.consolidate()

        if self.axis == 1:
            # operate along rows by transposing first (and back afterwards)
            obj = obj.T

        taker = []
        res_values = []
        for i, arr in enumerate(obj._iter_column_arrays()):
            # GH#42736 operate column-wise instead of block-wise
            # As of 2.0, hfunc will raise for nuisance columns
            try:
                arr = self._prep_values(arr)
            except (TypeError, NotImplementedError) as err:
                raise DataError(
                    f"Cannot aggregate non-numeric type: {arr.dtype}"
                ) from err
            res = homogeneous_func(arr)
            res_values.append(res)
            # remember the positional column so the output keeps its label
            taker.append(i)

        index = self._slice_axis_for_step(
            obj.index, res_values[0] if len(res_values) > 0 else None
        )
        df = type(obj)._from_arrays(
            res_values,
            index=index,
            columns=obj.columns.take(taker),
            verify_integrity=False,
        )

        if self.axis == 1:
            df = df.T

        return self._resolve_output(df, obj)
    def _apply_tablewise(
        self,
        homogeneous_func: Callable[..., ArrayLike],
        name: str | None = None,
        numeric_only: bool = False,
    ) -> DataFrame | Series:
        """
        Apply the given function to the DataFrame across the entire object
        """
        if self._selected_obj.ndim == 1:
            raise ValueError("method='table' not applicable for Series objects.")

        obj = self._create_data(self._selected_obj, numeric_only)
        values = self._prep_values(obj.to_numpy())
        # axis=1: compute on the transposed 2-D array, transpose result back
        values = values.T if self.axis == 1 else values
        result = homogeneous_func(values)
        result = result.T if self.axis == 1 else result
        index = self._slice_axis_for_step(obj.index, result)
        # with step > 1 the result may have fewer columns after transposition
        columns = (
            obj.columns
            if result.shape[1] == len(obj.columns)
            else obj.columns[:: self.step]
        )
        out = obj._constructor(result, index=index, columns=columns)

        return self._resolve_output(out, obj)
- def _apply_pairwise(
- self,
- target: DataFrame | Series,
- other: DataFrame | Series | None,
- pairwise: bool | None,
- func: Callable[[DataFrame | Series, DataFrame | Series], DataFrame | Series],
- numeric_only: bool,
- ) -> DataFrame | Series:
- """
- Apply the given pairwise function given 2 pandas objects (DataFrame/Series)
- """
- target = self._create_data(target, numeric_only)
- if other is None:
- other = target
- # only default unset
- pairwise = True if pairwise is None else pairwise
- elif not isinstance(other, (ABCDataFrame, ABCSeries)):
- raise ValueError("other must be a DataFrame or Series")
- elif other.ndim == 2 and numeric_only:
- other = self._make_numeric_only(other)
- return flex_binary_moment(target, other, func, pairwise=bool(pairwise))
    def _apply(
        self,
        func: Callable[..., Any],
        name: str,
        numeric_only: bool = False,
        numba_args: tuple[Any, ...] = (),
        **kwargs,
    ):
        """
        Rolling statistical measure using supplied function.

        Designed to be used with passed-in Cython array-based functions.

        Parameters
        ----------
        func : callable function to apply
        name : str,
        numba_args : tuple
            args to be passed when func is a numba func
        **kwargs
            additional arguments for rolling function and window function

        Returns
        -------
        y : type of input
        """
        window_indexer = self._get_window_indexer()
        # default min_periods to the full window when not given
        min_periods = (
            self.min_periods
            if self.min_periods is not None
            else window_indexer.window_size
        )

        def homogeneous_func(values: np.ndarray):
            # calculation function
            if values.size == 0:
                return values.copy()

            def calc(x):
                start, end = window_indexer.get_window_bounds(
                    num_values=len(x),
                    min_periods=min_periods,
                    center=self.center,
                    closed=self.closed,
                    step=self.step,
                )
                self._check_window_bounds(start, end, len(x))

                return func(x, start, end, min_periods, *numba_args)

            # silence floating-point warnings from the cython kernels
            with np.errstate(all="ignore"):
                result = calc(values)

            return result

        if self.method == "single":
            return self._apply_blockwise(homogeneous_func, name, numeric_only)
        else:
            return self._apply_tablewise(homogeneous_func, name, numeric_only)
    def _numba_apply(
        self,
        func: Callable[..., Any],
        engine_kwargs: dict[str, bool] | None = None,
        *func_args,
    ):
        """Apply ``func`` over the windows via a numba-jitted aggregator."""
        window_indexer = self._get_window_indexer()
        min_periods = (
            self.min_periods
            if self.min_periods is not None
            else window_indexer.window_size
        )
        obj = self._create_data(self._selected_obj)
        if self.axis == 1:
            obj = obj.T
        values = self._prep_values(obj.to_numpy())
        if values.ndim == 1:
            # the shared aggregator operates on 2-D data
            values = values.reshape(-1, 1)
        start, end = window_indexer.get_window_bounds(
            num_values=len(values),
            min_periods=min_periods,
            center=self.center,
            closed=self.closed,
            step=self.step,
        )
        self._check_window_bounds(start, end, len(values))
        aggregator = executor.generate_shared_aggregator(
            func, **get_jit_arguments(engine_kwargs)
        )
        result = aggregator(values, start, end, min_periods, *func_args)
        result = result.T if self.axis == 1 else result
        index = self._slice_axis_for_step(obj.index, result)
        if obj.ndim == 1:
            # undo the reshape for Series input
            result = result.squeeze()
            out = obj._constructor(result, index=index, name=obj.name)
            return out
        else:
            columns = self._slice_axis_for_step(obj.columns, result.T)
            out = obj._constructor(result, index=index, columns=columns)
            return self._resolve_output(out, obj)
- def aggregate(self, func, *args, **kwargs):
- result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
- if result is None:
- return self.apply(func, raw=False, args=args, kwargs=kwargs)
- return result
- agg = aggregate
class BaseWindowGroupby(BaseWindow):
    """
    Provide the groupby windowing facilities.
    """

    # grouper driving the per-group window computations
    _grouper: BaseGrouper
    # whether group keys form the result index (mirrors groupby(as_index=...))
    _as_index: bool
    _attributes: list[str] = ["_grouper"]
    def __init__(
        self,
        obj: DataFrame | Series,
        *args,
        _grouper: BaseGrouper,
        _as_index: bool = True,
        **kwargs,
    ) -> None:
        """Wrap ``obj`` for windowing driven by ``_grouper``."""
        from pandas.core.groupby.ops import BaseGrouper

        if not isinstance(_grouper, BaseGrouper):
            raise ValueError("Must pass a BaseGrouper object.")
        self._grouper = _grouper
        self._as_index = _as_index
        # GH 32262: It's convention to keep the grouping column in
        # groupby.<agg_func>, but unexpected to users in
        # groupby.rolling.<agg_func>
        obj = obj.drop(columns=self._grouper.names, errors="ignore")
        # GH 15354
        if kwargs.get("step") is not None:
            raise NotImplementedError("step not implemented for groupby")
        super().__init__(obj, *args, **kwargs)
    def _apply(
        self,
        func: Callable[..., Any],
        name: str,
        numeric_only: bool = False,
        numba_args: tuple[Any, ...] = (),
        **kwargs,
    ) -> DataFrame | Series:
        """Apply the window function, then rebuild the grouped MultiIndex."""
        result = super()._apply(
            func,
            name,
            numeric_only,
            numba_args,
            **kwargs,
        )
        # Reconstruct the resulting MultiIndex
        # 1st set of levels = group by labels
        # 2nd set of levels = original DataFrame/Series index
        grouped_object_index = self.obj.index
        grouped_index_name = [*grouped_object_index.names]
        groupby_keys = copy.copy(self._grouper.names)
        result_index_names = groupby_keys + grouped_index_name

        drop_columns = [
            key
            for key in self._grouper.names
            if key not in self.obj.index.names or key is None
        ]

        if len(drop_columns) != len(groupby_keys):
            # Our result will have still kept the column in the result
            result = result.drop(columns=drop_columns, errors="ignore")

        codes = self._grouper.codes
        levels = copy.copy(self._grouper.levels)

        # positions of rows in group order; empty groups yield an empty indexer
        group_indices = self._grouper.indices.values()
        if group_indices:
            indexer = np.concatenate(list(group_indices))
        else:
            indexer = np.array([], dtype=np.intp)
        codes = [c.take(indexer) for c in codes]

        # if the index of the original dataframe needs to be preserved, append
        # this index (but reordered) to the codes/levels from the groupby
        if grouped_object_index is not None:
            idx = grouped_object_index.take(indexer)
            if not isinstance(idx, MultiIndex):
                idx = MultiIndex.from_arrays([idx])
            codes.extend(list(idx.codes))
            levels.extend(list(idx.levels))

        result_index = MultiIndex(
            levels, codes, names=result_index_names, verify_integrity=False
        )

        result.index = result_index
        if not self._as_index:
            # move the group keys back out of the index, like as_index=False
            result = result.reset_index(level=list(range(len(groupby_keys))))
        return result
    def _apply_pairwise(
        self,
        target: DataFrame | Series,
        other: DataFrame | Series | None,
        pairwise: bool | None,
        func: Callable[[DataFrame | Series, DataFrame | Series], DataFrame | Series],
        numeric_only: bool,
    ) -> DataFrame | Series:
        """
        Apply the given pairwise function given 2 pandas objects (DataFrame/Series)
        """
        # Manually drop the grouping column first
        target = target.drop(columns=self._grouper.names, errors="ignore")
        result = super()._apply_pairwise(target, other, pairwise, func, numeric_only)
        # 1) Determine the levels + codes of the groupby levels
        if other is not None and not all(
            len(group) == len(other) for group in self._grouper.indices.values()
        ):
            # GH 42915
            # len(other) != len(any group), so must reindex (expand) the result
            # from flex_binary_moment to a "transform"-like result
            # per groupby combination
            old_result_len = len(result)
            result = concat(
                [
                    result.take(gb_indices).reindex(result.index)
                    for gb_indices in self._grouper.indices.values()
                ]
            )

            gb_pairs = (
                com.maybe_make_list(pair) for pair in self._grouper.indices.keys()
            )
            groupby_codes = []
            groupby_levels = []
            # e.g. [[1, 2], [4, 5]] as [[1, 4], [2, 5]]
            for gb_level_pair in map(list, zip(*gb_pairs)):
                # repeat each group label so it covers its expanded block
                labels = np.repeat(np.array(gb_level_pair), old_result_len)
                codes, levels = factorize(labels)
                groupby_codes.append(codes)
                groupby_levels.append(levels)
        else:
            # pairwise=True or len(other) == len(each group), so repeat
            # the groupby labels by the number of columns in the original object
            groupby_codes = self._grouper.codes
            # error: Incompatible types in assignment (expression has type
            # "List[Index]", variable has type "List[Union[ndarray, Index]]")
            groupby_levels = self._grouper.levels  # type: ignore[assignment]

            group_indices = self._grouper.indices.values()
            if group_indices:
                indexer = np.concatenate(list(group_indices))
            else:
                indexer = np.array([], dtype=np.intp)

            if target.ndim == 1:
                repeat_by = 1
            else:
                repeat_by = len(target.columns)
            groupby_codes = [
                np.repeat(c.take(indexer), repeat_by) for c in groupby_codes
            ]
        # 2) Determine the levels + codes of the result from super()._apply_pairwise
        if isinstance(result.index, MultiIndex):
            result_codes = list(result.index.codes)
            result_levels = list(result.index.levels)
            result_names = list(result.index.names)
        else:
            # flat index: factorize it into a single code/level pair
            idx_codes, idx_levels = factorize(result.index)
            result_codes = [idx_codes]
            result_levels = [idx_levels]
            result_names = [result.index.name]

        # 3) Create the resulting index by combining 1) + 2)
        result_codes = groupby_codes + result_codes
        result_levels = groupby_levels + result_levels
        result_names = self._grouper.names + result_names

        result_index = MultiIndex(
            result_levels, result_codes, names=result_names, verify_integrity=False
        )
        result.index = result_index
        return result
- def _create_data(self, obj: NDFrameT, numeric_only: bool = False) -> NDFrameT:
- """
- Split data into blocks & return conformed data.
- """
- # Ensure the object we're rolling over is monotonically sorted relative
- # to the groups
- # GH 36197
- if not obj.empty:
- groupby_order = np.concatenate(list(self._grouper.indices.values())).astype(
- np.int64
- )
- obj = obj.take(groupby_order)
- return super()._create_data(obj, numeric_only)
    def _gotitem(self, key, ndim, subset=None):
        # Sub-select `key` from this windowed-groupby object.
        # we are setting the index on the actual object
        # here so our index is carried through to the selected obj
        # when we do the splitting for the groupby
        if self.on is not None:
            # GH 43355: re-anchor the subset on the "on" column's values
            subset = self.obj.set_index(self._on)
        return super()._gotitem(key, ndim, subset=subset)
class Window(BaseWindow):
    """
    Provide rolling window calculations.

    Parameters
    ----------
    window : int, timedelta, str, offset, or BaseIndexer subclass
        Size of the moving window.

        If an integer, the fixed number of observations used for
        each window.

        If a timedelta, str, or offset, the time period of each window. Each
        window will be a variable sized based on the observations included in
        the time-period. This is only valid for datetimelike indexes.
        To learn more about the offsets & frequency strings, please see `this link
        <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.

        If a BaseIndexer subclass, the window boundaries
        based on the defined ``get_window_bounds`` method. Additional rolling
        keyword arguments, namely ``min_periods``, ``center``, ``closed`` and
        ``step`` will be passed to ``get_window_bounds``.

    min_periods : int, default None
        Minimum number of observations in window required to have a value;
        otherwise, result is ``np.nan``.

        For a window that is specified by an offset, ``min_periods`` will default to 1.

        For a window that is specified by an integer, ``min_periods`` will default
        to the size of the window.

    center : bool, default False
        If False, set the window labels as the right edge of the window index.

        If True, set the window labels as the center of the window index.

    win_type : str, default None
        If ``None``, all points are evenly weighted.

        If a string, it must be a valid `scipy.signal window function
        <https://docs.scipy.org/doc/scipy/reference/signal.windows.html#module-scipy.signal.windows>`__.

        Certain Scipy window types require additional parameters to be passed
        in the aggregation function. The additional parameters must match
        the keywords specified in the Scipy window type method signature.

    on : str, optional
        For a DataFrame, a column label or Index level on which
        to calculate the rolling window, rather than the DataFrame's index.

        Provided integer column is ignored and excluded from result since
        an integer index is not used to calculate the rolling window.

    axis : int or str, default 0
        If ``0`` or ``'index'``, roll across the rows.

        If ``1`` or ``'columns'``, roll across the columns.

        For `Series` this parameter is unused and defaults to 0.

    closed : str, default None
        If ``'right'``, the first point in the window is excluded from calculations.

        If ``'left'``, the last point in the window is excluded from calculations.

        If ``'both'``, no points in the window are excluded from calculations.

        If ``'neither'``, the first and last points in the window are excluded
        from calculations.

        Default ``None`` (``'right'``).

        .. versionchanged:: 1.2.0

            The closed parameter with fixed windows is now supported.

    step : int, default None

        .. versionadded:: 1.5.0

        Evaluate the window at every ``step`` result, equivalent to slicing as
        ``[::step]``. ``window`` must be an integer. Using a step argument other
        than None or 1 will produce a result with a different shape than the input.

    method : str {'single', 'table'}, default 'single'

        .. versionadded:: 1.3.0

        Execute the rolling operation per single column or row (``'single'``)
        or over the entire object (``'table'``).

        This argument is only implemented when specifying ``engine='numba'``
        in the method call.

    Returns
    -------
    ``Window`` subclass if a ``win_type`` is passed

    ``Rolling`` subclass if ``win_type`` is not passed

    See Also
    --------
    expanding : Provides expanding transformations.
    ewm : Provides exponential weighted functions.

    Notes
    -----
    See :ref:`Windowing Operations <window.generic>` for further usage details
    and examples.

    Examples
    --------
    >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0

    **window**

    Rolling sum with a window length of 2 observations.

    >>> df.rolling(2).sum()
         B
    0  NaN
    1  1.0
    2  3.0
    3  NaN
    4  NaN

    Rolling sum with a window span of 2 seconds.

    >>> df_time = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]},
    ...                        index = [pd.Timestamp('20130101 09:00:00'),
    ...                                 pd.Timestamp('20130101 09:00:02'),
    ...                                 pd.Timestamp('20130101 09:00:03'),
    ...                                 pd.Timestamp('20130101 09:00:05'),
    ...                                 pd.Timestamp('20130101 09:00:06')])

    >>> df_time
                           B
    2013-01-01 09:00:00  0.0
    2013-01-01 09:00:02  1.0
    2013-01-01 09:00:03  2.0
    2013-01-01 09:00:05  NaN
    2013-01-01 09:00:06  4.0

    >>> df_time.rolling('2s').sum()
                           B
    2013-01-01 09:00:00  0.0
    2013-01-01 09:00:02  1.0
    2013-01-01 09:00:03  3.0
    2013-01-01 09:00:05  NaN
    2013-01-01 09:00:06  4.0

    Rolling sum with forward looking windows with 2 observations.

    >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
    >>> df.rolling(window=indexer, min_periods=1).sum()
         B
    0  1.0
    1  3.0
    2  2.0
    3  4.0
    4  4.0

    **min_periods**

    Rolling sum with a window length of 2 observations, but only needs a minimum of 1
    observation to calculate a value.

    >>> df.rolling(2, min_periods=1).sum()
         B
    0  0.0
    1  1.0
    2  3.0
    3  2.0
    4  4.0

    **center**

    Rolling sum with the result assigned to the center of the window index.

    >>> df.rolling(3, min_periods=1, center=True).sum()
         B
    0  1.0
    1  3.0
    2  3.0
    3  6.0
    4  4.0

    >>> df.rolling(3, min_periods=1, center=False).sum()
         B
    0  0.0
    1  1.0
    2  3.0
    3  3.0
    4  6.0

    **step**

    Rolling sum with a window length of 2 observations, minimum of 1 observation to
    calculate a value, and a step of 2.

    >>> df.rolling(2, min_periods=1, step=2).sum()
         B
    0  0.0
    2  3.0
    4  4.0

    **win_type**

    Rolling sum with a window length of 2, using the Scipy ``'gaussian'``
    window type. ``std`` is required in the aggregation function.

    >>> df.rolling(2, win_type='gaussian').sum(std=3)
              B
    0       NaN
    1  0.986207
    2  2.958621
    3       NaN
    4       NaN

    **on**

    Rolling sum with a window length of 2 days.

    >>> df = pd.DataFrame({
    ...     'A': [pd.to_datetime('2020-01-01'),
    ...           pd.to_datetime('2020-01-01'),
    ...           pd.to_datetime('2020-01-02'),],
    ...     'B': [1, 2, 3], },
    ...     index=pd.date_range('2020', periods=3))

    >>> df
                        A  B
    2020-01-01 2020-01-01  1
    2020-01-02 2020-01-01  2
    2020-01-03 2020-01-02  3

    >>> df.rolling('2D', on='A').sum()
                        A    B
    2020-01-01 2020-01-01  1.0
    2020-01-02 2020-01-01  3.0
    2020-01-03 2020-01-02  6.0
    """

    # NOTE(review): presumably the set of constructor arguments recognised and
    # propagated by the shared window machinery (mirrors Rolling._attributes);
    # confirm in BaseWindow before extending.
    _attributes = [
        "window",
        "min_periods",
        "center",
        "win_type",
        "axis",
        "on",
        "closed",
        "step",
        "method",
    ]
    def _validate(self):
        """
        Validate Window-specific arguments: ``win_type`` must name a scipy
        window generator, ``window`` must be a non-negative integer (not a
        BaseIndexer), and only ``method="single"`` is supported.
        """
        super()._validate()

        if not isinstance(self.win_type, str):
            raise ValueError(f"Invalid win_type {self.win_type}")
        signal = import_optional_dependency(
            "scipy.signal.windows", extra="Scipy is required to generate window weight."
        )
        # Resolve the named scipy weight generator, e.g. "gaussian".
        self._scipy_weight_generator = getattr(signal, self.win_type, None)
        if self._scipy_weight_generator is None:
            raise ValueError(f"Invalid win_type {self.win_type}")

        if isinstance(self.window, BaseIndexer):
            raise NotImplementedError(
                "BaseIndexer subclasses not implemented with win_types."
            )
        if not is_integer(self.window) or self.window < 0:
            raise ValueError("window must be an integer 0 or greater")

        if self.method != "single":
            raise NotImplementedError("'single' is the only supported method type.")
- def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray:
- """
- Center the result in the window for weighted rolling aggregations.
- """
- if offset > 0:
- lead_indexer = [slice(offset, None)]
- result = np.copy(result[tuple(lead_indexer)])
- return result
    def _apply(
        self,
        func: Callable[[np.ndarray, int, int], np.ndarray],
        name: str,
        numeric_only: bool = False,
        numba_args: tuple[Any, ...] = (),
        **kwargs,
    ):
        """
        Rolling with weights statistical measure using supplied function.

        Designed to be used with passed-in Cython array-based functions.

        Parameters
        ----------
        func : callable function to apply
        name : str,
        numeric_only : bool, default False
            Whether to only operate on bool, int, and float columns
        numba_args : tuple
            unused
        **kwargs
            additional arguments for scipy windows if necessary

        Returns
        -------
        y : type of input
        """
        # Materialize the scipy weight vector; extra scipy parameters
        # (e.g. ``std`` for "gaussian") arrive via **kwargs.
        # "None" not callable  [misc]
        window = self._scipy_weight_generator(  # type: ignore[misc]
            self.window, **kwargs
        )
        offset = (len(window) - 1) // 2 if self.center else 0

        def homogeneous_func(values: np.ndarray):
            # calculation function

            if values.size == 0:
                return values.copy()

            def calc(x):
                # Pad the tail with NaN so a centered result can be shifted
                # back by ``offset`` without shortening the output.
                additional_nans = np.array([np.nan] * offset)
                x = np.concatenate((x, additional_nans))
                return func(x, window, self.min_periods or len(window))

            with np.errstate(all="ignore"):
                # Our weighted aggregations return memoryviews
                result = np.asarray(calc(values))

            if self.center:
                result = self._center_window(result, offset)

            return result

        # Apply per block, then thin the rows by ``step`` if requested.
        return self._apply_blockwise(homogeneous_func, name, numeric_only)[:: self.step]
    @doc(
        _shared_docs["aggregate"],
        see_also=dedent(
            """
        See Also
        --------
        pandas.DataFrame.aggregate : Similar DataFrame method.
        pandas.Series.aggregate : Similar Series method.
        """
        ),
        examples=dedent(
            """
        Examples
        --------
        >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
        >>> df
           A  B  C
        0  1  4  7
        1  2  5  8
        2  3  6  9

        >>> df.rolling(2, win_type="boxcar").agg("mean")
             A    B    C
        0  NaN  NaN  NaN
        1  1.5  4.5  7.5
        2  2.5  5.5  8.5
        """
        ),
        klass="Series/DataFrame",
        axis="",
    )
    def aggregate(self, func, *args, **kwargs):
        # Try the shared resampler/window aggregation machinery first.
        result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
        if result is None:
            # these must apply directly
            result = func(self)

        return result

    agg = aggregate
- @doc(
- template_header,
- create_section_header("Parameters"),
- kwargs_numeric_only,
- kwargs_scipy,
- create_section_header("Returns"),
- template_returns,
- create_section_header("See Also"),
- template_see_also[:-1],
- window_method="rolling",
- aggregation_description="weighted window sum",
- agg_method="sum",
- )
- def sum(self, numeric_only: bool = False, **kwargs):
- window_func = window_aggregations.roll_weighted_sum
- # error: Argument 1 to "_apply" of "Window" has incompatible type
- # "Callable[[ndarray, ndarray, int], ndarray]"; expected
- # "Callable[[ndarray, int, int], ndarray]"
- return self._apply(
- window_func, # type: ignore[arg-type]
- name="sum",
- numeric_only=numeric_only,
- **kwargs,
- )
- @doc(
- template_header,
- create_section_header("Parameters"),
- kwargs_numeric_only,
- kwargs_scipy,
- create_section_header("Returns"),
- template_returns,
- create_section_header("See Also"),
- template_see_also[:-1],
- window_method="rolling",
- aggregation_description="weighted window mean",
- agg_method="mean",
- )
- def mean(self, numeric_only: bool = False, **kwargs):
- window_func = window_aggregations.roll_weighted_mean
- # error: Argument 1 to "_apply" of "Window" has incompatible type
- # "Callable[[ndarray, ndarray, int], ndarray]"; expected
- # "Callable[[ndarray, int, int], ndarray]"
- return self._apply(
- window_func, # type: ignore[arg-type]
- name="mean",
- numeric_only=numeric_only,
- **kwargs,
- )
- @doc(
- template_header,
- create_section_header("Parameters"),
- kwargs_numeric_only,
- kwargs_scipy,
- create_section_header("Returns"),
- template_returns,
- create_section_header("See Also"),
- template_see_also[:-1],
- window_method="rolling",
- aggregation_description="weighted window variance",
- agg_method="var",
- )
- def var(self, ddof: int = 1, numeric_only: bool = False, **kwargs):
- window_func = partial(window_aggregations.roll_weighted_var, ddof=ddof)
- kwargs.pop("name", None)
- return self._apply(window_func, name="var", numeric_only=numeric_only, **kwargs)
- @doc(
- template_header,
- create_section_header("Parameters"),
- kwargs_numeric_only,
- kwargs_scipy,
- create_section_header("Returns"),
- template_returns,
- create_section_header("See Also"),
- template_see_also[:-1],
- window_method="rolling",
- aggregation_description="weighted window standard deviation",
- agg_method="std",
- )
- def std(self, ddof: int = 1, numeric_only: bool = False, **kwargs):
- return zsqrt(
- self.var(ddof=ddof, name="std", numeric_only=numeric_only, **kwargs)
- )
class RollingAndExpandingMixin(BaseWindow):
    def count(self, numeric_only: bool = False):
        # Implemented as a windowed sum; presumably the values are replaced by
        # a non-NaN indicator upstream when name="count" — TODO confirm.
        return self._apply(
            window_aggregations.roll_sum, name="count", numeric_only=numeric_only
        )
    def apply(
        self,
        func: Callable[..., Any],
        raw: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
        args: tuple[Any, ...] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        """
        Apply an arbitrary function per window.

        Dispatches to a numba- or cython-backed implementation depending on
        ``engine``; ``raw=True`` is required for numba since its kernels
        operate on bare ndarrays.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        if not is_bool(raw):
            raise ValueError("raw parameter must be `True` or `False`")

        numba_args: tuple[Any, ...] = ()
        if maybe_use_numba(engine):
            if raw is False:
                raise ValueError("raw must be `True` when using the numba engine")
            numba_args = args
            if self.method == "single":
                apply_func = generate_numba_apply_func(
                    func, **get_jit_arguments(engine_kwargs, kwargs)
                )
            else:
                # method == "table": operate over the whole object at once
                apply_func = generate_numba_table_func(
                    func, **get_jit_arguments(engine_kwargs, kwargs)
                )
        elif engine in ("cython", None):
            if engine_kwargs is not None:
                raise ValueError("cython engine does not accept engine_kwargs")
            apply_func = self._generate_cython_apply_func(args, kwargs, raw, func)
        else:
            raise ValueError("engine must be either 'numba' or 'cython'")

        return self._apply(
            apply_func,
            name="apply",
            numba_args=numba_args,
        )
    def _generate_cython_apply_func(
        self,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        raw: bool,
        function: Callable[..., Any],
    ) -> Callable[[np.ndarray, np.ndarray, np.ndarray, int], np.ndarray]:
        """
        Build the window function handed to the cython ``roll_apply`` loop.

        When ``raw`` is False the ndarray is wrapped in a Series so the user
        function sees labeled data.
        """
        from pandas import Series

        window_func = partial(
            window_aggregations.roll_apply,
            args=args,
            kwargs=kwargs,
            raw=raw,
            function=function,
        )

        def apply_func(values, begin, end, min_periods, raw=raw):
            if not raw:
                # GH 45912: label with the "on" index so the user function
                # receives correctly-indexed windows; copy=False avoids a
                # redundant copy of the values.
                values = Series(values, index=self._on, copy=False)
            return window_func(values, begin, end, min_periods)

        return apply_func
- def sum(
- self,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- func = generate_manual_numpy_nan_agg_with_axis(np.nansum)
- return self.apply(
- func,
- raw=True,
- engine=engine,
- engine_kwargs=engine_kwargs,
- )
- else:
- from pandas.core._numba.kernels import sliding_sum
- return self._numba_apply(sliding_sum, engine_kwargs)
- window_func = window_aggregations.roll_sum
- return self._apply(window_func, name="sum", numeric_only=numeric_only)
- def max(
- self,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- func = generate_manual_numpy_nan_agg_with_axis(np.nanmax)
- return self.apply(
- func,
- raw=True,
- engine=engine,
- engine_kwargs=engine_kwargs,
- )
- else:
- from pandas.core._numba.kernels import sliding_min_max
- return self._numba_apply(sliding_min_max, engine_kwargs, True)
- window_func = window_aggregations.roll_max
- return self._apply(window_func, name="max", numeric_only=numeric_only)
- def min(
- self,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- func = generate_manual_numpy_nan_agg_with_axis(np.nanmin)
- return self.apply(
- func,
- raw=True,
- engine=engine,
- engine_kwargs=engine_kwargs,
- )
- else:
- from pandas.core._numba.kernels import sliding_min_max
- return self._numba_apply(sliding_min_max, engine_kwargs, False)
- window_func = window_aggregations.roll_min
- return self._apply(window_func, name="min", numeric_only=numeric_only)
- def mean(
- self,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- func = generate_manual_numpy_nan_agg_with_axis(np.nanmean)
- return self.apply(
- func,
- raw=True,
- engine=engine,
- engine_kwargs=engine_kwargs,
- )
- else:
- from pandas.core._numba.kernels import sliding_mean
- return self._numba_apply(sliding_mean, engine_kwargs)
- window_func = window_aggregations.roll_mean
- return self._apply(window_func, name="mean", numeric_only=numeric_only)
- def median(
- self,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- func = generate_manual_numpy_nan_agg_with_axis(np.nanmedian)
- else:
- func = np.nanmedian
- return self.apply(
- func,
- raw=True,
- engine=engine,
- engine_kwargs=engine_kwargs,
- )
- window_func = window_aggregations.roll_median_c
- return self._apply(window_func, name="median", numeric_only=numeric_only)
- def std(
- self,
- ddof: int = 1,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- raise NotImplementedError("std not supported with method='table'")
- from pandas.core._numba.kernels import sliding_var
- return zsqrt(self._numba_apply(sliding_var, engine_kwargs, ddof))
- window_func = window_aggregations.roll_var
- def zsqrt_func(values, begin, end, min_periods):
- return zsqrt(window_func(values, begin, end, min_periods, ddof=ddof))
- return self._apply(
- zsqrt_func,
- name="std",
- numeric_only=numeric_only,
- )
- def var(
- self,
- ddof: int = 1,
- numeric_only: bool = False,
- engine: str | None = None,
- engine_kwargs: dict[str, bool] | None = None,
- ):
- if maybe_use_numba(engine):
- if self.method == "table":
- raise NotImplementedError("var not supported with method='table'")
- from pandas.core._numba.kernels import sliding_var
- return self._numba_apply(sliding_var, engine_kwargs, ddof)
- window_func = partial(window_aggregations.roll_var, ddof=ddof)
- return self._apply(
- window_func,
- name="var",
- numeric_only=numeric_only,
- )
- def skew(self, numeric_only: bool = False):
- window_func = window_aggregations.roll_skew
- return self._apply(
- window_func,
- name="skew",
- numeric_only=numeric_only,
- )
    def sem(self, ddof: int = 1, numeric_only: bool = False):
        # Standard error of the mean: std / sqrt(count - ddof).
        # Raise here so error message says sem instead of std
        self._validate_numeric_only("sem", numeric_only)
        # NOTE(review): ``ddof`` is applied only to the count in the
        # denominator and is NOT forwarded to ``self.std`` (which keeps its
        # default ddof=1) — presumably intentional to match existing pandas
        # behavior; confirm before changing.
        return self.std(numeric_only=numeric_only) / (
            self.count(numeric_only=numeric_only) - ddof
        ).pow(0.5)
- def kurt(self, numeric_only: bool = False):
- window_func = window_aggregations.roll_kurt
- return self._apply(
- window_func,
- name="kurt",
- numeric_only=numeric_only,
- )
    def quantile(
        self,
        quantile: float,
        interpolation: QuantileInterpolation = "linear",
        numeric_only: bool = False,
    ):
        """Rolling quantile; 0.0 and 1.0 short-circuit to the min/max kernels."""
        if quantile == 1.0:
            window_func = window_aggregations.roll_max
        elif quantile == 0.0:
            window_func = window_aggregations.roll_min
        else:
            window_func = partial(
                window_aggregations.roll_quantile,
                quantile=quantile,
                interpolation=interpolation,
            )

        return self._apply(window_func, name="quantile", numeric_only=numeric_only)
- def rank(
- self,
- method: WindowingRankType = "average",
- ascending: bool = True,
- pct: bool = False,
- numeric_only: bool = False,
- ):
- window_func = partial(
- window_aggregations.roll_rank,
- method=method,
- ascending=ascending,
- percentile=pct,
- )
- return self._apply(window_func, name="rank", numeric_only=numeric_only)
    def cov(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ):
        """
        Rolling sample covariance, computed pairwise per window as
        (E[xy] - E[x]E[y]) * n / (n - ddof).
        """
        if self.step is not None:
            raise NotImplementedError("step not implemented for cov")
        self._validate_numeric_only("cov", numeric_only)

        from pandas import Series

        def cov_func(x, y):
            x_array = self._prep_values(x)
            y_array = self._prep_values(y)
            window_indexer = self._get_window_indexer()
            min_periods = (
                self.min_periods
                if self.min_periods is not None
                else window_indexer.window_size
            )
            start, end = window_indexer.get_window_bounds(
                num_values=len(x_array),
                min_periods=min_periods,
                center=self.center,
                closed=self.closed,
                step=self.step,
            )
            self._check_window_bounds(start, end, len(x_array))

            with np.errstate(all="ignore"):
                mean_x_y = window_aggregations.roll_mean(
                    x_array * y_array, start, end, min_periods
                )
                mean_x = window_aggregations.roll_mean(x_array, start, end, min_periods)
                mean_y = window_aggregations.roll_mean(y_array, start, end, min_periods)
                # count positions where both x and y are non-NaN (NaN in
                # either makes x + y NaN)
                count_x_y = window_aggregations.roll_sum(
                    notna(x_array + y_array).astype(np.float64), start, end, 0
                )
                # bias correction: scale by n / (n - ddof)
                result = (mean_x_y - mean_x * mean_y) * (count_x_y / (count_x_y - ddof))
            return Series(result, index=x.index, name=x.name, copy=False)

        return self._apply_pairwise(
            self._selected_obj, other, pairwise, cov_func, numeric_only
        )
    def corr(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ):
        """
        Rolling Pearson correlation, computed pairwise per window as
        cov(x, y) / sqrt(var(x) * var(y)).
        """
        if self.step is not None:
            raise NotImplementedError("step not implemented for corr")
        self._validate_numeric_only("corr", numeric_only)

        from pandas import Series

        def corr_func(x, y):
            x_array = self._prep_values(x)
            y_array = self._prep_values(y)
            window_indexer = self._get_window_indexer()
            min_periods = (
                self.min_periods
                if self.min_periods is not None
                else window_indexer.window_size
            )
            start, end = window_indexer.get_window_bounds(
                num_values=len(x_array),
                min_periods=min_periods,
                center=self.center,
                closed=self.closed,
                step=self.step,
            )
            self._check_window_bounds(start, end, len(x_array))

            with np.errstate(all="ignore"):
                mean_x_y = window_aggregations.roll_mean(
                    x_array * y_array, start, end, min_periods
                )
                mean_x = window_aggregations.roll_mean(x_array, start, end, min_periods)
                mean_y = window_aggregations.roll_mean(y_array, start, end, min_periods)
                # count positions where both x and y are non-NaN
                count_x_y = window_aggregations.roll_sum(
                    notna(x_array + y_array).astype(np.float64), start, end, 0
                )
                x_var = window_aggregations.roll_var(
                    x_array, start, end, min_periods, ddof
                )
                y_var = window_aggregations.roll_var(
                    y_array, start, end, min_periods, ddof
                )
                # numerator is the bias-corrected covariance (see cov above)
                numerator = (mean_x_y - mean_x * mean_y) * (
                    count_x_y / (count_x_y - ddof)
                )
                denominator = (x_var * y_var) ** 0.5
                result = numerator / denominator
            return Series(result, index=x.index, name=x.name, copy=False)

        return self._apply_pairwise(
            self._selected_obj, other, pairwise, corr_func, numeric_only
        )
class Rolling(RollingAndExpandingMixin):
    # NOTE(review): presumably the set of constructor arguments recognised and
    # propagated by the shared window machinery (mirrors Window._attributes);
    # confirm in BaseWindow before extending.
    _attributes: list[str] = [
        "window",
        "min_periods",
        "center",
        "win_type",
        "axis",
        "on",
        "closed",
        "step",
        "method",
    ]
    def _validate(self):
        """
        Validate rolling-specific arguments, in particular frequency windows
        (str/offset/timedelta) against a datetimelike index.
        """
        super()._validate()

        # we allow rolling on a datetimelike index
        if (
            self.obj.empty
            or isinstance(self._on, (DatetimeIndex, TimedeltaIndex, PeriodIndex))
        ) and isinstance(self.window, (str, BaseOffset, timedelta)):
            self._validate_datetimelike_monotonic()

            # this will raise ValueError on non-fixed freqs
            try:
                freq = to_offset(self.window)
            except (TypeError, ValueError) as err:
                raise ValueError(
                    f"passed window {self.window} is not "
                    "compatible with a datetimelike index"
                ) from err
            if isinstance(self._on, PeriodIndex):
                # express the window in units of the PeriodIndex's own freq
                # error: Incompatible types in assignment (expression has type
                # "float", variable has type "Optional[int]")
                self._win_freq_i8 = freq.nanos / (  # type: ignore[assignment]
                    self._on.freq.nanos / self._on.freq.n
                )
            else:
                self._win_freq_i8 = freq.nanos

            # min_periods must be an integer
            if self.min_periods is None:
                self.min_periods = 1

            if self.step is not None:
                raise NotImplementedError(
                    "step is not supported with frequency windows"
                )

        elif isinstance(self.window, BaseIndexer):
            # Passed BaseIndexer subclass should handle all other rolling kwargs
            pass
        elif not is_integer(self.window) or self.window < 0:
            raise ValueError("window must be an integer 0 or greater")
- def _validate_datetimelike_monotonic(self) -> None:
- """
- Validate self._on is monotonic (increasing or decreasing) and has
- no NaT values for frequency windows.
- """
- if self._on.hasnans:
- self._raise_monotonic_error("values must not have NaT")
- if not (self._on.is_monotonic_increasing or self._on.is_monotonic_decreasing):
- self._raise_monotonic_error("values must be monotonic")
- def _raise_monotonic_error(self, msg: str):
- on = self.on
- if on is None:
- if self.axis == 0:
- on = "index"
- else:
- on = "column"
- raise ValueError(f"{on} {msg}")
    @doc(
        _shared_docs["aggregate"],
        see_also=dedent(
            """
        See Also
        --------
        pandas.Series.rolling : Calling object with Series data.
        pandas.DataFrame.rolling : Calling object with DataFrame data.
        """
        ),
        examples=dedent(
            """
        Examples
        --------
        >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]})
        >>> df
           A  B  C
        0  1  4  7
        1  2  5  8
        2  3  6  9

        >>> df.rolling(2).sum()
             A     B     C
        0  NaN   NaN   NaN
        1  3.0   9.0  15.0
        2  5.0  11.0  17.0

        >>> df.rolling(2).agg({"A": "sum", "B": "min"})
             A    B
        0  NaN  NaN
        1  3.0  4.0
        2  5.0  5.0
        """
        ),
        klass="Series/Dataframe",
        axis="",
    )
    def aggregate(self, func, *args, **kwargs):
        # Delegate to the shared window aggregation machinery.
        return super().aggregate(func, *args, **kwargs)

    agg = aggregate
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([2, 3, np.nan, 10])
        >>> s.rolling(2).count()
        0    NaN
        1    2.0
        2    1.0
        3    1.0
        dtype: float64
        >>> s.rolling(3).count()
        0    NaN
        1    NaN
        2    2.0
        3    2.0
        dtype: float64
        >>> s.rolling(4).count()
        0    NaN
        1    NaN
        2    NaN
        3    3.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="count of non NaN observations",
        agg_method="count",
    )
    def count(self, numeric_only: bool = False):
        # Delegate to the shared rolling/expanding implementation.
        return super().count(numeric_only)
    @doc(
        template_header,
        create_section_header("Parameters"),
        window_apply_parameters,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also[:-1],
        window_method="rolling",
        aggregation_description="custom aggregation function",
        agg_method="apply",
    )
    def apply(
        self,
        func: Callable[..., Any],
        raw: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
        args: tuple[Any, ...] | None = None,
        kwargs: dict[str, Any] | None = None,
    ):
        # Delegate to the shared rolling/expanding implementation.
        return super().apply(
            func,
            raw=raw,
            engine=engine,
            engine_kwargs=engine_kwargs,
            args=args,
            kwargs=kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        window_agg_numba_parameters(),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Notes"),
        numba_notes,
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([1, 2, 3, 4, 5])
        >>> s
        0    1
        1    2
        2    3
        3    4
        4    5
        dtype: int64

        >>> s.rolling(3).sum()
        0     NaN
        1     NaN
        2     6.0
        3     9.0
        4    12.0
        dtype: float64

        >>> s.rolling(3, center=True).sum()
        0     NaN
        1     6.0
        2     9.0
        3    12.0
        4     NaN
        dtype: float64

        For DataFrame, each sum is computed column-wise.

        >>> df = pd.DataFrame({{"A": s, "B": s ** 2}})
        >>> df
           A   B
        0  1   1
        1  2   4
        2  3   9
        3  4  16
        4  5  25

        >>> df.rolling(3).sum()
              A     B
        0   NaN   NaN
        1   NaN   NaN
        2   6.0  14.0
        3   9.0  29.0
        4  12.0  50.0
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="sum",
        agg_method="sum",
    )
    def sum(
        self,
        numeric_only: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Delegate to the shared rolling/expanding implementation.
        return super().sum(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        window_agg_numba_parameters(),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Notes"),
        numba_notes[:-1],
        window_method="rolling",
        aggregation_description="maximum",
        agg_method="max",
    )
    def max(
        self,
        numeric_only: bool = False,
        *args,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
        **kwargs,
    ):
        # NOTE(review): ``*args``/``**kwargs`` are accepted but silently
        # dropped (unlike min/sum, which do not accept them) — presumably a
        # leftover from a numpy-compat signature; confirm before tightening.
        # Delegate to the shared rolling/expanding implementation.
        return super().max(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        window_agg_numba_parameters(),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Notes"),
        numba_notes,
        create_section_header("Examples"),
        dedent(
            """
        Performing a rolling minimum with a window size of 3.

        >>> s = pd.Series([4, 3, 5, 2, 6])
        >>> s.rolling(3).min()
        0    NaN
        1    NaN
        2    3.0
        3    2.0
        4    2.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="minimum",
        agg_method="min",
    )
    def min(
        self,
        numeric_only: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Delegate to the shared rolling/expanding implementation.
        return super().min(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        window_agg_numba_parameters(),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Notes"),
        numba_notes,
        create_section_header("Examples"),
        dedent(
            """
        The below examples will show rolling mean calculations with window sizes of
        two and three, respectively.
        >>> s = pd.Series([1, 2, 3, 4])
        >>> s.rolling(2).mean()
        0 NaN
        1 1.5
        2 2.5
        3 3.5
        dtype: float64
        >>> s.rolling(3).mean()
        0 NaN
        1 NaN
        2 2.0
        3 3.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="mean",
        agg_method="mean",
    )
    def mean(
        self,
        numeric_only: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Compute the rolling mean; delegates to the shared implementation in
        # the parent mixin (cython or numba engine selected by `engine`).
        return super().mean(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        window_agg_numba_parameters(),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Notes"),
        numba_notes,
        create_section_header("Examples"),
        dedent(
            """
        Compute the rolling median of a series with a window size of 3.
        >>> s = pd.Series([0, 1, 2, 3, 4])
        >>> s.rolling(3).median()
        0 NaN
        1 NaN
        2 1.0
        3 2.0
        4 3.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="median",
        agg_method="median",
    )
    def median(
        self,
        numeric_only: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Compute the rolling median; delegates to the shared implementation
        # in the parent mixin (cython or numba engine selected by `engine`).
        return super().median(
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        dedent(
            """
        ddof : int, default 1
        Delta Degrees of Freedom. The divisor used in calculations
        is ``N - ddof``, where ``N`` represents the number of elements.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        # "1.4" is interpolated as the version in which numba support was
        # added for this aggregation.
        window_agg_numba_parameters("1.4"),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        "numpy.std : Equivalent method for NumPy array.\n",
        template_see_also,
        create_section_header("Notes"),
        dedent(
            """
        The default ``ddof`` of 1 used in :meth:`Series.std` is different
        than the default ``ddof`` of 0 in :func:`numpy.std`.
        A minimum of one period is required for the rolling calculation.\n
        """
        ).replace("\n", "", 1),
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([5, 5, 6, 7, 5, 5, 5])
        >>> s.rolling(3).std()
        0 NaN
        1 NaN
        2 0.577350
        3 1.000000
        4 1.000000
        5 1.154701
        6 0.000000
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="standard deviation",
        agg_method="std",
    )
    def std(
        self,
        ddof: int = 1,
        numeric_only: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Compute the rolling standard deviation; delegates to the shared
        # implementation in the parent mixin.
        return super().std(
            ddof=ddof,
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        dedent(
            """
        ddof : int, default 1
        Delta Degrees of Freedom. The divisor used in calculations
        is ``N - ddof``, where ``N`` represents the number of elements.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        # "1.4" is interpolated as the version in which numba support was
        # added for this aggregation.
        window_agg_numba_parameters("1.4"),
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        "numpy.var : Equivalent method for NumPy array.\n",
        template_see_also,
        create_section_header("Notes"),
        dedent(
            """
        The default ``ddof`` of 1 used in :meth:`Series.var` is different
        than the default ``ddof`` of 0 in :func:`numpy.var`.
        A minimum of one period is required for the rolling calculation.\n
        """
        ).replace("\n", "", 1),
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([5, 5, 6, 7, 5, 5, 5])
        >>> s.rolling(3).var()
        0 NaN
        1 NaN
        2 0.333333
        3 1.000000
        4 1.000000
        5 1.333333
        6 0.000000
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="variance",
        agg_method="var",
    )
    def var(
        self,
        ddof: int = 1,
        numeric_only: bool = False,
        engine: str | None = None,
        engine_kwargs: dict[str, bool] | None = None,
    ):
        # Compute the rolling variance; delegates to the shared implementation
        # in the parent mixin.
        return super().var(
            ddof=ddof,
            numeric_only=numeric_only,
            engine=engine,
            engine_kwargs=engine_kwargs,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        "scipy.stats.skew : Third moment of a probability density.\n",
        template_see_also,
        create_section_header("Notes"),
        "A minimum of three periods is required for the rolling calculation.\n",
        window_method="rolling",
        aggregation_description="unbiased skewness",
        agg_method="skew",
    )
    def skew(self, numeric_only: bool = False):
        # Compute the rolling unbiased skewness; no numba engine option is
        # exposed for this aggregation (cython implementation only).
        return super().skew(numeric_only=numeric_only)
    @doc(
        template_header,
        create_section_header("Parameters"),
        dedent(
            """
        ddof : int, default 1
        Delta Degrees of Freedom. The divisor used in calculations
        is ``N - ddof``, where ``N`` represents the number of elements.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Notes"),
        "A minimum of one period is required for the calculation.\n\n",
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([0, 1, 2, 3])
        >>> s.rolling(2, min_periods=1).sem()
        0 NaN
        1 0.707107
        2 0.707107
        3 0.707107
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="standard error of mean",
        agg_method="sem",
    )
    def sem(self, ddof: int = 1, numeric_only: bool = False):
        # Standard error of the mean has no dedicated window kernel: it is
        # derived per window as std / sqrt(count - ddof) from the existing
        # rolling std and rolling count results.
        # Raise here so error message says sem instead of std
        self._validate_numeric_only("sem", numeric_only)
        # NOTE(review): `numeric_only` is passed positionally to count() here
        # but by keyword everywhere else in this class — confirm and align.
        return self.std(numeric_only=numeric_only) / (
            self.count(numeric_only) - ddof
        ).pow(0.5)
    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        "scipy.stats.kurtosis : Reference SciPy method.\n",
        template_see_also,
        create_section_header("Notes"),
        "A minimum of four periods is required for the calculation.\n\n",
        create_section_header("Examples"),
        dedent(
            """
        The example below will show a rolling calculation with a window size of
        four matching the equivalent function call using `scipy.stats`.
        >>> arr = [1, 2, 3, 4, 999]
        >>> import scipy.stats
        >>> print(f"{{scipy.stats.kurtosis(arr[:-1], bias=False):.6f}}")
        -1.200000
        >>> print(f"{{scipy.stats.kurtosis(arr[1:], bias=False):.6f}}")
        3.999946
        >>> s = pd.Series(arr)
        >>> s.rolling(4).kurt()
        0 NaN
        1 NaN
        2 NaN
        3 -1.200000
        4 3.999946
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="Fisher's definition of kurtosis without bias",
        agg_method="kurt",
    )
    def kurt(self, numeric_only: bool = False):
        # Compute rolling kurtosis (Fisher's definition, bias-corrected);
        # delegates to the shared implementation in the parent mixin.
        return super().kurt(numeric_only=numeric_only)
    @doc(
        template_header,
        create_section_header("Parameters"),
        dedent(
            """
        quantile : float
        Quantile to compute. 0 <= quantile <= 1.
        interpolation : {{'linear', 'lower', 'higher', 'midpoint', 'nearest'}}
        This optional parameter specifies the interpolation method to use,
        when the desired quantile lies between two data points `i` and `j`:
        * linear: `i + (j - i) * fraction`, where `fraction` is the
        fractional part of the index surrounded by `i` and `j`.
        * lower: `i`.
        * higher: `j`.
        * nearest: `i` or `j` whichever is nearest.
        * midpoint: (`i` + `j`) / 2.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([1, 2, 3, 4])
        >>> s.rolling(2).quantile(.4, interpolation='lower')
        0 NaN
        1 1.0
        2 2.0
        3 3.0
        dtype: float64
        >>> s.rolling(2).quantile(.4, interpolation='midpoint')
        0 NaN
        1 1.5
        2 2.5
        3 3.5
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="quantile",
        agg_method="quantile",
    )
    def quantile(
        self,
        quantile: float,
        interpolation: QuantileInterpolation = "linear",
        numeric_only: bool = False,
    ):
        # Compute the rolling quantile; delegates to the shared implementation
        # in the parent mixin.
        return super().quantile(
            quantile=quantile,
            interpolation=interpolation,
            numeric_only=numeric_only,
        )
    @doc(
        template_header,
        ".. versionadded:: 1.4.0 \n\n",
        create_section_header("Parameters"),
        dedent(
            """
        method : {{'average', 'min', 'max'}}, default 'average'
        How to rank the group of records that have the same value (i.e. ties):
        * average: average rank of the group
        * min: lowest rank in the group
        * max: highest rank in the group
        ascending : bool, default True
        Whether or not the elements should be ranked in ascending order.
        pct : bool, default False
        Whether or not to display the returned rankings in percentile
        form.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        template_see_also,
        create_section_header("Examples"),
        dedent(
            """
        >>> s = pd.Series([1, 4, 2, 3, 5, 3])
        >>> s.rolling(3).rank()
        0 NaN
        1 NaN
        2 2.0
        3 2.0
        4 3.0
        5 1.5
        dtype: float64
        >>> s.rolling(3).rank(method="max")
        0 NaN
        1 NaN
        2 2.0
        3 2.0
        4 3.0
        5 2.0
        dtype: float64
        >>> s.rolling(3).rank(method="min")
        0 NaN
        1 NaN
        2 2.0
        3 2.0
        4 3.0
        5 1.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="rank",
        agg_method="rank",
    )
    def rank(
        self,
        method: WindowingRankType = "average",
        ascending: bool = True,
        pct: bool = False,
        numeric_only: bool = False,
    ):
        # Rank values within each rolling window; delegates to the shared
        # implementation in the parent mixin.
        return super().rank(
            method=method,
            ascending=ascending,
            pct=pct,
            numeric_only=numeric_only,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        dedent(
            """
        other : Series or DataFrame, optional
        If not supplied then will default to self and produce pairwise
        output.
        pairwise : bool, default None
        If False then only matching columns between self and other will be
        used and the output will be a DataFrame.
        If True then all pairwise combinations will be calculated and the
        output will be a MultiIndexed DataFrame in the case of DataFrame
        inputs. In the case of missing elements, only complete pairwise
        observations will be used.
        ddof : int, default 1
        Delta Degrees of Freedom. The divisor used in calculations
        is ``N - ddof``, where ``N`` represents the number of elements.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        # [:-1] drops the trailing newline since this is the final section.
        template_see_also[:-1],
        window_method="rolling",
        aggregation_description="sample covariance",
        agg_method="cov",
    )
    def cov(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ):
        # Compute rolling sample covariance against `other` (or pairwise with
        # self); delegates to the shared implementation in the parent mixin.
        return super().cov(
            other=other,
            pairwise=pairwise,
            ddof=ddof,
            numeric_only=numeric_only,
        )
    @doc(
        template_header,
        create_section_header("Parameters"),
        dedent(
            """
        other : Series or DataFrame, optional
        If not supplied then will default to self and produce pairwise
        output.
        pairwise : bool, default None
        If False then only matching columns between self and other will be
        used and the output will be a DataFrame.
        If True then all pairwise combinations will be calculated and the
        output will be a MultiIndexed DataFrame in the case of DataFrame
        inputs. In the case of missing elements, only complete pairwise
        observations will be used.
        ddof : int, default 1
        Delta Degrees of Freedom. The divisor used in calculations
        is ``N - ddof``, where ``N`` represents the number of elements.
        """
        ).replace("\n", "", 1),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("See Also"),
        dedent(
            """
        cov : Similar method to calculate covariance.
        numpy.corrcoef : NumPy Pearson's correlation calculation.
        """
        ).replace("\n", "", 1),
        template_see_also,
        create_section_header("Notes"),
        dedent(
            """
        This function uses Pearson's definition of correlation
        (https://en.wikipedia.org/wiki/Pearson_correlation_coefficient).
        When `other` is not specified, the output will be self correlation (e.g.
        all 1's), except for :class:`~pandas.DataFrame` inputs with `pairwise`
        set to `True`.
        Function will return ``NaN`` for correlations of equal valued sequences;
        this is the result of a 0/0 division error.
        When `pairwise` is set to `False`, only matching columns between `self` and
        `other` will be used.
        When `pairwise` is set to `True`, the output will be a MultiIndex DataFrame
        with the original index on the first level, and the `other` DataFrame
        columns on the second level.
        In the case of missing elements, only complete pairwise observations
        will be used.\n
        """
        ).replace("\n", "", 1),
        create_section_header("Examples"),
        dedent(
            """
        The below example shows a rolling calculation with a window size of
        four matching the equivalent function call using :meth:`numpy.corrcoef`.
        >>> v1 = [3, 3, 3, 5, 8]
        >>> v2 = [3, 4, 4, 4, 8]
        >>> # numpy returns a 2X2 array, the correlation coefficient
        >>> # is the number at entry [0][1]
        >>> print(f"{{np.corrcoef(v1[:-1], v2[:-1])[0][1]:.6f}}")
        0.333333
        >>> print(f"{{np.corrcoef(v1[1:], v2[1:])[0][1]:.6f}}")
        0.916949
        >>> s1 = pd.Series(v1)
        >>> s2 = pd.Series(v2)
        >>> s1.rolling(4).corr(s2)
        0 NaN
        1 NaN
        2 NaN
        3 0.333333
        4 0.916949
        dtype: float64
        The below example shows a similar rolling calculation on a
        DataFrame using the pairwise option.
        >>> matrix = np.array([[51., 35.], [49., 30.], [47., 32.],\
        [46., 31.], [50., 36.]])
        >>> print(np.corrcoef(matrix[:-1,0], matrix[:-1,1]).round(7))
        [[1. 0.6263001]
        [0.6263001 1. ]]
        >>> print(np.corrcoef(matrix[1:,0], matrix[1:,1]).round(7))
        [[1. 0.5553681]
        [0.5553681 1. ]]
        >>> df = pd.DataFrame(matrix, columns=['X','Y'])
        >>> df
        X Y
        0 51.0 35.0
        1 49.0 30.0
        2 47.0 32.0
        3 46.0 31.0
        4 50.0 36.0
        >>> df.rolling(4).corr(pairwise=True)
        X Y
        0 X NaN NaN
        Y NaN NaN
        1 X NaN NaN
        Y NaN NaN
        2 X NaN NaN
        Y NaN NaN
        3 X 1.000000 0.626300
        Y 0.626300 1.000000
        4 X 1.000000 0.555368
        Y 0.555368 1.000000
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="correlation",
        agg_method="corr",
    )
    def corr(
        self,
        other: DataFrame | Series | None = None,
        pairwise: bool | None = None,
        ddof: int = 1,
        numeric_only: bool = False,
    ):
        # Compute rolling Pearson correlation against `other` (or pairwise
        # with self); delegates to the shared implementation in the parent
        # mixin.
        return super().corr(
            other=other,
            pairwise=pairwise,
            ddof=ddof,
            numeric_only=numeric_only,
        )
# Rolling shares Window's class-level docstring: both classes are produced by
# the same ``.rolling(...)`` accessor and accept the same constructor
# arguments, so a single user-facing description covers them.
Rolling.__doc__ = Window.__doc__
class RollingGroupby(BaseWindowGroupby, Rolling):
    """
    Provide a rolling groupby implementation.
    """

    _attributes = Rolling._attributes + BaseWindowGroupby._attributes

    def _get_window_indexer(self) -> GroupbyIndexer:
        """
        Return an indexer class that will compute the window start and end bounds

        Returns
        -------
        GroupbyIndexer
        """
        indexer_cls: type[BaseIndexer]
        extra_kwargs: dict[str, Any] | None = None
        window_size: int | BaseIndexer
        if isinstance(self.window, BaseIndexer):
            # User-supplied indexer: re-instantiate the same class per group,
            # reusing its construction attributes minus the index array
            # (each group supplies its own index later).
            indexer_cls = type(self.window)
            extra_kwargs = self.window.__dict__.copy()
            assert isinstance(extra_kwargs, dict)  # for mypy
            extra_kwargs.pop("index_array", None)
            window_size = self.window
        elif self._win_freq_i8 is not None:
            # Frequency-based window: variable-sized windows over the index.
            indexer_cls = VariableWindowIndexer
            window_size = self._win_freq_i8
        else:
            # Plain integer window.
            indexer_cls = FixedWindowIndexer
            window_size = self.window
        return GroupbyIndexer(
            index_array=self._index_array,
            window_size=window_size,
            groupby_indices=self._grouper.indices,
            window_indexer=indexer_cls,
            indexer_kwargs=extra_kwargs,
        )

    def _validate_datetimelike_monotonic(self):
        """
        Validate that each group in self._on is monotonic
        """
        # GH 46061
        if self._on.hasnans:
            self._raise_monotonic_error("values must not have NaT")
        for group_indices in self._grouper.indices.values():
            group_on = self._on.take(group_indices)
            if group_on.is_monotonic_increasing or group_on.is_monotonic_decreasing:
                continue
            on = "index" if self.on is None else self.on
            raise ValueError(
                f"Each group within {on} must be monotonic. "
                f"Sort the values in {on} first."
            )
|