# test_setops.py
  1. import numpy as np
  2. import pytest
  3. import pandas as pd
  4. from pandas import (
  5. Index,
  6. TimedeltaIndex,
  7. timedelta_range,
  8. )
  9. import pandas._testing as tm
  10. from pandas.tseries.offsets import Hour
  11. class TestTimedeltaIndex:
  12. def test_union(self):
  13. i1 = timedelta_range("1day", periods=5)
  14. i2 = timedelta_range("3day", periods=5)
  15. result = i1.union(i2)
  16. expected = timedelta_range("1day", periods=7)
  17. tm.assert_index_equal(result, expected)
  18. i1 = Index(np.arange(0, 20, 2, dtype=np.int64))
  19. i2 = timedelta_range(start="1 day", periods=10, freq="D")
  20. i1.union(i2) # Works
  21. i2.union(i1) # Fails with "AttributeError: can't set attribute"
  22. def test_union_sort_false(self):
  23. tdi = timedelta_range("1day", periods=5)
  24. left = tdi[3:]
  25. right = tdi[:3]
  26. # Check that we are testing the desired code path
  27. assert left._can_fast_union(right)
  28. result = left.union(right)
  29. tm.assert_index_equal(result, tdi)
  30. result = left.union(right, sort=False)
  31. expected = TimedeltaIndex(["4 Days", "5 Days", "1 Days", "2 Day", "3 Days"])
  32. tm.assert_index_equal(result, expected)
  33. def test_union_coverage(self):
  34. idx = TimedeltaIndex(["3d", "1d", "2d"])
  35. ordered = TimedeltaIndex(idx.sort_values(), freq="infer")
  36. result = ordered.union(idx)
  37. tm.assert_index_equal(result, ordered)
  38. result = ordered[:0].union(ordered)
  39. tm.assert_index_equal(result, ordered)
  40. assert result.freq == ordered.freq
  41. def test_union_bug_1730(self):
  42. rng_a = timedelta_range("1 day", periods=4, freq="3H")
  43. rng_b = timedelta_range("1 day", periods=4, freq="4H")
  44. result = rng_a.union(rng_b)
  45. exp = TimedeltaIndex(sorted(set(rng_a) | set(rng_b)))
  46. tm.assert_index_equal(result, exp)
  47. def test_union_bug_1745(self):
  48. left = TimedeltaIndex(["1 day 15:19:49.695000"])
  49. right = TimedeltaIndex(
  50. ["2 day 13:04:21.322000", "1 day 15:27:24.873000", "1 day 15:31:05.350000"]
  51. )
  52. result = left.union(right)
  53. exp = TimedeltaIndex(sorted(set(left) | set(right)))
  54. tm.assert_index_equal(result, exp)
  55. def test_union_bug_4564(self):
  56. left = timedelta_range("1 day", "30d")
  57. right = left + pd.offsets.Minute(15)
  58. result = left.union(right)
  59. exp = TimedeltaIndex(sorted(set(left) | set(right)))
  60. tm.assert_index_equal(result, exp)
  61. def test_union_freq_infer(self):
  62. # When taking the union of two TimedeltaIndexes, we infer
  63. # a freq even if the arguments don't have freq. This matches
  64. # DatetimeIndex behavior.
  65. tdi = timedelta_range("1 Day", periods=5)
  66. left = tdi[[0, 1, 3, 4]]
  67. right = tdi[[2, 3, 1]]
  68. assert left.freq is None
  69. assert right.freq is None
  70. result = left.union(right)
  71. tm.assert_index_equal(result, tdi)
  72. assert result.freq == "D"
  73. def test_intersection_bug_1708(self):
  74. index_1 = timedelta_range("1 day", periods=4, freq="h")
  75. index_2 = index_1 + pd.offsets.Hour(5)
  76. result = index_1.intersection(index_2)
  77. assert len(result) == 0
  78. index_1 = timedelta_range("1 day", periods=4, freq="h")
  79. index_2 = index_1 + pd.offsets.Hour(1)
  80. result = index_1.intersection(index_2)
  81. expected = timedelta_range("1 day 01:00:00", periods=3, freq="h")
  82. tm.assert_index_equal(result, expected)
  83. assert result.freq == expected.freq
  84. def test_intersection_equal(self, sort):
  85. # GH 24471 Test intersection outcome given the sort keyword
  86. # for equal indices intersection should return the original index
  87. first = timedelta_range("1 day", periods=4, freq="h")
  88. second = timedelta_range("1 day", periods=4, freq="h")
  89. intersect = first.intersection(second, sort=sort)
  90. if sort is None:
  91. tm.assert_index_equal(intersect, second.sort_values())
  92. assert tm.equalContents(intersect, second)
  93. # Corner cases
  94. inter = first.intersection(first, sort=sort)
  95. assert inter is first
  96. @pytest.mark.parametrize("period_1, period_2", [(0, 4), (4, 0)])
  97. def test_intersection_zero_length(self, period_1, period_2, sort):
  98. # GH 24471 test for non overlap the intersection should be zero length
  99. index_1 = timedelta_range("1 day", periods=period_1, freq="h")
  100. index_2 = timedelta_range("1 day", periods=period_2, freq="h")
  101. expected = timedelta_range("1 day", periods=0, freq="h")
  102. result = index_1.intersection(index_2, sort=sort)
  103. tm.assert_index_equal(result, expected)
  104. def test_zero_length_input_index(self, sort):
  105. # GH 24966 test for 0-len intersections are copied
  106. index_1 = timedelta_range("1 day", periods=0, freq="h")
  107. index_2 = timedelta_range("1 day", periods=3, freq="h")
  108. result = index_1.intersection(index_2, sort=sort)
  109. assert index_1 is not result
  110. assert index_2 is not result
  111. tm.assert_copy(result, index_1)
  112. @pytest.mark.parametrize(
  113. "rng, expected",
  114. # if target has the same name, it is preserved
  115. [
  116. (
  117. timedelta_range("1 day", periods=5, freq="h", name="idx"),
  118. timedelta_range("1 day", periods=4, freq="h", name="idx"),
  119. ),
  120. # if target name is different, it will be reset
  121. (
  122. timedelta_range("1 day", periods=5, freq="h", name="other"),
  123. timedelta_range("1 day", periods=4, freq="h", name=None),
  124. ),
  125. # if no overlap exists return empty index
  126. (
  127. timedelta_range("1 day", periods=10, freq="h", name="idx")[5:],
  128. TimedeltaIndex([], freq="h", name="idx"),
  129. ),
  130. ],
  131. )
  132. def test_intersection(self, rng, expected, sort):
  133. # GH 4690 (with tz)
  134. base = timedelta_range("1 day", periods=4, freq="h", name="idx")
  135. result = base.intersection(rng, sort=sort)
  136. if sort is None:
  137. expected = expected.sort_values()
  138. tm.assert_index_equal(result, expected)
  139. assert result.name == expected.name
  140. assert result.freq == expected.freq
  141. @pytest.mark.parametrize(
  142. "rng, expected",
  143. # part intersection works
  144. [
  145. (
  146. TimedeltaIndex(["5 hour", "2 hour", "4 hour", "9 hour"], name="idx"),
  147. TimedeltaIndex(["2 hour", "4 hour"], name="idx"),
  148. ),
  149. # reordered part intersection
  150. (
  151. TimedeltaIndex(["2 hour", "5 hour", "5 hour", "1 hour"], name="other"),
  152. TimedeltaIndex(["1 hour", "2 hour"], name=None),
  153. ),
  154. # reversed index
  155. (
  156. TimedeltaIndex(["1 hour", "2 hour", "4 hour", "3 hour"], name="idx")[
  157. ::-1
  158. ],
  159. TimedeltaIndex(["1 hour", "2 hour", "4 hour", "3 hour"], name="idx"),
  160. ),
  161. ],
  162. )
  163. def test_intersection_non_monotonic(self, rng, expected, sort):
  164. # 24471 non-monotonic
  165. base = TimedeltaIndex(["1 hour", "2 hour", "4 hour", "3 hour"], name="idx")
  166. result = base.intersection(rng, sort=sort)
  167. if sort is None:
  168. expected = expected.sort_values()
  169. tm.assert_index_equal(result, expected)
  170. assert result.name == expected.name
  171. # if reversed order, frequency is still the same
  172. if all(base == rng[::-1]) and sort is None:
  173. assert isinstance(result.freq, Hour)
  174. else:
  175. assert result.freq is None
  176. class TestTimedeltaIndexDifference:
  177. def test_difference_freq(self, sort):
  178. # GH14323: Difference of TimedeltaIndex should not preserve frequency
  179. index = timedelta_range("0 days", "5 days", freq="D")
  180. other = timedelta_range("1 days", "4 days", freq="D")
  181. expected = TimedeltaIndex(["0 days", "5 days"], freq=None)
  182. idx_diff = index.difference(other, sort)
  183. tm.assert_index_equal(idx_diff, expected)
  184. tm.assert_attr_equal("freq", idx_diff, expected)
  185. other = timedelta_range("2 days", "5 days", freq="D")
  186. idx_diff = index.difference(other, sort)
  187. expected = TimedeltaIndex(["0 days", "1 days"], freq=None)
  188. tm.assert_index_equal(idx_diff, expected)
  189. tm.assert_attr_equal("freq", idx_diff, expected)
  190. def test_difference_sort(self, sort):
  191. index = TimedeltaIndex(
  192. ["5 days", "3 days", "2 days", "4 days", "1 days", "0 days"]
  193. )
  194. other = timedelta_range("1 days", "4 days", freq="D")
  195. idx_diff = index.difference(other, sort)
  196. expected = TimedeltaIndex(["5 days", "0 days"], freq=None)
  197. if sort is None:
  198. expected = expected.sort_values()
  199. tm.assert_index_equal(idx_diff, expected)
  200. tm.assert_attr_equal("freq", idx_diff, expected)
  201. other = timedelta_range("2 days", "5 days", freq="D")
  202. idx_diff = index.difference(other, sort)
  203. expected = TimedeltaIndex(["1 days", "0 days"], freq=None)
  204. if sort is None:
  205. expected = expected.sort_values()
  206. tm.assert_index_equal(idx_diff, expected)
  207. tm.assert_attr_equal("freq", idx_diff, expected)