test_numba.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. import numpy as np
  2. import pytest
  3. from pandas.compat import (
  4. is_ci_environment,
  5. is_platform_mac,
  6. is_platform_windows,
  7. )
  8. from pandas.errors import NumbaUtilError
  9. import pandas.util._test_decorators as td
  10. from pandas import (
  11. DataFrame,
  12. Series,
  13. option_context,
  14. to_datetime,
  15. )
  16. import pandas._testing as tm
  17. # TODO(GH#44584): Mark these as pytest.mark.single_cpu
  18. pytestmark = pytest.mark.skipif(
  19. is_ci_environment() and (is_platform_windows() or is_platform_mac()),
  20. reason="On GHA CI, Windows can fail with "
  21. "'Windows fatal exception: stack overflow' "
  22. "and macOS can timeout",
  23. )
  24. @pytest.fixture(params=["single", "table"])
  25. def method(request):
  26. """method keyword in rolling/expanding/ewm constructor"""
  27. return request.param
  28. @pytest.fixture(
  29. params=[
  30. ["sum", {}],
  31. ["mean", {}],
  32. ["median", {}],
  33. ["max", {}],
  34. ["min", {}],
  35. ["var", {}],
  36. ["var", {"ddof": 0}],
  37. ["std", {}],
  38. ["std", {"ddof": 0}],
  39. ]
  40. )
  41. def arithmetic_numba_supported_operators(request):
  42. return request.param
  43. @td.skip_if_no("numba")
  44. @pytest.mark.filterwarnings("ignore")
  45. # Filter warnings when parallel=True and the function can't be parallelized by Numba
  46. class TestEngine:
  47. @pytest.mark.parametrize("jit", [True, False])
  48. def test_numba_vs_cython_apply(self, jit, nogil, parallel, nopython, center, step):
  49. def f(x, *args):
  50. arg_sum = 0
  51. for arg in args:
  52. arg_sum += arg
  53. return np.mean(x) + arg_sum
  54. if jit:
  55. import numba
  56. f = numba.jit(f)
  57. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  58. args = (2,)
  59. s = Series(range(10))
  60. result = s.rolling(2, center=center, step=step).apply(
  61. f, args=args, engine="numba", engine_kwargs=engine_kwargs, raw=True
  62. )
  63. expected = s.rolling(2, center=center, step=step).apply(
  64. f, engine="cython", args=args, raw=True
  65. )
  66. tm.assert_series_equal(result, expected)
  67. @pytest.mark.parametrize(
  68. "data",
  69. [
  70. DataFrame(np.eye(5)),
  71. DataFrame(
  72. [
  73. [5, 7, 7, 7, np.nan, np.inf, 4, 3, 3, 3],
  74. [5, 7, 7, 7, np.nan, np.inf, 7, 3, 3, 3],
  75. [np.nan, np.nan, 5, 6, 7, 5, 5, 5, 5, 5],
  76. ]
  77. ).T,
  78. Series(range(5), name="foo"),
  79. Series([20, 10, 10, np.inf, 1, 1, 2, 3]),
  80. Series([20, 10, 10, np.nan, 10, 1, 2, 3]),
  81. ],
  82. )
  83. def test_numba_vs_cython_rolling_methods(
  84. self,
  85. data,
  86. nogil,
  87. parallel,
  88. nopython,
  89. arithmetic_numba_supported_operators,
  90. step,
  91. ):
  92. method, kwargs = arithmetic_numba_supported_operators
  93. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  94. roll = data.rolling(3, step=step)
  95. result = getattr(roll, method)(
  96. engine="numba", engine_kwargs=engine_kwargs, **kwargs
  97. )
  98. expected = getattr(roll, method)(engine="cython", **kwargs)
  99. tm.assert_equal(result, expected)
  100. @pytest.mark.parametrize(
  101. "data", [DataFrame(np.eye(5)), Series(range(5), name="foo")]
  102. )
  103. def test_numba_vs_cython_expanding_methods(
  104. self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
  105. ):
  106. method, kwargs = arithmetic_numba_supported_operators
  107. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  108. data = DataFrame(np.eye(5))
  109. expand = data.expanding()
  110. result = getattr(expand, method)(
  111. engine="numba", engine_kwargs=engine_kwargs, **kwargs
  112. )
  113. expected = getattr(expand, method)(engine="cython", **kwargs)
  114. tm.assert_equal(result, expected)
  115. @pytest.mark.parametrize("jit", [True, False])
  116. def test_cache_apply(self, jit, nogil, parallel, nopython, step):
  117. # Test that the functions are cached correctly if we switch functions
  118. def func_1(x):
  119. return np.mean(x) + 4
  120. def func_2(x):
  121. return np.std(x) * 5
  122. if jit:
  123. import numba
  124. func_1 = numba.jit(func_1)
  125. func_2 = numba.jit(func_2)
  126. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  127. roll = Series(range(10)).rolling(2, step=step)
  128. result = roll.apply(
  129. func_1, engine="numba", engine_kwargs=engine_kwargs, raw=True
  130. )
  131. expected = roll.apply(func_1, engine="cython", raw=True)
  132. tm.assert_series_equal(result, expected)
  133. result = roll.apply(
  134. func_2, engine="numba", engine_kwargs=engine_kwargs, raw=True
  135. )
  136. expected = roll.apply(func_2, engine="cython", raw=True)
  137. tm.assert_series_equal(result, expected)
  138. # This run should use the cached func_1
  139. result = roll.apply(
  140. func_1, engine="numba", engine_kwargs=engine_kwargs, raw=True
  141. )
  142. expected = roll.apply(func_1, engine="cython", raw=True)
  143. tm.assert_series_equal(result, expected)
  144. @pytest.mark.parametrize(
  145. "window,window_kwargs",
  146. [
  147. ["rolling", {"window": 3, "min_periods": 0}],
  148. ["expanding", {}],
  149. ],
  150. )
  151. def test_dont_cache_args(
  152. self, window, window_kwargs, nogil, parallel, nopython, method
  153. ):
  154. # GH 42287
  155. def add(values, x):
  156. return np.sum(values) + x
  157. engine_kwargs = {"nopython": nopython, "nogil": nogil, "parallel": parallel}
  158. df = DataFrame({"value": [0, 0, 0]})
  159. result = getattr(df, window)(method=method, **window_kwargs).apply(
  160. add, raw=True, engine="numba", engine_kwargs=engine_kwargs, args=(1,)
  161. )
  162. expected = DataFrame({"value": [1.0, 1.0, 1.0]})
  163. tm.assert_frame_equal(result, expected)
  164. result = getattr(df, window)(method=method, **window_kwargs).apply(
  165. add, raw=True, engine="numba", engine_kwargs=engine_kwargs, args=(2,)
  166. )
  167. expected = DataFrame({"value": [2.0, 2.0, 2.0]})
  168. tm.assert_frame_equal(result, expected)
  169. def test_dont_cache_engine_kwargs(self):
  170. # If the user passes a different set of engine_kwargs don't return the same
  171. # jitted function
  172. nogil = False
  173. parallel = True
  174. nopython = True
  175. def func(x):
  176. return nogil + parallel + nopython
  177. engine_kwargs = {"nopython": nopython, "nogil": nogil, "parallel": parallel}
  178. df = DataFrame({"value": [0, 0, 0]})
  179. result = df.rolling(1).apply(
  180. func, raw=True, engine="numba", engine_kwargs=engine_kwargs
  181. )
  182. expected = DataFrame({"value": [2.0, 2.0, 2.0]})
  183. tm.assert_frame_equal(result, expected)
  184. parallel = False
  185. engine_kwargs = {"nopython": nopython, "nogil": nogil, "parallel": parallel}
  186. result = df.rolling(1).apply(
  187. func, raw=True, engine="numba", engine_kwargs=engine_kwargs
  188. )
  189. expected = DataFrame({"value": [1.0, 1.0, 1.0]})
  190. tm.assert_frame_equal(result, expected)
  191. @td.skip_if_no("numba")
  192. class TestEWM:
  193. @pytest.mark.parametrize(
  194. "grouper", [lambda x: x, lambda x: x.groupby("A")], ids=["None", "groupby"]
  195. )
  196. @pytest.mark.parametrize("method", ["mean", "sum"])
  197. def test_invalid_engine(self, grouper, method):
  198. df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)})
  199. with pytest.raises(ValueError, match="engine must be either"):
  200. getattr(grouper(df).ewm(com=1.0), method)(engine="foo")
  201. @pytest.mark.parametrize(
  202. "grouper", [lambda x: x, lambda x: x.groupby("A")], ids=["None", "groupby"]
  203. )
  204. @pytest.mark.parametrize("method", ["mean", "sum"])
  205. def test_invalid_engine_kwargs(self, grouper, method):
  206. df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)})
  207. with pytest.raises(ValueError, match="cython engine does not"):
  208. getattr(grouper(df).ewm(com=1.0), method)(
  209. engine="cython", engine_kwargs={"nopython": True}
  210. )
  211. @pytest.mark.parametrize("grouper", ["None", "groupby"])
  212. @pytest.mark.parametrize("method", ["mean", "sum"])
  213. def test_cython_vs_numba(
  214. self, grouper, method, nogil, parallel, nopython, ignore_na, adjust
  215. ):
  216. df = DataFrame({"B": range(4)})
  217. if grouper == "None":
  218. grouper = lambda x: x
  219. else:
  220. df["A"] = ["a", "b", "a", "b"]
  221. grouper = lambda x: x.groupby("A")
  222. if method == "sum":
  223. adjust = True
  224. ewm = grouper(df).ewm(com=1.0, adjust=adjust, ignore_na=ignore_na)
  225. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  226. result = getattr(ewm, method)(engine="numba", engine_kwargs=engine_kwargs)
  227. expected = getattr(ewm, method)(engine="cython")
  228. tm.assert_frame_equal(result, expected)
  229. @pytest.mark.parametrize("grouper", ["None", "groupby"])
  230. def test_cython_vs_numba_times(self, grouper, nogil, parallel, nopython, ignore_na):
  231. # GH 40951
  232. df = DataFrame({"B": [0, 0, 1, 1, 2, 2]})
  233. if grouper == "None":
  234. grouper = lambda x: x
  235. else:
  236. grouper = lambda x: x.groupby("A")
  237. df["A"] = ["a", "b", "a", "b", "b", "a"]
  238. halflife = "23 days"
  239. times = to_datetime(
  240. [
  241. "2020-01-01",
  242. "2020-01-01",
  243. "2020-01-02",
  244. "2020-01-10",
  245. "2020-02-23",
  246. "2020-01-03",
  247. ]
  248. )
  249. ewm = grouper(df).ewm(
  250. halflife=halflife, adjust=True, ignore_na=ignore_na, times=times
  251. )
  252. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  253. result = ewm.mean(engine="numba", engine_kwargs=engine_kwargs)
  254. expected = ewm.mean(engine="cython")
  255. tm.assert_frame_equal(result, expected)
  256. @td.skip_if_no("numba")
  257. def test_use_global_config():
  258. def f(x):
  259. return np.mean(x) + 2
  260. s = Series(range(10))
  261. with option_context("compute.use_numba", True):
  262. result = s.rolling(2).apply(f, engine=None, raw=True)
  263. expected = s.rolling(2).apply(f, engine="numba", raw=True)
  264. tm.assert_series_equal(expected, result)
  265. @td.skip_if_no("numba")
  266. def test_invalid_kwargs_nopython():
  267. with pytest.raises(NumbaUtilError, match="numba does not support kwargs with"):
  268. Series(range(1)).rolling(1).apply(
  269. lambda x: x, kwargs={"a": 1}, engine="numba", raw=True
  270. )
  271. @td.skip_if_no("numba")
  272. @pytest.mark.slow
  273. @pytest.mark.filterwarnings("ignore")
  274. # Filter warnings when parallel=True and the function can't be parallelized by Numba
  275. class TestTableMethod:
  276. def test_table_series_valueerror(self):
  277. def f(x):
  278. return np.sum(x, axis=0) + 1
  279. with pytest.raises(
  280. ValueError, match="method='table' not applicable for Series objects."
  281. ):
  282. Series(range(1)).rolling(1, method="table").apply(
  283. f, engine="numba", raw=True
  284. )
  285. def test_table_method_rolling_methods(
  286. self,
  287. axis,
  288. nogil,
  289. parallel,
  290. nopython,
  291. arithmetic_numba_supported_operators,
  292. step,
  293. ):
  294. method, kwargs = arithmetic_numba_supported_operators
  295. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  296. df = DataFrame(np.eye(3))
  297. roll_table = df.rolling(2, method="table", axis=axis, min_periods=0, step=step)
  298. if method in ("var", "std"):
  299. with pytest.raises(NotImplementedError, match=f"{method} not supported"):
  300. getattr(roll_table, method)(
  301. engine_kwargs=engine_kwargs, engine="numba", **kwargs
  302. )
  303. else:
  304. roll_single = df.rolling(
  305. 2, method="single", axis=axis, min_periods=0, step=step
  306. )
  307. result = getattr(roll_table, method)(
  308. engine_kwargs=engine_kwargs, engine="numba", **kwargs
  309. )
  310. expected = getattr(roll_single, method)(
  311. engine_kwargs=engine_kwargs, engine="numba", **kwargs
  312. )
  313. tm.assert_frame_equal(result, expected)
  314. def test_table_method_rolling_apply(self, axis, nogil, parallel, nopython, step):
  315. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  316. def f(x):
  317. return np.sum(x, axis=0) + 1
  318. df = DataFrame(np.eye(3))
  319. result = df.rolling(
  320. 2, method="table", axis=axis, min_periods=0, step=step
  321. ).apply(f, raw=True, engine_kwargs=engine_kwargs, engine="numba")
  322. expected = df.rolling(
  323. 2, method="single", axis=axis, min_periods=0, step=step
  324. ).apply(f, raw=True, engine_kwargs=engine_kwargs, engine="numba")
  325. tm.assert_frame_equal(result, expected)
  326. def test_table_method_rolling_weighted_mean(self, step):
  327. def weighted_mean(x):
  328. arr = np.ones((1, x.shape[1]))
  329. arr[:, :2] = (x[:, :2] * x[:, 2]).sum(axis=0) / x[:, 2].sum()
  330. return arr
  331. df = DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]])
  332. result = df.rolling(2, method="table", min_periods=0, step=step).apply(
  333. weighted_mean, raw=True, engine="numba"
  334. )
  335. expected = DataFrame(
  336. [
  337. [1.0, 2.0, 1.0],
  338. [1.8, 2.0, 1.0],
  339. [3.333333, 2.333333, 1.0],
  340. [1.555556, 7, 1.0],
  341. ]
  342. )[::step]
  343. tm.assert_frame_equal(result, expected)
  344. def test_table_method_expanding_apply(self, axis, nogil, parallel, nopython):
  345. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  346. def f(x):
  347. return np.sum(x, axis=0) + 1
  348. df = DataFrame(np.eye(3))
  349. result = df.expanding(method="table", axis=axis).apply(
  350. f, raw=True, engine_kwargs=engine_kwargs, engine="numba"
  351. )
  352. expected = df.expanding(method="single", axis=axis).apply(
  353. f, raw=True, engine_kwargs=engine_kwargs, engine="numba"
  354. )
  355. tm.assert_frame_equal(result, expected)
  356. def test_table_method_expanding_methods(
  357. self, axis, nogil, parallel, nopython, arithmetic_numba_supported_operators
  358. ):
  359. method, kwargs = arithmetic_numba_supported_operators
  360. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  361. df = DataFrame(np.eye(3))
  362. expand_table = df.expanding(method="table", axis=axis)
  363. if method in ("var", "std"):
  364. with pytest.raises(NotImplementedError, match=f"{method} not supported"):
  365. getattr(expand_table, method)(
  366. engine_kwargs=engine_kwargs, engine="numba", **kwargs
  367. )
  368. else:
  369. expand_single = df.expanding(method="single", axis=axis)
  370. result = getattr(expand_table, method)(
  371. engine_kwargs=engine_kwargs, engine="numba", **kwargs
  372. )
  373. expected = getattr(expand_single, method)(
  374. engine_kwargs=engine_kwargs, engine="numba", **kwargs
  375. )
  376. tm.assert_frame_equal(result, expected)
  377. @pytest.mark.parametrize("data", [np.eye(3), np.ones((2, 3)), np.ones((3, 2))])
  378. @pytest.mark.parametrize("method", ["mean", "sum"])
  379. def test_table_method_ewm(self, data, method, axis, nogil, parallel, nopython):
  380. engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
  381. df = DataFrame(data)
  382. result = getattr(df.ewm(com=1, method="table", axis=axis), method)(
  383. engine_kwargs=engine_kwargs, engine="numba"
  384. )
  385. expected = getattr(df.ewm(com=1, method="single", axis=axis), method)(
  386. engine_kwargs=engine_kwargs, engine="numba"
  387. )
  388. tm.assert_frame_equal(result, expected)