test_pairwise.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. import warnings
  2. import numpy as np
  3. import pytest
  4. from pandas import (
  5. DataFrame,
  6. Index,
  7. MultiIndex,
  8. Series,
  9. date_range,
  10. )
  11. import pandas._testing as tm
  12. from pandas.core.algorithms import safe_sort
  @pytest.fixture(
      params=[
          # integer data, integer labels in reversed order
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1]], columns=[1, 0]),
          # integer data, duplicated integer label
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1]], columns=[1, 1]),
          # integer data, duplicated string label
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1]], columns=["C", "C"]),
          # integer data, mixed float/int labels
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1]], columns=[1.0, 0]),
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1]], columns=[0.0, 1]),
          # integer data, mixed string/int labels
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1]], columns=["C", 1]),
          # float data, mixed int/float labels
          DataFrame(
              data=[[2.0, 4.0], [1.0, 2.0], [5.0, 2.0], [8.0, 1.0]],
              columns=[1, 0.0],
          ),
          # mixed int/float data, mixed int/float labels
          DataFrame(data=[[2, 4.0], [1, 2.0], [5, 2.0], [8, 1.0]], columns=[0, 1.0]),
          # a single float value in the data, mixed float/string labels
          DataFrame(data=[[2, 4], [1, 2], [5, 2], [8, 1.0]], columns=[1.0, "X"]),
      ]
  )
  def pairwise_frames(request):
      """Yield each 4-row, 2-column frame of the parametrization in turn.

      The frames share the same numeric values and differ in their column
      labels (ordering, duplicates, mixed label types) and value dtypes.
      """
      frame = request.param
      return frame
  @pytest.fixture
  def pairwise_target_frame():
      """Return the reference frame, labelled with the plain columns 0 and 1."""
      rows = [[2, 4], [1, 2], [5, 2], [8, 1]]
      return DataFrame(rows, columns=[0, 1])
  @pytest.fixture
  def pairwise_other_frame():
      """Return the second operand frame: an all-None "Y" column plus "Z" and "X"."""
      labels = ["Y", "Z", "X"]
      rows = [
          [None, 1, 1],
          [None, 1, 2],
          [None, 3, 2],
          [None, 8, 1],
      ]
      return DataFrame(rows, columns=labels)
  40. def test_rolling_cov(series):
  41. A = series
  42. B = A + np.random.randn(len(A))
  43. result = A.rolling(window=50, min_periods=25).cov(B)
  44. tm.assert_almost_equal(result[-1], np.cov(A[-50:], B[-50:])[0, 1])
  45. def test_rolling_corr(series):
  46. A = series
  47. B = A + np.random.randn(len(A))
  48. result = A.rolling(window=50, min_periods=25).corr(B)
  49. tm.assert_almost_equal(result[-1], np.corrcoef(A[-50:], B[-50:])[0, 1])
  50. # test for correct bias correction
  51. a = tm.makeTimeSeries()
  52. b = tm.makeTimeSeries()
  53. a[:5] = np.nan
  54. b[:10] = np.nan
  55. result = a.rolling(window=len(a), min_periods=1).corr(b)
  56. tm.assert_almost_equal(result[-1], a.corr(b))
  @pytest.mark.parametrize("func", ["cov", "corr"])
  def test_rolling_pairwise_cov_corr(func, frame):
      """Compare one slice of the pairwise frame result with the Series result.

      ``frame`` is supplied by a fixture that is not defined in this file; the
      test reads its columns labelled 1 and 5.
      """
      pairwise = getattr(frame.rolling(window=10, min_periods=5), func)()

      # Column 5 of the rows whose second index level equals 1, with that
      # level then dropped so the index matches a plain Series result.
      selected = pairwise.loc[(slice(None), 1), 5]
      selected.index = selected.index.droplevel(1)

      direct = getattr(frame[1].rolling(window=10, min_periods=5), func)(frame[5])
      tm.assert_series_equal(selected, direct, check_names=False)
  @pytest.mark.parametrize("method", ["corr", "cov"])
  def test_flex_binary_frame(method, frame):
      """Check Series/DataFrame and DataFrame/DataFrame rolling ``corr``/``cov``.

      ``frame`` is supplied by a fixture that is not defined in this file; the
      test reads its column labelled 1.
      """

      def rolled(obj, other):
          # Apply the parametrized binary method over a 10-period window.
          return getattr(obj.rolling(window=10), method)(other)

      series = frame[1]

      # Series against frame, in either order, equals the column-wise result.
      expected = frame.apply(lambda col: rolled(series, col))
      tm.assert_frame_equal(rolled(series, frame), expected)
      tm.assert_frame_equal(rolled(frame, series), expected)

      # Frame against a second frame with the same labels but random values
      # is computed column by column.
      other = DataFrame(
          np.random.randn(*frame.shape), index=frame.index, columns=frame.columns
      )
      expected = DataFrame({k: rolled(frame[k], other[k]) for k in frame})
      tm.assert_frame_equal(rolled(frame, other), expected)
  @pytest.mark.parametrize("window", range(7))
  def test_rolling_corr_with_zero_variance(window):
      """Rolling corr of an all-zero series with another series is all NaN."""
      # GH 18430
      constant = Series(np.zeros(20))
      increasing = Series(np.arange(20))
      corr = constant.rolling(window=window).corr(other=increasing)
      assert corr.isna().all()
  def test_corr_sanity():
      """Centered rolling correlations never exceed 1 in absolute value."""
      # GH 3155
      fixed = DataFrame(
          np.array(
              [
                  [0.87024726, 0.18505595],
                  [0.64355431, 0.3091617],
                  [0.92372966, 0.50552513],
                  [0.00203756, 0.04520709],
                  [0.84780328, 0.33394331],
                  [0.78369152, 0.63919667],
              ]
          )
      )
      randomized = DataFrame(np.random.rand(30, 2))

      for frame in (fixed, randomized):
          corr = frame[0].rolling(5, center=True).corr(frame[1])
          # NaN entries are mapped to 0 so that only real values are bounded.
          assert all(np.abs(np.nan_to_num(value)) <= 1 for value in corr)
  def test_rolling_cov_diff_length():
      """Rolling cov gives the same result for a shorter or gap-filled operand."""
      # GH 7512
      left = Series([1, 2, 3], index=[0, 1, 2])
      expected = Series([None, None, 2.0])

      missing_label = Series([1, 3], index=[0, 2])
      missing_value = Series([1, None, 3], index=[0, 1, 2])
      for other in (missing_label, missing_value):
          result = left.rolling(window=3, min_periods=2).cov(other)
          tm.assert_series_equal(result, expected)
  def test_rolling_corr_diff_length():
      """Rolling corr gives the same result for a shorter or gap-filled operand."""
      # GH 7512
      left = Series([1, 2, 3], index=[0, 1, 2])
      expected = Series([None, None, 1.0])

      missing_label = Series([1, 3], index=[0, 2])
      missing_value = Series([1, None, 3], index=[0, 1, 2])
      for other in (missing_label, missing_value):
          result = left.rolling(window=3, min_periods=2).corr(other)
          tm.assert_series_equal(result, expected)
  @pytest.mark.parametrize(
      "f",
      [
          lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=True),
          lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=True),
      ],
  )
  def test_rolling_functions_window_non_shrinkage_binary(f):
      """Pairwise cov/corr keeps the full MultiIndex shape when all values are NaN.

      The input has 4 rows while ``min_periods`` is 5, and the expected frame
      is built without data, i.e. entirely NaN.
      """
      # corr/cov return a MI DataFrame
      column_labels = Index(["A", "B"], name="foo")
      row_labels = Index(range(4), name="bar")
      df = DataFrame(
          [[1, 5], [3, 2], [3, 9], [-1, 0]],
          columns=column_labels,
          index=row_labels,
      )

      expected_index = MultiIndex.from_product(
          [df.index, df.columns], names=["bar", "foo"]
      )
      df_expected = DataFrame(
          columns=Index(["A", "B"], name="foo"),
          index=expected_index,
          dtype="float64",
      )

      tm.assert_frame_equal(f(df), df_expected)
  @pytest.mark.parametrize(
      "f",
      [
          lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=True),
          lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=True),
      ],
  )
  def test_moment_functions_zero_length_pairwise(f):
      """Pairwise cov/corr on zero-length frames returns an empty MI frame."""
      # Frame with neither rows nor columns.
      empty = DataFrame()
      empty_expected = DataFrame(
          index=MultiIndex.from_product([empty.index, empty.columns])
      )
      tm.assert_frame_equal(f(empty), empty_expected)

      # Frame with no rows but one named float64 column.
      no_rows = DataFrame(
          columns=Index(["a"], name="foo"), index=Index([], name="bar")
      )
      no_rows["a"] = no_rows["a"].astype("float64")
      no_rows_expected = DataFrame(
          index=MultiIndex.from_product(
              [no_rows.index, no_rows.columns], names=["bar", "foo"]
          ),
          columns=Index(["a"], name="foo"),
          dtype="float64",
      )
      tm.assert_frame_equal(f(no_rows), no_rows_expected)
  class TestPairwise:
      """Pairwise cov/corr tests for expanding, rolling and ewm windows.

      Most tests run a callable ``f`` on each ``pairwise_frames`` variant and on
      ``pairwise_target_frame`` and compare the non-NaN values of both results.
      """

      # GH 7738

      @staticmethod
      def _assert_non_nan_values_equal(result, expected):
          """Assert both frames hold the same values once NaN rows are dropped."""
          # since we have sorted the results
          # we can only compare non-nans
          tm.assert_numpy_array_equal(
              result.dropna().values,
              expected.dropna().values,
              check_dtype=False,
          )

      @pytest.mark.parametrize("f", [lambda x: x.cov(), lambda x: x.corr()])
      def test_no_flex(self, pairwise_frames, pairwise_target_frame, f):
          # DataFrame methods (which do not call flex_binary_moment())
          result = f(pairwise_frames)
          tm.assert_index_equal(result.index, pairwise_frames.columns)
          tm.assert_index_equal(result.columns, pairwise_frames.columns)

          expected = f(pairwise_target_frame)
          self._assert_non_nan_values_equal(result, expected)

      @pytest.mark.parametrize(
          "f",
          [
              lambda x: x.expanding().cov(pairwise=True),
              lambda x: x.expanding().corr(pairwise=True),
              lambda x: x.rolling(window=3).cov(pairwise=True),
              lambda x: x.rolling(window=3).corr(pairwise=True),
              lambda x: x.ewm(com=3).cov(pairwise=True),
              lambda x: x.ewm(com=3).corr(pairwise=True),
          ],
      )
      def test_pairwise_with_self(self, pairwise_frames, pairwise_target_frame, f):
          # DataFrame with itself, pairwise=True
          # note that we may construct the 1st level of the MI
          # in a non-monotonic way, so compare accordingly
          result = f(pairwise_frames)
          outer_level, inner_level = result.index.levels[0], result.index.levels[1]
          tm.assert_index_equal(outer_level, pairwise_frames.index, check_names=False)
          tm.assert_index_equal(
              safe_sort(inner_level),
              safe_sort(pairwise_frames.columns.unique()),
          )
          tm.assert_index_equal(result.columns, pairwise_frames.columns)

          expected = f(pairwise_target_frame)
          self._assert_non_nan_values_equal(result, expected)

      @pytest.mark.parametrize(
          "f",
          [
              lambda x: x.expanding().cov(pairwise=False),
              lambda x: x.expanding().corr(pairwise=False),
              lambda x: x.rolling(window=3).cov(pairwise=False),
              lambda x: x.rolling(window=3).corr(pairwise=False),
              lambda x: x.ewm(com=3).cov(pairwise=False),
              lambda x: x.ewm(com=3).corr(pairwise=False),
          ],
      )
      def test_no_pairwise_with_self(self, pairwise_frames, pairwise_target_frame, f):
          # DataFrame with itself, pairwise=False
          result = f(pairwise_frames)
          tm.assert_index_equal(result.index, pairwise_frames.index)
          tm.assert_index_equal(result.columns, pairwise_frames.columns)

          expected = f(pairwise_target_frame)
          self._assert_non_nan_values_equal(result, expected)

      @pytest.mark.parametrize(
          "f",
          [
              lambda x, y: x.expanding().cov(y, pairwise=True),
              lambda x, y: x.expanding().corr(y, pairwise=True),
              lambda x, y: x.rolling(window=3).cov(y, pairwise=True),
              lambda x, y: x.rolling(window=3).corr(y, pairwise=True),
              lambda x, y: x.ewm(com=3).cov(y, pairwise=True),
              lambda x, y: x.ewm(com=3).corr(y, pairwise=True),
          ],
      )
      def test_pairwise_with_other(
          self, pairwise_frames, pairwise_target_frame, pairwise_other_frame, f
      ):
          # DataFrame with another DataFrame, pairwise=True
          result = f(pairwise_frames, pairwise_other_frame)
          outer_level, inner_level = result.index.levels[0], result.index.levels[1]
          tm.assert_index_equal(outer_level, pairwise_frames.index, check_names=False)
          tm.assert_index_equal(
              safe_sort(inner_level),
              safe_sort(pairwise_other_frame.columns.unique()),
          )

          expected = f(pairwise_target_frame, pairwise_other_frame)
          self._assert_non_nan_values_equal(result, expected)

      @pytest.mark.parametrize(
          "f",
          [
              lambda x, y: x.expanding().cov(y, pairwise=False),
              lambda x, y: x.expanding().corr(y, pairwise=False),
              lambda x, y: x.rolling(window=3).cov(y, pairwise=False),
              lambda x, y: x.rolling(window=3).corr(y, pairwise=False),
              lambda x, y: x.ewm(com=3).cov(y, pairwise=False),
              lambda x, y: x.ewm(com=3).corr(y, pairwise=False),
          ],
      )
      def test_no_pairwise_with_other(self, pairwise_frames, pairwise_other_frame, f):
          # DataFrame with another DataFrame, pairwise=False
          if pairwise_frames.columns.is_unique:
              result = f(pairwise_frames, pairwise_other_frame)
              with warnings.catch_warnings(record=True):
                  warnings.simplefilter("ignore", RuntimeWarning)
                  # we can have int and str columns
                  expected_index = pairwise_frames.index.union(
                      pairwise_other_frame.index
                  )
                  expected_columns = pairwise_frames.columns.union(
                      pairwise_other_frame.columns
                  )
              tm.assert_index_equal(result.index, expected_index)
              tm.assert_index_equal(result.columns, expected_columns)
          else:
              # Duplicate column labels are rejected for either operand.
              with pytest.raises(ValueError, match="'arg1' columns are not unique"):
                  f(pairwise_frames, pairwise_other_frame)
              with pytest.raises(ValueError, match="'arg2' columns are not unique"):
                  f(pairwise_other_frame, pairwise_frames)

      @pytest.mark.parametrize(
          "f",
          [
              lambda x, y: x.expanding().cov(y),
              lambda x, y: x.expanding().corr(y),
              lambda x, y: x.rolling(window=3).cov(y),
              lambda x, y: x.rolling(window=3).corr(y),
              lambda x, y: x.ewm(com=3).cov(y),
              lambda x, y: x.ewm(com=3).corr(y),
          ],
      )
      def test_pairwise_with_series(self, pairwise_frames, pairwise_target_frame, f):
          # DataFrame with a Series
          def make_other():
              return Series([1, 1, 3, 8])

          # Exercise both operand orders: frame-with-series, series-with-frame.
          operand_orders = (
              lambda frame: f(frame, make_other()),
              lambda frame: f(make_other(), frame),
          )
          for call in operand_orders:
              result = call(pairwise_frames)
              tm.assert_index_equal(result.index, pairwise_frames.index)
              tm.assert_index_equal(result.columns, pairwise_frames.columns)

              expected = call(pairwise_target_frame)
              self._assert_non_nan_values_equal(result, expected)

      def test_corr_freq_memory_error(self):
          # GH 31789
          dates = date_range("2020", periods=5)
          s = Series(range(5), index=dates)
          result = s.rolling("12H").corr(s)
          expected = Series([np.nan] * 5, index=dates)
          tm.assert_series_equal(result, expected)

      def test_cov_mulittindex(self):
          # GH 34440
          columns = MultiIndex.from_product([list("ab"), list("xy"), list("AB")])
          df = DataFrame(np.arange(24).reshape(3, 8), index=range(3), columns=columns)

          result = df.ewm(alpha=0.1).cov()

          index = MultiIndex.from_product(
              [range(3), list("ab"), list("xy"), list("AB")]
          )
          # One 8x8 block per input row.  ``np.nan`` is used rather than the
          # ``np.NaN`` alias, which NumPy 2.0 no longer provides.
          blocks = (
              np.full((8, 8), np.nan),
              np.full((8, 8), 32.000000),
              np.full((8, 8), 63.881919),
          )
          expected = DataFrame(np.vstack(blocks), index=index, columns=columns)
          tm.assert_frame_equal(result, expected)

      def test_multindex_columns_pairwise_func(self):
          # GH 21157
          columns = MultiIndex.from_arrays([["M", "N"], ["P", "Q"]], names=["a", "b"])
          df = DataFrame(np.ones((5, 2)), columns=columns)

          result = df.rolling(3).corr()

          expected_index = MultiIndex.from_arrays(
              [
                  np.repeat(np.arange(5, dtype=np.int64), 2),
                  ["M", "N"] * 5,
                  ["P", "Q"] * 5,
              ],
              names=[None, "a", "b"],
          )
          expected = DataFrame(np.nan, index=expected_index, columns=columns)
          tm.assert_frame_equal(result, expected)