123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- """Indexer objects for computing start/end window bounds for rolling operations"""
- from __future__ import annotations
- from datetime import timedelta
- import numpy as np
- from pandas._libs.window.indexers import calculate_variable_window_bounds
- from pandas.util._decorators import Appender
- from pandas.core.dtypes.common import ensure_platform_int
- from pandas.tseries.offsets import Nano
# Shared "Parameters"/"Returns" text for ``get_window_bounds``. Each
# ``get_window_bounds`` implementation visible in this file attaches it to its
# own docstring through ``@Appender(get_window_bounds_doc)``.
# NOTE(review): ``window_size`` and ``win_type`` are described below but are not
# parameters of the ``get_window_bounds`` signatures visible in this file
# (``window_size`` is stored on the indexer by ``BaseIndexer.__init__``) --
# confirm whether these two entries should stay.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
window_size : int, default 0
    the number of rows in a window
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API
    .. versionadded:: 1.5
win_type : str, default None
    win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""
class BaseIndexer:
    """
    Base class for window bounds calculations.

    The ``get_window_bounds`` defined here only raises ``NotImplementedError``;
    concrete indexers are expected to override it.
    """

    def __init__(
        self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            stored unchanged on the instance as ``index_array``
        window_size : int, default 0
            stored unchanged on the instance as ``window_size``
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.index_array = index_array
        self.window_size = window_size
        # Expose every user supplied keyword as an instance attribute so that a
        # get_window_bounds implementation can read it back from ``self``.
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Abstract hook: the base class has no bounds of its own to compute.
        raise NotImplementedError
class FixedWindowIndexer(BaseIndexer):
    """Creates window boundaries that are of fixed length."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A centered window is shifted forward by half of its length
        # (rounded down); an uncentered window is not shifted at all.
        shift = (self.window_size - 1) // 2 if center else 0

        # ``end`` is exclusive: one past the last row of each window.
        end = np.arange(1 + shift, num_values + 1 + shift, step, dtype="int64")
        start = end - self.window_size

        # "left"/"both" pull the start back by one row,
        # "left"/"neither" drop the last row from the window.
        widen_left = closed in ("left", "both")
        drop_right = closed in ("left", "neither")
        if widen_left:
            start -= 1
        if drop_right:
            end -= 1

        # Keep both bounds inside [0, num_values].
        return np.clip(start, 0, num_values), np.clip(end, 0, num_values)
class VariableWindowIndexer(BaseIndexer):
    """Creates window boundaries that are of variable length, namely for time series."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # All of the work is delegated to calculate_variable_window_bounds;
        # ``step`` is accepted for signature parity but is not forwarded.
        #
        # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[bool]"; expected "bool"
        # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[ndarray]"; expected "ndarray"
        bounds = calculate_variable_window_bounds(
            num_values,
            self.window_size,
            min_periods,
            center,  # type: ignore[arg-type]
            closed,
            self.index_array,  # type: ignore[arg-type]
        )
        return bounds
class VariableOffsetWindowIndexer(BaseIndexer):
    """Calculate window boundaries based on a non-fixed offset such as a BusinessDay."""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int = 0,
        index=None,
        offset=None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            forwarded to ``BaseIndexer.__init__``
        window_size : int, default 0
            forwarded to ``BaseIndexer.__init__``
        index : default None
            positionally indexable collection of the values the windows are
            measured against; get_window_bounds subscripts it, so it must be
            provided before bounds are requested
            (presumably a DatetimeIndex -- confirm against callers)
        offset : default None
            amount subtracted from each index value to find where its window
            begins (e.g. a BusinessDay)
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        super().__init__(index_array, window_size, **kwargs)
        self.index = index
        self.offset = offset

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        if step is not None:
            raise NotImplementedError("step not implemented for variable offset window")
        if num_values <= 0:
            return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")

        # if windows is variable, default is 'right', otherwise default is 'both'
        if closed is None:
            closed = "right" if self.index is not None else "both"

        right_closed = closed in ["right", "both"]
        left_closed = closed in ["left", "both"]

        # +1 when the index runs forward, -1 when it runs backward; every
        # comparison below is multiplied by this sign so that both directions
        # share the same code path.
        if self.index[num_values - 1] < self.index[0]:
            index_growth_sign = -1
        else:
            index_growth_sign = 1

        # Loop invariants.
        zero = timedelta(0)
        # Closing the left endpoint means moving the start bound one nanosecond
        # *away* from the current row. For a backward-running index that is a
        # step forward in time, so the nudge has to carry the growth sign;
        # an unsigned ``Nano(1)`` would exclude the boundary row instead.
        closed_left_nudge = Nano(index_growth_sign)

        start = np.full(num_values, -1, dtype="int64")
        end = np.full(num_values, -1, dtype="int64")

        start[0] = 0
        # right endpoint closed -> first window holds row 0, otherwise it is empty
        end[0] = 1 if right_closed else 0

        # start is start of slice interval (including)
        # end is end of slice interval (not including)
        for i in range(1, num_values):
            end_bound = self.index[i]
            start_bound = end_bound - index_growth_sign * self.offset

            # left endpoint is closed
            if left_closed:
                start_bound -= closed_left_nudge

            # advance the start bound until we are
            # within the constraint
            start[i] = i
            for j in range(start[i - 1], i):
                if (self.index[j] - start_bound) * index_growth_sign > zero:
                    start[i] = j
                    break

            # end bound is previous end
            # or current index
            if (self.index[end[i - 1]] - end_bound) * index_growth_sign <= zero:
                end[i] = i + 1
            else:
                end[i] = end[i - 1]

            # right endpoint is open
            if not right_closed:
                end[i] -= 1

        return start, end
class ExpandingIndexer(BaseIndexer):
    """Calculate expanding window bounds, mimicking df.expanding()"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Every window starts at row 0 ...
        window_starts = np.zeros(num_values, dtype=np.int64)
        # ... and stops (exclusively) one past the current row.
        window_stops = np.arange(1, num_values + 1, dtype=np.int64)
        return window_starts, window_stops
class FixedForwardWindowIndexer(BaseIndexer):
    """
    Creates window boundaries for fixed-length windows that include the current row.

    Examples
    --------
    >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0

    >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
    >>> df.rolling(window=indexer, min_periods=1).sum()
         B
    0  1.0
    1  3.0
    2  2.0
    3  4.0
    4  4.0
    """

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Reject the options this indexer does not support.
        if center:
            raise ValueError("Forward-looking windows can't have center=True")
        if closed is not None:
            raise ValueError(
                "Forward-looking windows don't support setting the closed argument"
            )

        stride = 1 if step is None else step

        # Each window begins on its own row and spans window_size rows forward.
        start = np.arange(0, num_values, stride, dtype="int64")
        end = start + self.window_size

        # A zero-sized window is returned as is (end == start, unclipped).
        if not self.window_size:
            return start, end
        # Otherwise windows may not run past the last row.
        return start, np.clip(end, 0, num_values)
class GroupbyIndexer(BaseIndexer):
    """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int | BaseIndexer = 0,
        groupby_indices: dict | None = None,
        window_indexer: type[BaseIndexer] = BaseIndexer,
        indexer_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            np.ndarray of the index of the original object that we are performing
            a chained groupby operation over. This index has been pre-sorted relative to
            the groups
        window_size : int or BaseIndexer
            window size during the windowing operation
        groupby_indices : dict or None
            dict of {group label: [positional index of rows belonging to the group]}
        window_indexer : BaseIndexer
            BaseIndexer class determining the start and end bounds of each group
        indexer_kwargs : dict or None
            Custom kwargs to be passed to window_indexer
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indices = groupby_indices or {}
        self.window_indexer = window_indexer
        # Work on a copy so that popping "window_size" below does not mutate the
        # caller's dict.
        self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
        super().__init__(
            index_array=index_array,
            # A "window_size" given through indexer_kwargs wins over the argument.
            window_size=self.indexer_kwargs.pop("window_size", window_size),
            **kwargs,
        )

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # 1) For each group, get the indices that belong to the group
        # 2) Use the indices to calculate the start & end bounds of the window
        # 3) Append the window bounds in group order
        start_arrays = []
        end_arrays = []
        window_indices_start = 0
        # Only the positions are needed; the group labels are not used.
        for indices in self.groupby_indices.values():
            group_size = len(indices)

            index_array: np.ndarray | None
            if self.index_array is not None:
                index_array = self.index_array.take(ensure_platform_int(indices))
            else:
                index_array = self.index_array

            indexer = self.window_indexer(
                index_array=index_array,
                window_size=self.window_size,
                **self.indexer_kwargs,
            )
            start, end = indexer.get_window_bounds(
                group_size, min_periods, center, closed, step
            )
            start = start.astype(np.int64)
            end = end.astype(np.int64)
            assert len(start) == len(
                end
            ), "these should be equal in length from get_window_bounds"

            # Cannot use groupby_indices as they might not be monotonic with the object
            # we're rolling over
            # The range carries one extra trailing position because windows are
            # sliced like [start, end), so ``end`` may equal the group size.
            # Building it in one call also keeps an empty group from failing on
            # a ``[-1]`` lookup into an empty array.
            window_indices = np.arange(
                window_indices_start,
                window_indices_start + group_size + 1,
                dtype=np.int64,
            )
            window_indices_start += group_size

            # Translate group-local bounds into positions in the concatenated result.
            start_arrays.append(window_indices.take(ensure_platform_int(start)))
            end_arrays.append(window_indices.take(ensure_platform_int(end)))

        if not start_arrays:
            return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
        return np.concatenate(start_arrays), np.concatenate(end_arrays)
class ExponentialMovingWindowIndexer(BaseIndexer):
    """Calculate ewm window bounds (the entire window)"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A single window covering rows [0, num_values).
        whole_start = np.array([0], dtype=np.int64)
        whole_end = np.array([num_values], dtype=np.int64)
        return whole_start, whole_end
|