# test_cython_aggregations.py (3.9 KB)
  1. from functools import partial
  2. import sys
  3. import numpy as np
  4. import pytest
  5. import pandas._libs.window.aggregations as window_aggregations
  6. from pandas import Series
  7. import pandas._testing as tm
  def _get_rolling_aggregations():
      """Collect the rolling aggregation functions used to parametrize tests.

      Aggregations that need extra arguments (``ddof``, ``quantile`` and
      ``interpolation``, or ``percentile``/``method``/``ascending``) are
      pre-bound with ``functools.partial`` so that every collected callable
      can be invoked with the same four positional arguments.

      Returns
      -------
      dict
          ``{"ids": names, "params": functions}``; both values are tuples of
          equal length and ``names[i]`` is the label for ``functions[i]``.
      """
      # list pairs of name and function
      # each function has this signature:
      # (const float64_t[:] values, ndarray[int64_t] start,
      #  ndarray[int64_t] end, int64_t minp) -> np.ndarray
      named_roll_aggs = (
          [
              ("roll_sum", window_aggregations.roll_sum),
              ("roll_mean", window_aggregations.roll_mean),
          ]
          + [
              (f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof))
              for ddof in [0, 1]
          ]
          + [
              ("roll_skew", window_aggregations.roll_skew),
              ("roll_kurt", window_aggregations.roll_kurt),
              ("roll_median_c", window_aggregations.roll_median_c),
              ("roll_max", window_aggregations.roll_max),
              ("roll_min", window_aggregations.roll_min),
          ]
          + [
              (
                  f"roll_quantile({quantile},{interpolation})",
                  partial(
                      window_aggregations.roll_quantile,
                      quantile=quantile,
                      interpolation=interpolation,
                  ),
              )
              for quantile in [0.0001, 0.5, 0.9999]
              for interpolation in window_aggregations.interpolation_types
          ]
          + [
              (
                  f"roll_rank({percentile},{method},{ascending})",
                  partial(
                      window_aggregations.roll_rank,
                      percentile=percentile,
                      method=method,
                      ascending=ascending,
                  ),
              )
              for percentile in [True, False]
              # iterating the mapping yields its keys; no ``.keys()`` needed
              for method in window_aggregations.rolling_rank_tiebreakers
              for ascending in [True, False]
          ]
      )
      # unzip the (name, function) pairs into a tuple of names and a tuple
      # of functions
      ids, params = zip(*named_roll_aggs)
      return {"ids": ids, "params": params}
  # Built once at import time so the fixture decorator below can read the
  # parameter list and the matching test ids from a single mapping.
  _rolling_aggregations = _get_rolling_aggregations()
  @pytest.fixture(
      params=_rolling_aggregations["params"], ids=_rolling_aggregations["ids"]
  )
  def rolling_aggregation(request):
      """Make a rolling aggregation function as fixture.

      The fixture is parametrized over ``_rolling_aggregations["params"]``
      and labelled with ``_rolling_aggregations["ids"]``; each test that
      requests it receives one of those callables via ``request.param``.
      """
      return request.param
  def test_rolling_aggregation_boundary_consistency(rolling_aggregation):
      """Check that aggregating a subset of windows matches the full run.

      The aggregation is evaluated once on only the selected windows and once
      on every window followed by selecting the same positions; both results
      must be equal.
      """
      # GH-45647
      minp, step, width, size, selection = 0, 1, 3, 11, [2, 7]
      values = np.arange(1, 1 + size, dtype=np.float64)
      # Window i covers values[start[i]:end[i]] and spans ``width`` elements.
      end = np.arange(width, size, step, dtype=np.int64)
      start = end - width
      selarr = np.array(selection, dtype=np.int32)
      # Aggregate only the selected windows ...
      result = Series(rolling_aggregation(values, start[selarr], end[selarr], minp))
      # ... versus aggregating every window and selecting afterwards.
      expected = Series(rolling_aggregation(values, start, end, minp)[selarr])
      tm.assert_equal(expected, result)
  def test_rolling_aggregation_with_unused_elements(rolling_aggregation):
      """Check that values outside every window do not affect the result.

      Two windows cover the first and the last ``width`` elements. The
      elements between them are set to the smallest positive normal float,
      NaN and the largest float. The aggregation over the original array must
      equal the aggregation over a compacted array that holds only the
      elements inside the windows, and that reference result must be finite.
      """
      # GH-45647
      minp, width = 0, 5  # width at least 4 for kurt
      size = 2 * width + 5
      values = np.arange(1, size + 1, dtype=np.float64)
      # Fill the gap between the two windows with extreme and missing values.
      values[width : width + 2] = sys.float_info.min
      values[width + 2] = np.nan
      values[width + 3 : width + 5] = sys.float_info.max
      start = np.array([0, size - width], dtype=np.int64)
      end = np.array([width, size], dtype=np.int64)
      # Positions that fall inside some window, listed window by window.
      loc = np.array(
          [j for lo, hi in zip(start, end) for j in range(lo, hi)],
          dtype=np.int32,
      )
      result = Series(rolling_aggregation(values, start, end, minp))
      # Reference: the same windows laid out back to back with no gap.
      compact_values = np.array(values[loc], dtype=np.float64)
      compact_start = np.arange(0, len(start) * width, width, dtype=np.int64)
      compact_end = compact_start + width
      expected = Series(
          rolling_aggregation(compact_values, compact_start, compact_end, minp)
      )
      assert np.isfinite(expected.values).all(), "Not all expected values are finite"
      tm.assert_equal(expected, result)