common.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """Common utility functions for rolling operations"""
  2. from __future__ import annotations
  3. from collections import defaultdict
  4. from typing import cast
  5. import numpy as np
  6. from pandas.core.dtypes.generic import (
  7. ABCDataFrame,
  8. ABCSeries,
  9. )
  10. from pandas.core.indexes.api import MultiIndex
  11. def flex_binary_moment(arg1, arg2, f, pairwise: bool = False):
  12. if isinstance(arg1, ABCSeries) and isinstance(arg2, ABCSeries):
  13. X, Y = prep_binary(arg1, arg2)
  14. return f(X, Y)
  15. elif isinstance(arg1, ABCDataFrame):
  16. from pandas import DataFrame
  17. def dataframe_from_int_dict(data, frame_template) -> DataFrame:
  18. result = DataFrame(data, index=frame_template.index)
  19. if len(result.columns) > 0:
  20. result.columns = frame_template.columns[result.columns]
  21. else:
  22. result.columns = frame_template.columns.copy()
  23. return result
  24. results = {}
  25. if isinstance(arg2, ABCDataFrame):
  26. if pairwise is False:
  27. if arg1 is arg2:
  28. # special case in order to handle duplicate column names
  29. for i in range(len(arg1.columns)):
  30. results[i] = f(arg1.iloc[:, i], arg2.iloc[:, i])
  31. return dataframe_from_int_dict(results, arg1)
  32. else:
  33. if not arg1.columns.is_unique:
  34. raise ValueError("'arg1' columns are not unique")
  35. if not arg2.columns.is_unique:
  36. raise ValueError("'arg2' columns are not unique")
  37. X, Y = arg1.align(arg2, join="outer")
  38. X, Y = prep_binary(X, Y)
  39. res_columns = arg1.columns.union(arg2.columns)
  40. for col in res_columns:
  41. if col in X and col in Y:
  42. results[col] = f(X[col], Y[col])
  43. return DataFrame(results, index=X.index, columns=res_columns)
  44. elif pairwise is True:
  45. results = defaultdict(dict)
  46. for i in range(len(arg1.columns)):
  47. for j in range(len(arg2.columns)):
  48. if j < i and arg2 is arg1:
  49. # Symmetric case
  50. results[i][j] = results[j][i]
  51. else:
  52. results[i][j] = f(
  53. *prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
  54. )
  55. from pandas import concat
  56. result_index = arg1.index.union(arg2.index)
  57. if len(result_index):
  58. # construct result frame
  59. result = concat(
  60. [
  61. concat(
  62. [results[i][j] for j in range(len(arg2.columns))],
  63. ignore_index=True,
  64. )
  65. for i in range(len(arg1.columns))
  66. ],
  67. ignore_index=True,
  68. axis=1,
  69. )
  70. result.columns = arg1.columns
  71. # set the index and reorder
  72. if arg2.columns.nlevels > 1:
  73. # mypy needs to know columns is a MultiIndex, Index doesn't
  74. # have levels attribute
  75. arg2.columns = cast(MultiIndex, arg2.columns)
  76. # GH 21157: Equivalent to MultiIndex.from_product(
  77. # [result_index], <unique combinations of arg2.columns.levels>,
  78. # )
  79. # A normal MultiIndex.from_product will produce too many
  80. # combinations.
  81. result_level = np.tile(
  82. result_index, len(result) // len(result_index)
  83. )
  84. arg2_levels = (
  85. np.repeat(
  86. arg2.columns.get_level_values(i),
  87. len(result) // len(arg2.columns),
  88. )
  89. for i in range(arg2.columns.nlevels)
  90. )
  91. result_names = list(arg2.columns.names) + [result_index.name]
  92. result.index = MultiIndex.from_arrays(
  93. [*arg2_levels, result_level], names=result_names
  94. )
  95. # GH 34440
  96. num_levels = len(result.index.levels)
  97. new_order = [num_levels - 1] + list(range(num_levels - 1))
  98. result = result.reorder_levels(new_order).sort_index()
  99. else:
  100. result.index = MultiIndex.from_product(
  101. [range(len(arg2.columns)), range(len(result_index))]
  102. )
  103. result = result.swaplevel(1, 0).sort_index()
  104. result.index = MultiIndex.from_product(
  105. [result_index] + [arg2.columns]
  106. )
  107. else:
  108. # empty result
  109. result = DataFrame(
  110. index=MultiIndex(
  111. levels=[arg1.index, arg2.columns], codes=[[], []]
  112. ),
  113. columns=arg2.columns,
  114. dtype="float64",
  115. )
  116. # reset our index names to arg1 names
  117. # reset our column names to arg2 names
  118. # careful not to mutate the original names
  119. result.columns = result.columns.set_names(arg1.columns.names)
  120. result.index = result.index.set_names(
  121. result_index.names + arg2.columns.names
  122. )
  123. return result
  124. else:
  125. results = {
  126. i: f(*prep_binary(arg1.iloc[:, i], arg2))
  127. for i in range(len(arg1.columns))
  128. }
  129. return dataframe_from_int_dict(results, arg1)
  130. else:
  131. return flex_binary_moment(arg2, arg1, f)
  132. def zsqrt(x):
  133. with np.errstate(all="ignore"):
  134. result = np.sqrt(x)
  135. mask = x < 0
  136. if isinstance(x, ABCDataFrame):
  137. if mask._values.any():
  138. result[mask] = 0
  139. else:
  140. if mask.any():
  141. result[mask] = 0
  142. return result
  143. def prep_binary(arg1, arg2):
  144. # mask out values, this also makes a common index...
  145. X = arg1 + 0 * arg2
  146. Y = arg2 + 0 * arg1
  147. return X, Y