test_timedeltas.py 10.0 KB


  1. from datetime import timedelta
  2. import numpy as np
  3. import pytest
  4. import pandas as pd
  5. from pandas import Timedelta
  6. import pandas._testing as tm
  7. from pandas.core.arrays import (
  8. DatetimeArray,
  9. TimedeltaArray,
  10. )
  11. class TestNonNano:
  12. @pytest.fixture(params=["s", "ms", "us"])
  13. def unit(self, request):
  14. return request.param
  15. @pytest.fixture
  16. def tda(self, unit):
  17. arr = np.arange(5, dtype=np.int64).view(f"m8[{unit}]")
  18. return TimedeltaArray._simple_new(arr, dtype=arr.dtype)
  19. def test_non_nano(self, unit):
  20. arr = np.arange(5, dtype=np.int64).view(f"m8[{unit}]")
  21. tda = TimedeltaArray._simple_new(arr, dtype=arr.dtype)
  22. assert tda.dtype == arr.dtype
  23. assert tda[0].unit == unit
  24. def test_as_unit_raises(self, tda):
  25. # GH#50616
  26. with pytest.raises(ValueError, match="Supported units"):
  27. tda.as_unit("D")
  28. tdi = pd.Index(tda)
  29. with pytest.raises(ValueError, match="Supported units"):
  30. tdi.as_unit("D")
  31. @pytest.mark.parametrize("field", TimedeltaArray._field_ops)
  32. def test_fields(self, tda, field):
  33. as_nano = tda._ndarray.astype("m8[ns]")
  34. tda_nano = TimedeltaArray._simple_new(as_nano, dtype=as_nano.dtype)
  35. result = getattr(tda, field)
  36. expected = getattr(tda_nano, field)
  37. tm.assert_numpy_array_equal(result, expected)
  38. def test_to_pytimedelta(self, tda):
  39. as_nano = tda._ndarray.astype("m8[ns]")
  40. tda_nano = TimedeltaArray._simple_new(as_nano, dtype=as_nano.dtype)
  41. result = tda.to_pytimedelta()
  42. expected = tda_nano.to_pytimedelta()
  43. tm.assert_numpy_array_equal(result, expected)
  44. def test_total_seconds(self, unit, tda):
  45. as_nano = tda._ndarray.astype("m8[ns]")
  46. tda_nano = TimedeltaArray._simple_new(as_nano, dtype=as_nano.dtype)
  47. result = tda.total_seconds()
  48. expected = tda_nano.total_seconds()
  49. tm.assert_numpy_array_equal(result, expected)
  50. def test_timedelta_array_total_seconds(self):
  51. # GH34290
  52. expected = Timedelta("2 min").total_seconds()
  53. result = pd.array([Timedelta("2 min")]).total_seconds()[0]
  54. assert result == expected
  55. @pytest.mark.parametrize(
  56. "nat", [np.datetime64("NaT", "ns"), np.datetime64("NaT", "us")]
  57. )
  58. def test_add_nat_datetimelike_scalar(self, nat, tda):
  59. result = tda + nat
  60. assert isinstance(result, DatetimeArray)
  61. assert result._creso == tda._creso
  62. assert result.isna().all()
  63. result = nat + tda
  64. assert isinstance(result, DatetimeArray)
  65. assert result._creso == tda._creso
  66. assert result.isna().all()
  67. def test_add_pdnat(self, tda):
  68. result = tda + pd.NaT
  69. assert isinstance(result, TimedeltaArray)
  70. assert result._creso == tda._creso
  71. assert result.isna().all()
  72. result = pd.NaT + tda
  73. assert isinstance(result, TimedeltaArray)
  74. assert result._creso == tda._creso
  75. assert result.isna().all()
  76. # TODO: 2022-07-11 this is the only test that gets to DTA.tz_convert
  77. # or tz_localize with non-nano; implement tests specific to that.
  78. def test_add_datetimelike_scalar(self, tda, tz_naive_fixture):
  79. ts = pd.Timestamp("2016-01-01", tz=tz_naive_fixture).as_unit("ns")
  80. expected = tda.as_unit("ns") + ts
  81. res = tda + ts
  82. tm.assert_extension_array_equal(res, expected)
  83. res = ts + tda
  84. tm.assert_extension_array_equal(res, expected)
  85. ts += Timedelta(1) # case where we can't cast losslessly
  86. exp_values = tda._ndarray + ts.asm8
  87. expected = (
  88. DatetimeArray._simple_new(exp_values, dtype=exp_values.dtype)
  89. .tz_localize("UTC")
  90. .tz_convert(ts.tz)
  91. )
  92. result = tda + ts
  93. tm.assert_extension_array_equal(result, expected)
  94. result = ts + tda
  95. tm.assert_extension_array_equal(result, expected)
  96. def test_mul_scalar(self, tda):
  97. other = 2
  98. result = tda * other
  99. expected = TimedeltaArray._simple_new(tda._ndarray * other, dtype=tda.dtype)
  100. tm.assert_extension_array_equal(result, expected)
  101. assert result._creso == tda._creso
  102. def test_mul_listlike(self, tda):
  103. other = np.arange(len(tda))
  104. result = tda * other
  105. expected = TimedeltaArray._simple_new(tda._ndarray * other, dtype=tda.dtype)
  106. tm.assert_extension_array_equal(result, expected)
  107. assert result._creso == tda._creso
  108. def test_mul_listlike_object(self, tda):
  109. other = np.arange(len(tda))
  110. result = tda * other.astype(object)
  111. expected = TimedeltaArray._simple_new(tda._ndarray * other, dtype=tda.dtype)
  112. tm.assert_extension_array_equal(result, expected)
  113. assert result._creso == tda._creso
  114. def test_div_numeric_scalar(self, tda):
  115. other = 2
  116. result = tda / other
  117. expected = TimedeltaArray._simple_new(tda._ndarray / other, dtype=tda.dtype)
  118. tm.assert_extension_array_equal(result, expected)
  119. assert result._creso == tda._creso
  120. def test_div_td_scalar(self, tda):
  121. other = timedelta(seconds=1)
  122. result = tda / other
  123. expected = tda._ndarray / np.timedelta64(1, "s")
  124. tm.assert_numpy_array_equal(result, expected)
  125. def test_div_numeric_array(self, tda):
  126. other = np.arange(len(tda))
  127. result = tda / other
  128. expected = TimedeltaArray._simple_new(tda._ndarray / other, dtype=tda.dtype)
  129. tm.assert_extension_array_equal(result, expected)
  130. assert result._creso == tda._creso
  131. def test_div_td_array(self, tda):
  132. other = tda._ndarray + tda._ndarray[-1]
  133. result = tda / other
  134. expected = tda._ndarray / other
  135. tm.assert_numpy_array_equal(result, expected)
  136. def test_add_timedeltaarraylike(self, tda):
  137. tda_nano = tda.astype("m8[ns]")
  138. expected = tda_nano * 2
  139. res = tda_nano + tda
  140. tm.assert_extension_array_equal(res, expected)
  141. res = tda + tda_nano
  142. tm.assert_extension_array_equal(res, expected)
  143. expected = tda_nano * 0
  144. res = tda - tda_nano
  145. tm.assert_extension_array_equal(res, expected)
  146. res = tda_nano - tda
  147. tm.assert_extension_array_equal(res, expected)
  148. class TestTimedeltaArray:
  149. @pytest.mark.parametrize("dtype", [int, np.int32, np.int64, "uint32", "uint64"])
  150. def test_astype_int(self, dtype):
  151. arr = TimedeltaArray._from_sequence([Timedelta("1H"), Timedelta("2H")])
  152. if np.dtype(dtype) != np.int64:
  153. with pytest.raises(TypeError, match=r"Do obj.astype\('int64'\)"):
  154. arr.astype(dtype)
  155. return
  156. result = arr.astype(dtype)
  157. expected = arr._ndarray.view("i8")
  158. tm.assert_numpy_array_equal(result, expected)
  159. def test_setitem_clears_freq(self):
  160. a = TimedeltaArray(pd.timedelta_range("1H", periods=2, freq="H"))
  161. a[0] = Timedelta("1H")
  162. assert a.freq is None
  163. @pytest.mark.parametrize(
  164. "obj",
  165. [
  166. Timedelta(seconds=1),
  167. Timedelta(seconds=1).to_timedelta64(),
  168. Timedelta(seconds=1).to_pytimedelta(),
  169. ],
  170. )
  171. def test_setitem_objects(self, obj):
  172. # make sure we accept timedelta64 and timedelta in addition to Timedelta
  173. tdi = pd.timedelta_range("2 Days", periods=4, freq="H")
  174. arr = TimedeltaArray(tdi, freq=tdi.freq)
  175. arr[0] = obj
  176. assert arr[0] == Timedelta(seconds=1)
  177. @pytest.mark.parametrize(
  178. "other",
  179. [
  180. 1,
  181. np.int64(1),
  182. 1.0,
  183. np.datetime64("NaT"),
  184. pd.Timestamp("2021-01-01"),
  185. "invalid",
  186. np.arange(10, dtype="i8") * 24 * 3600 * 10**9,
  187. (np.arange(10) * 24 * 3600 * 10**9).view("datetime64[ns]"),
  188. pd.Timestamp("2021-01-01").to_period("D"),
  189. ],
  190. )
  191. @pytest.mark.parametrize("index", [True, False])
  192. def test_searchsorted_invalid_types(self, other, index):
  193. data = np.arange(10, dtype="i8") * 24 * 3600 * 10**9
  194. arr = TimedeltaArray(data, freq="D")
  195. if index:
  196. arr = pd.Index(arr)
  197. msg = "|".join(
  198. [
  199. "searchsorted requires compatible dtype or scalar",
  200. "value should be a 'Timedelta', 'NaT', or array of those. Got",
  201. ]
  202. )
  203. with pytest.raises(TypeError, match=msg):
  204. arr.searchsorted(other)
  205. class TestUnaryOps:
  206. def test_abs(self):
  207. vals = np.array([-3600 * 10**9, "NaT", 7200 * 10**9], dtype="m8[ns]")
  208. arr = TimedeltaArray(vals)
  209. evals = np.array([3600 * 10**9, "NaT", 7200 * 10**9], dtype="m8[ns]")
  210. expected = TimedeltaArray(evals)
  211. result = abs(arr)
  212. tm.assert_timedelta_array_equal(result, expected)
  213. result2 = np.abs(arr)
  214. tm.assert_timedelta_array_equal(result2, expected)
  215. def test_pos(self):
  216. vals = np.array([-3600 * 10**9, "NaT", 7200 * 10**9], dtype="m8[ns]")
  217. arr = TimedeltaArray(vals)
  218. result = +arr
  219. tm.assert_timedelta_array_equal(result, arr)
  220. assert not tm.shares_memory(result, arr)
  221. result2 = np.positive(arr)
  222. tm.assert_timedelta_array_equal(result2, arr)
  223. assert not tm.shares_memory(result2, arr)
  224. def test_neg(self):
  225. vals = np.array([-3600 * 10**9, "NaT", 7200 * 10**9], dtype="m8[ns]")
  226. arr = TimedeltaArray(vals)
  227. evals = np.array([3600 * 10**9, "NaT", -7200 * 10**9], dtype="m8[ns]")
  228. expected = TimedeltaArray(evals)
  229. result = -arr
  230. tm.assert_timedelta_array_equal(result, expected)
  231. result2 = np.negative(arr)
  232. tm.assert_timedelta_array_equal(result2, expected)
  233. def test_neg_freq(self):
  234. tdi = pd.timedelta_range("2 Days", periods=4, freq="H")
  235. arr = TimedeltaArray(tdi, freq=tdi.freq)
  236. expected = TimedeltaArray(-tdi._data, freq=-tdi.freq)
  237. result = -arr
  238. tm.assert_timedelta_array_equal(result, expected)
  239. result2 = np.negative(arr)
  240. tm.assert_timedelta_array_equal(result2, expected)