objects.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. """Indexer objects for computing start/end window bounds for rolling operations"""
  2. from __future__ import annotations
  3. from datetime import timedelta
  4. import numpy as np
  5. from pandas._libs.window.indexers import calculate_variable_window_bounds
  6. from pandas.util._decorators import Appender
  7. from pandas.core.dtypes.common import ensure_platform_int
  8. from pandas.tseries.offsets import Nano
# Shared numpydoc text for ``get_window_bounds``. It is attached to the
# ``get_window_bounds`` method of every indexer class in this module through
# the ``@Appender(get_window_bounds_doc)`` decorator.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
window_size : int, default 0
    the number of rows in a window
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API
    .. versionadded:: 1.5
win_type : str, default None
    win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""
  33. class BaseIndexer:
  34. """Base class for window bounds calculations."""
  35. def __init__(
  36. self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
  37. ) -> None:
  38. """
  39. Parameters
  40. ----------
  41. **kwargs :
  42. keyword arguments that will be available when get_window_bounds is called
  43. """
  44. self.index_array = index_array
  45. self.window_size = window_size
  46. # Set user defined kwargs as attributes that can be used in get_window_bounds
  47. for key, value in kwargs.items():
  48. setattr(self, key, value)
  49. @Appender(get_window_bounds_doc)
  50. def get_window_bounds(
  51. self,
  52. num_values: int = 0,
  53. min_periods: int | None = None,
  54. center: bool | None = None,
  55. closed: str | None = None,
  56. step: int | None = None,
  57. ) -> tuple[np.ndarray, np.ndarray]:
  58. raise NotImplementedError
  59. class FixedWindowIndexer(BaseIndexer):
  60. """Creates window boundaries that are of fixed length."""
  61. @Appender(get_window_bounds_doc)
  62. def get_window_bounds(
  63. self,
  64. num_values: int = 0,
  65. min_periods: int | None = None,
  66. center: bool | None = None,
  67. closed: str | None = None,
  68. step: int | None = None,
  69. ) -> tuple[np.ndarray, np.ndarray]:
  70. if center:
  71. offset = (self.window_size - 1) // 2
  72. else:
  73. offset = 0
  74. end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
  75. start = end - self.window_size
  76. if closed in ["left", "both"]:
  77. start -= 1
  78. if closed in ["left", "neither"]:
  79. end -= 1
  80. end = np.clip(end, 0, num_values)
  81. start = np.clip(start, 0, num_values)
  82. return start, end
  83. class VariableWindowIndexer(BaseIndexer):
  84. """Creates window boundaries that are of variable length, namely for time series."""
  85. @Appender(get_window_bounds_doc)
  86. def get_window_bounds(
  87. self,
  88. num_values: int = 0,
  89. min_periods: int | None = None,
  90. center: bool | None = None,
  91. closed: str | None = None,
  92. step: int | None = None,
  93. ) -> tuple[np.ndarray, np.ndarray]:
  94. # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
  95. # type "Optional[bool]"; expected "bool"
  96. # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
  97. # type "Optional[ndarray]"; expected "ndarray"
  98. return calculate_variable_window_bounds(
  99. num_values,
  100. self.window_size,
  101. min_periods,
  102. center, # type: ignore[arg-type]
  103. closed,
  104. self.index_array, # type: ignore[arg-type]
  105. )
  106. class VariableOffsetWindowIndexer(BaseIndexer):
  107. """Calculate window boundaries based on a non-fixed offset such as a BusinessDay."""
  108. def __init__(
  109. self,
  110. index_array: np.ndarray | None = None,
  111. window_size: int = 0,
  112. index=None,
  113. offset=None,
  114. **kwargs,
  115. ) -> None:
  116. super().__init__(index_array, window_size, **kwargs)
  117. self.index = index
  118. self.offset = offset
  119. @Appender(get_window_bounds_doc)
  120. def get_window_bounds(
  121. self,
  122. num_values: int = 0,
  123. min_periods: int | None = None,
  124. center: bool | None = None,
  125. closed: str | None = None,
  126. step: int | None = None,
  127. ) -> tuple[np.ndarray, np.ndarray]:
  128. if step is not None:
  129. raise NotImplementedError("step not implemented for variable offset window")
  130. if num_values <= 0:
  131. return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")
  132. # if windows is variable, default is 'right', otherwise default is 'both'
  133. if closed is None:
  134. closed = "right" if self.index is not None else "both"
  135. right_closed = closed in ["right", "both"]
  136. left_closed = closed in ["left", "both"]
  137. if self.index[num_values - 1] < self.index[0]:
  138. index_growth_sign = -1
  139. else:
  140. index_growth_sign = 1
  141. start = np.empty(num_values, dtype="int64")
  142. start.fill(-1)
  143. end = np.empty(num_values, dtype="int64")
  144. end.fill(-1)
  145. start[0] = 0
  146. # right endpoint is closed
  147. if right_closed:
  148. end[0] = 1
  149. # right endpoint is open
  150. else:
  151. end[0] = 0
  152. # start is start of slice interval (including)
  153. # end is end of slice interval (not including)
  154. for i in range(1, num_values):
  155. end_bound = self.index[i]
  156. start_bound = self.index[i] - index_growth_sign * self.offset
  157. # left endpoint is closed
  158. if left_closed:
  159. start_bound -= Nano(1)
  160. # advance the start bound until we are
  161. # within the constraint
  162. start[i] = i
  163. for j in range(start[i - 1], i):
  164. if (self.index[j] - start_bound) * index_growth_sign > timedelta(0):
  165. start[i] = j
  166. break
  167. # end bound is previous end
  168. # or current index
  169. if (self.index[end[i - 1]] - end_bound) * index_growth_sign <= timedelta(0):
  170. end[i] = i + 1
  171. else:
  172. end[i] = end[i - 1]
  173. # right endpoint is open
  174. if not right_closed:
  175. end[i] -= 1
  176. return start, end
  177. class ExpandingIndexer(BaseIndexer):
  178. """Calculate expanding window bounds, mimicking df.expanding()"""
  179. @Appender(get_window_bounds_doc)
  180. def get_window_bounds(
  181. self,
  182. num_values: int = 0,
  183. min_periods: int | None = None,
  184. center: bool | None = None,
  185. closed: str | None = None,
  186. step: int | None = None,
  187. ) -> tuple[np.ndarray, np.ndarray]:
  188. return (
  189. np.zeros(num_values, dtype=np.int64),
  190. np.arange(1, num_values + 1, dtype=np.int64),
  191. )
  192. class FixedForwardWindowIndexer(BaseIndexer):
  193. """
  194. Creates window boundaries for fixed-length windows that include the current row.
  195. Examples
  196. --------
  197. >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
  198. >>> df
  199. B
  200. 0 0.0
  201. 1 1.0
  202. 2 2.0
  203. 3 NaN
  204. 4 4.0
  205. >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
  206. >>> df.rolling(window=indexer, min_periods=1).sum()
  207. B
  208. 0 1.0
  209. 1 3.0
  210. 2 2.0
  211. 3 4.0
  212. 4 4.0
  213. """
  214. @Appender(get_window_bounds_doc)
  215. def get_window_bounds(
  216. self,
  217. num_values: int = 0,
  218. min_periods: int | None = None,
  219. center: bool | None = None,
  220. closed: str | None = None,
  221. step: int | None = None,
  222. ) -> tuple[np.ndarray, np.ndarray]:
  223. if center:
  224. raise ValueError("Forward-looking windows can't have center=True")
  225. if closed is not None:
  226. raise ValueError(
  227. "Forward-looking windows don't support setting the closed argument"
  228. )
  229. if step is None:
  230. step = 1
  231. start = np.arange(0, num_values, step, dtype="int64")
  232. end = start + self.window_size
  233. if self.window_size:
  234. end = np.clip(end, 0, num_values)
  235. return start, end
  236. class GroupbyIndexer(BaseIndexer):
  237. """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""
  238. def __init__(
  239. self,
  240. index_array: np.ndarray | None = None,
  241. window_size: int | BaseIndexer = 0,
  242. groupby_indices: dict | None = None,
  243. window_indexer: type[BaseIndexer] = BaseIndexer,
  244. indexer_kwargs: dict | None = None,
  245. **kwargs,
  246. ) -> None:
  247. """
  248. Parameters
  249. ----------
  250. index_array : np.ndarray or None
  251. np.ndarray of the index of the original object that we are performing
  252. a chained groupby operation over. This index has been pre-sorted relative to
  253. the groups
  254. window_size : int or BaseIndexer
  255. window size during the windowing operation
  256. groupby_indices : dict or None
  257. dict of {group label: [positional index of rows belonging to the group]}
  258. window_indexer : BaseIndexer
  259. BaseIndexer class determining the start and end bounds of each group
  260. indexer_kwargs : dict or None
  261. Custom kwargs to be passed to window_indexer
  262. **kwargs :
  263. keyword arguments that will be available when get_window_bounds is called
  264. """
  265. self.groupby_indices = groupby_indices or {}
  266. self.window_indexer = window_indexer
  267. self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
  268. super().__init__(
  269. index_array=index_array,
  270. window_size=self.indexer_kwargs.pop("window_size", window_size),
  271. **kwargs,
  272. )
  273. @Appender(get_window_bounds_doc)
  274. def get_window_bounds(
  275. self,
  276. num_values: int = 0,
  277. min_periods: int | None = None,
  278. center: bool | None = None,
  279. closed: str | None = None,
  280. step: int | None = None,
  281. ) -> tuple[np.ndarray, np.ndarray]:
  282. # 1) For each group, get the indices that belong to the group
  283. # 2) Use the indices to calculate the start & end bounds of the window
  284. # 3) Append the window bounds in group order
  285. start_arrays = []
  286. end_arrays = []
  287. window_indices_start = 0
  288. for key, indices in self.groupby_indices.items():
  289. index_array: np.ndarray | None
  290. if self.index_array is not None:
  291. index_array = self.index_array.take(ensure_platform_int(indices))
  292. else:
  293. index_array = self.index_array
  294. indexer = self.window_indexer(
  295. index_array=index_array,
  296. window_size=self.window_size,
  297. **self.indexer_kwargs,
  298. )
  299. start, end = indexer.get_window_bounds(
  300. len(indices), min_periods, center, closed, step
  301. )
  302. start = start.astype(np.int64)
  303. end = end.astype(np.int64)
  304. assert len(start) == len(
  305. end
  306. ), "these should be equal in length from get_window_bounds"
  307. # Cannot use groupby_indices as they might not be monotonic with the object
  308. # we're rolling over
  309. window_indices = np.arange(
  310. window_indices_start, window_indices_start + len(indices)
  311. )
  312. window_indices_start += len(indices)
  313. # Extend as we'll be slicing window like [start, end)
  314. window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype(
  315. np.int64, copy=False
  316. )
  317. start_arrays.append(window_indices.take(ensure_platform_int(start)))
  318. end_arrays.append(window_indices.take(ensure_platform_int(end)))
  319. if len(start_arrays) == 0:
  320. return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
  321. start = np.concatenate(start_arrays)
  322. end = np.concatenate(end_arrays)
  323. return start, end
  324. class ExponentialMovingWindowIndexer(BaseIndexer):
  325. """Calculate ewm window bounds (the entire window)"""
  326. @Appender(get_window_bounds_doc)
  327. def get_window_bounds(
  328. self,
  329. num_values: int = 0,
  330. min_periods: int | None = None,
  331. center: bool | None = None,
  332. closed: str | None = None,
  333. step: int | None = None,
  334. ) -> tuple[np.ndarray, np.ndarray]:
  335. return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)