123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- """
- Helper functions to generate range-like data for DatetimeArray
- (and possibly TimedeltaArray/PeriodArray)
- """
- from __future__ import annotations
- import numpy as np
- from pandas._libs.lib import i8max
- from pandas._libs.tslibs import (
- BaseOffset,
- OutOfBoundsDatetime,
- Timedelta,
- Timestamp,
- iNaT,
- )
- from pandas._typing import npt
def generate_regular_range(
    start: Timestamp | Timedelta | None,
    end: Timestamp | Timedelta | None,
    periods: int | None,
    freq: BaseOffset,
    unit: str = "ns",
) -> npt.NDArray[np.int64]:
    """
    Generate a range of dates or timestamps with the spans between dates
    described by the given `freq` DateOffset.

    The supported argument combinations are, in order of precedence:
    `start` and `end` with ``periods=None``; `start` and `periods`;
    `end` and `periods`.

    Parameters
    ----------
    start : Timedelta, Timestamp or None
        First point of produced date range.
    end : Timedelta, Timestamp or None
        Last point of produced date range.
    periods : int or None
        Number of periods in produced date range.
    freq : Tick
        Describes space between dates in produced date range.
    unit : str, default "ns"
        The resolution the output is meant to represent.

    Returns
    -------
    ndarray[np.int64]
        Representing the given resolution.

    Raises
    ------
    ValueError
        If `freq` cannot be expressed in `unit` without rounding, or if
        none of the supported combinations of `start`, `end` and `periods`
        was supplied.
    """
    istart = start._value if start is not None else None
    iend = end._value if end is not None else None
    freq.nanos  # raises if non-fixed frequency
    td = Timedelta(freq)
    try:
        td = td.as_unit(  # pyright: ignore[reportGeneralTypeIssues]
            unit, round_ok=False
        )
    except ValueError as err:
        raise ValueError(
            f"freq={freq} is incompatible with unit={unit}. "
            "Use a lower freq or a higher unit instead."
        ) from err
    stride = int(td._value)

    if periods is None and istart is not None and iend is not None:
        b = istart
        # cannot just use e = Timestamp(end) + 1 because arange breaks when
        #  stride is too large, see GH10887
        e = b + (iend - b) // stride * stride + stride // 2 + 1
    elif istart is not None and periods is not None:
        b = istart
        e = _generate_range_overflow_safe(b, periods, stride, side="start")
    elif iend is not None and periods is not None:
        e = iend + stride
        b = _generate_range_overflow_safe(e, periods, stride, side="end")
    else:
        raise ValueError(
            "at least 'start' or 'end' should be specified if a 'period' is given."
        )

    with np.errstate(over="raise"):
        # If the range is sufficiently large, np.arange may overflow
        #  and incorrectly return an empty array if not caught.
        try:
            values = np.arange(b, e, stride, dtype=np.int64)
        except FloatingPointError:
            # Rebuild the half-open range [b, e) with arbitrary-precision
            #  Python integers.  Unlike stepping until ``value == e``, this
            #  terminates even when `e` does not lie exactly on the
            #  ``b + k * stride`` grid (as in the start/end branch above).
            values = np.array(list(range(int(b), int(e), stride)), dtype=np.int64)
    return values
def _generate_range_overflow_safe(
    endpoint: int, periods: int, stride: int, side: str = "start"
) -> int:
    """
    Compute the opposite endpoint to hand to np.arange while guarding
    against integer overflow.

    Overflow conditions are reported as OutOfBoundsDatetime instead of
    being allowed to wrap around (GH#14187).

    Parameters
    ----------
    endpoint : int
        nanosecond timestamp of the known endpoint of the desired range
    periods : int
        number of periods in the desired range
    stride : int
        nanoseconds between periods in the desired range
    side : {'start', 'end'}
        which end of the range `endpoint` refers to

    Returns
    -------
    other_end : int

    Raises
    ------
    OutOfBoundsDatetime
    """
    assert side in ["start", "end"]

    overflow_msg = f"Cannot generate range with {side}={endpoint} and periods={periods}"
    int64_ceiling = np.uint64(i8max)

    with np.errstate(over="raise"):
        # When even the uint64 product periods * |stride| overflows there is
        #  nothing that splitting the range could rescue, so give up here.
        try:
            span = np.uint64(periods) * np.uint64(np.abs(stride))
        except FloatingPointError as err:
            raise OutOfBoundsDatetime(overflow_msg) from err

    if np.abs(span) <= int64_ceiling:
        # The total span fits in int64: delegate to the signed helper.
        return _generate_range_overflow_safe_signed(endpoint, periods, stride, side)

    forward_from_positive = endpoint > 0 and side == "start" and stride > 0
    backward_from_negative = endpoint < 0 < stride and side == "end"
    if forward_from_positive or backward_from_negative:
        # The span alone exceeds int64 and moves further away from zero,
        #  so the result cannot possibly fit.
        raise OutOfBoundsDatetime(overflow_msg)

    if side == "end" and endpoint - stride <= int64_ceiling < endpoint:
        # The caller padded `endpoint` by one `stride`, which pushed it past
        #  the int64 maximum.  Undo that padding and drop one period.
        return _generate_range_overflow_safe(
            endpoint - stride, periods - 1, stride, side
        )

    # Otherwise cover the range in two consecutive halves.
    first_half = periods // 2
    second_half = periods - first_half
    assert 0 < second_half < periods, (second_half, periods, endpoint, stride)

    halfway = _generate_range_overflow_safe(endpoint, first_half, stride, side)
    return _generate_range_overflow_safe(halfway, second_half, stride, side)
def _generate_range_overflow_safe_signed(
    endpoint: int, periods: int, stride: int, side: str
) -> int:
    """
    Variant of _generate_range_overflow_safe for the case where
    `periods * stride` itself is known to fit within int64 bounds.

    Returns ``endpoint + periods * stride`` (with `stride` negated when
    `side` is "end"), or raises OutOfBoundsDatetime when that value cannot
    be used.
    """
    assert side in ["start", "end"]

    # Moving away from an "end" anchor means stepping backwards.
    step = -stride if side == "end" else stride

    with np.errstate(over="raise"):
        offset = np.int64(periods) * np.int64(step)

        try:
            candidate = np.int64(endpoint) + offset
        except (FloatingPointError, OverflowError):
            # The int64 addition overflowed; handled by the checks below.
            usable = False
        else:
            # A value equal to iNaT would be misread as NaT once placed in
            #  a DatetimeArray/TimedeltaArray, so treat it as an overflow too.
            usable = candidate != iNaT

        if usable:
            # error: Incompatible return value type (got "signedinteger[_64Bit]",
            # expected "int")
            return candidate  # type: ignore[return-value]

        # Opposite signs for step and endpoint should never overflow, so
        #  reaching this point implies that they share a sign.
        assert (step > 0 and endpoint >= 0) or (step < 0 and endpoint <= 0)

        if step > 0:
            # Very special case: the endpoint lands just above the int64
            #  maximum, yet np.arange, which excludes its stop value, would
            #  still only produce values inside the bounds.
            int64_ceiling = np.uint64(i8max)
            wide = np.uint64(endpoint) + np.uint64(offset)
            assert wide > int64_ceiling
            if wide <= int64_ceiling + np.uint64(step):
                # error: Incompatible return value type (got "unsignedinteger",
                # expected "int")
                return wide  # type: ignore[return-value]

    raise OutOfBoundsDatetime(
        f"Cannot generate range with {side}={endpoint} and periods={periods}"
    )
|