from __future__ import annotations

import numpy as np

from pandas._libs import lib
from pandas._libs.tslibs import (
    get_unit_from_dtype,
    is_supported_unit,
)
from pandas._typing import (
    AxisInt,
    Dtype,
    NpDtype,
    Scalar,
    npt,
)
from pandas.compat.numpy import function as nv

from pandas.core.dtypes.astype import astype_array
from pandas.core.dtypes.cast import construct_1d_object_array_from_listlike
from pandas.core.dtypes.common import (
    is_dtype_equal,
    pandas_dtype,
)
from pandas.core.dtypes.dtypes import PandasDtype
from pandas.core.dtypes.missing import isna

from pandas.core import (
    arraylike,
    nanops,
    ops,
)
from pandas.core.arraylike import OpsMixin
from pandas.core.arrays._mixins import NDArrayBackedExtensionArray
from pandas.core.construction import ensure_wrapped_if_datetimelike
from pandas.core.strings.object_array import ObjectStringArrayMixin


class PandasArray(
    OpsMixin,
    NDArrayBackedExtensionArray,
    ObjectStringArrayMixin,
):
    """
    A pandas ExtensionArray for NumPy data.

    This is mostly for internal compatibility, and is not especially
    useful on its own.

    Parameters
    ----------
    values : ndarray
        The NumPy ndarray to wrap. Must be 1-dimensional.
    copy : bool, default False
        Whether to copy `values`.

    Attributes
    ----------
    None

    Methods
    -------
    None
    """

    # If you're wondering why pd.Series(cls) doesn't put the array in an
    # ExtensionBlock, search for `ABCPandasArray`. We check for
    # that _typ to ensure that users don't unnecessarily use EAs inside
    # pandas internals, which turns off things like block consolidation.
    _typ = "npy_extension"
    __array_priority__ = 1000
    _ndarray: np.ndarray
    _dtype: PandasDtype
    _internal_fill_value = np.nan

    # ------------------------------------------------------------------------
    # Constructors

    def __init__(self, values: np.ndarray | PandasArray, copy: bool = False) -> None:
        # Accept another instance of this class by unwrapping its backing array.
        if isinstance(values, type(self)):
            values = values._ndarray
        if not isinstance(values, np.ndarray):
            raise ValueError(
                f"'values' must be a NumPy array, not {type(values).__name__}"
            )

        if values.ndim == 0:
            # Technically we support 2, but do not advertise that fact.
            raise ValueError("PandasArray must be 1-dimensional.")

        if copy:
            values = values.copy()

        dtype = PandasDtype(values.dtype)
        super().__init__(values, dtype)

    @classmethod
    def _from_sequence(
        cls, scalars, *, dtype: Dtype | None = None, copy: bool = False
    ) -> PandasArray:
        """
        Build a PandasArray from a sequence of scalars.

        A ``PandasDtype`` is unwrapped to its underlying NumPy dtype before
        the data is converted with ``np.asarray``.
        """
        if isinstance(dtype, PandasDtype):
            dtype = dtype._dtype

        # error: Argument "dtype" to "asarray" has incompatible type
        # "Union[ExtensionDtype, str, dtype[Any], dtype[floating[_64Bit]], Type[object],
        # None]"; expected "Union[dtype[Any], None, type, _SupportsDType, str,
        # Union[Tuple[Any, int], Tuple[Any, Union[int, Sequence[int]]], List[Any],
        # _DTypeDict, Tuple[Any, Any]]]"
        result = np.asarray(scalars, dtype=dtype)  # type: ignore[arg-type]
        if (
            result.ndim > 1
            and not hasattr(scalars, "dtype")
            and (dtype is None or dtype == object)
        ):
            # e.g. list-of-tuples
            result = construct_1d_object_array_from_listlike(scalars)

        # Only copy when np.asarray handed back the caller's own object;
        # otherwise ``result`` is already a new array.
        if copy and result is scalars:
            result = result.copy()
        return cls(result)

    def _from_backing_data(self, arr: np.ndarray) -> PandasArray:
        """Wrap ``arr`` in a new instance of this array's class."""
        return type(self)(arr)

    # ------------------------------------------------------------------------
    # Data

    @property
    def dtype(self) -> PandasDtype:
        """The ``PandasDtype`` created for the wrapped ndarray in ``__init__``."""
        return self._dtype

    # ------------------------------------------------------------------------
    # NumPy Array Interface

    def __array__(self, dtype: NpDtype | None = None) -> np.ndarray:
        """Return the backing ndarray, converted to ``dtype`` if given."""
        return np.asarray(self._ndarray, dtype=dtype)

    def __array_ufunc__(self, ufunc: np.ufunc, method: str, *inputs, **kwargs):
        """
        Apply a NumPy ufunc, re-boxing ndarray results in this class.

        Dispatch order: dunder ops first, then the ``out=`` handler, then the
        reduction handler, and finally the ufunc itself on unwrapped inputs.
        """
        # Lightly modified version of
        # https://numpy.org/doc/stable/reference/generated/numpy.lib.mixins.NDArrayOperatorsMixin.html
        # The primary modification is not boxing scalar return values
        # in PandasArray, since pandas' ExtensionArrays are 1-d.
        result = ops.maybe_dispatch_ufunc_to_dunder_op(
            self, ufunc, method, *inputs, **kwargs
        )
        if result is not NotImplemented:
            return result

        if "out" in kwargs:
            # e.g. test_ufunc_unary
            return arraylike.dispatch_ufunc_with_out(
                self, ufunc, method, *inputs, **kwargs
            )

        if method == "reduce":
            result = arraylike.dispatch_reduction_ufunc(
                self, ufunc, method, *inputs, **kwargs
            )
            if result is not NotImplemented:
                # e.g. tests.series.test_ufunc.TestNumpyReductions
                return result

        # Defer to the implementation of the ufunc on unwrapped values.
        # Every call carrying ``out`` has already returned above, so there is
        # no ``out`` tuple left to unwrap at this point.
        inputs = tuple(x._ndarray if isinstance(x, PandasArray) else x for x in inputs)
        result = getattr(ufunc, method)(*inputs, **kwargs)

        if ufunc.nout > 1:
            # multiple return values; re-box array-like results
            return tuple(type(self)(x) for x in result)
        elif method == "at":
            # no return value
            return None
        elif method == "reduce":
            if isinstance(result, np.ndarray):
                # e.g. test_np_reduce_2d
                return type(self)(result)

            # e.g. test_np_max_nested_tuples
            return result
        else:
            # one return value; re-box array-like results
            return type(self)(result)

    # ------------------------------------------------------------------------
    # Pandas ExtensionArray Interface

    def astype(self, dtype, copy: bool = True):
        """
        Cast to ``dtype``.

        When ``dtype`` equals the current dtype, return a copy if ``copy`` is
        true and ``self`` otherwise. Any other dtype is handled by
        ``astype_array`` on the backing ndarray.
        """
        dtype = pandas_dtype(dtype)

        if is_dtype_equal(dtype, self.dtype):
            if copy:
                return self.copy()
            return self

        result = astype_array(self._ndarray, dtype=dtype, copy=copy)
        return result

    def isna(self) -> np.ndarray:
        """Return the result of ``isna`` applied to the backing ndarray."""
        return isna(self._ndarray)

    def _validate_scalar(self, fill_value):
        """Return ``fill_value``, substituting the dtype's NA value for None."""
        if fill_value is None:
            # Primarily for subclasses
            fill_value = self.dtype.na_value
        return fill_value

    def _values_for_factorize(self) -> tuple[np.ndarray, float | None]:
        """
        Return the backing ndarray and the fill value paired with it.

        The fill value is None for dtype kinds "i", "u" and "b", and NaN for
        every other kind.
        """
        if self.dtype.kind in ["i", "u", "b"]:
            fv = None
        else:
            fv = np.nan
        return self._ndarray, fv

    # ------------------------------------------------------------------------
    # Reductions
    #
    # Each reduction follows the same pattern: validate the NumPy-compat
    # keyword arguments with the matching ``nv.validate_*`` helper, delegate
    # the computation to ``nanops`` on the backing ndarray, and pass the
    # outcome through ``self._wrap_reduction_result``.

    def any(
        self,
        *,
        axis: AxisInt | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanany``."""
        nv.validate_any((), {"out": out, "keepdims": keepdims})
        result = nanops.nanany(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def all(
        self,
        *,
        axis: AxisInt | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanall``."""
        nv.validate_all((), {"out": out, "keepdims": keepdims})
        result = nanops.nanall(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def min(
        self, *, axis: AxisInt | None = None, skipna: bool = True, **kwargs
    ) -> Scalar:
        """Reduce with ``nanops.nanmin``, supplying ``self.isna()`` as the mask."""
        nv.validate_min((), kwargs)
        result = nanops.nanmin(
            values=self._ndarray, axis=axis, mask=self.isna(), skipna=skipna
        )
        return self._wrap_reduction_result(axis, result)

    def max(
        self, *, axis: AxisInt | None = None, skipna: bool = True, **kwargs
    ) -> Scalar:
        """Reduce with ``nanops.nanmax``, supplying ``self.isna()`` as the mask."""
        nv.validate_max((), kwargs)
        result = nanops.nanmax(
            values=self._ndarray, axis=axis, mask=self.isna(), skipna=skipna
        )
        return self._wrap_reduction_result(axis, result)

    def sum(
        self,
        *,
        axis: AxisInt | None = None,
        skipna: bool = True,
        min_count: int = 0,
        **kwargs,
    ) -> Scalar:
        """Reduce with ``nanops.nansum``, forwarding ``min_count``."""
        nv.validate_sum((), kwargs)
        result = nanops.nansum(
            self._ndarray, axis=axis, skipna=skipna, min_count=min_count
        )
        return self._wrap_reduction_result(axis, result)

    def prod(
        self,
        *,
        axis: AxisInt | None = None,
        skipna: bool = True,
        min_count: int = 0,
        **kwargs,
    ) -> Scalar:
        """Reduce with ``nanops.nanprod``, forwarding ``min_count``."""
        nv.validate_prod((), kwargs)
        result = nanops.nanprod(
            self._ndarray, axis=axis, skipna=skipna, min_count=min_count
        )
        return self._wrap_reduction_result(axis, result)

    def mean(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanmean``."""
        nv.validate_mean((), {"dtype": dtype, "out": out, "keepdims": keepdims})
        result = nanops.nanmean(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def median(
        self,
        *,
        axis: AxisInt | None = None,
        out=None,
        overwrite_input: bool = False,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanmedian``."""
        nv.validate_median(
            (), {"out": out, "overwrite_input": overwrite_input, "keepdims": keepdims}
        )
        result = nanops.nanmedian(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def std(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        ddof: int = 1,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanstd``, forwarding ``ddof``."""
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="std"
        )
        result = nanops.nanstd(self._ndarray, axis=axis, skipna=skipna, ddof=ddof)
        return self._wrap_reduction_result(axis, result)

    def var(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        ddof: int = 1,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanvar``, forwarding ``ddof``."""
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="var"
        )
        result = nanops.nanvar(self._ndarray, axis=axis, skipna=skipna, ddof=ddof)
        return self._wrap_reduction_result(axis, result)

    def sem(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        ddof: int = 1,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nansem``, forwarding ``ddof``."""
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="sem"
        )
        result = nanops.nansem(self._ndarray, axis=axis, skipna=skipna, ddof=ddof)
        return self._wrap_reduction_result(axis, result)

    def kurt(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nankurt``."""
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="kurt"
        )
        result = nanops.nankurt(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def skew(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        """Reduce with ``nanops.nanskew``."""
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="skew"
        )
        result = nanops.nanskew(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    # ------------------------------------------------------------------------
    # Additional Methods

    def to_numpy(
        self,
        dtype: npt.DTypeLike | None = None,
        copy: bool = False,
        na_value: object = lib.no_default,
    ) -> np.ndarray:
        """
        Convert to a NumPy ndarray.

        Parameters
        ----------
        dtype : dtype-like, optional
            Passed to ``np.asarray`` for the final conversion.
        copy : bool, default False
            If True, the returned array is never the backing ndarray itself.
        na_value : object, optional
            Value written over the positions flagged by ``isna``. When given
            and at least one position is flagged, the replacement is done on
            a copy, so the backing ndarray is left unmodified.

        Returns
        -------
        np.ndarray
        """
        result = self._ndarray
        if na_value is not lib.no_default:
            # The missing-value mask is only needed when a replacement was
            # requested, so skip the scan entirely otherwise.
            mask = self.isna()
            if mask.any():
                result = self._ndarray.copy()
                result[mask] = na_value

        result = np.asarray(result, dtype=dtype)

        # A copy is only still owed if nothing above produced a new array.
        if copy and result is self._ndarray:
            result = result.copy()

        return result

    # ------------------------------------------------------------------------
    # Ops

    def __invert__(self) -> PandasArray:
        """Return ``~`` of the backing ndarray, re-wrapped."""
        return type(self)(~self._ndarray)

    def __neg__(self) -> PandasArray:
        """Return unary ``-`` of the backing ndarray, re-wrapped."""
        return type(self)(-self._ndarray)

    def __pos__(self) -> PandasArray:
        """Return unary ``+`` of the backing ndarray, re-wrapped."""
        return type(self)(+self._ndarray)

    def __abs__(self) -> PandasArray:
        """Return ``abs`` of the backing ndarray, re-wrapped."""
        return type(self)(abs(self._ndarray))

    def _cmp_method(self, other, op):
        """
        Apply the binary operator ``op`` to the backing ndarray and ``other``.

        ndarray results are passed through ``_wrap_ndarray_result``; any other
        result is returned unchanged. Also used as ``_arith_method``.
        """
        if isinstance(other, PandasArray):
            other = other._ndarray

        other = ops.maybe_prepare_scalar_for_op(other, (len(self),))
        pd_op = ops.get_array_op(op)
        other = ensure_wrapped_if_datetimelike(other)
        with np.errstate(all="ignore"):
            result = pd_op(self._ndarray, other)

        if op is divmod or op is ops.rdivmod:
            a, b = result
            if isinstance(a, np.ndarray):
                # for e.g. op vs TimedeltaArray, we may already
                # have an ExtensionArray, in which case we do not wrap
                return self._wrap_ndarray_result(a), self._wrap_ndarray_result(b)
            return a, b

        if isinstance(result, np.ndarray):
            # for e.g. multiplication vs TimedeltaArray, we may already
            # have an ExtensionArray, in which case we do not wrap
            return self._wrap_ndarray_result(result)
        return result

    _arith_method = _cmp_method

    def _wrap_ndarray_result(self, result: np.ndarray):
        """Box an ndarray result, using TimedeltaArray for supported timedelta64."""
        # If we have timedelta64[ns] result, return a TimedeltaArray instead
        # of a PandasArray
        if result.dtype.kind == "m" and is_supported_unit(
            get_unit_from_dtype(result.dtype)
        ):
            # Imported here rather than at module level.
            from pandas.core.arrays import TimedeltaArray

            return TimedeltaArray._simple_new(result, dtype=result.dtype)
        return type(self)(result)

    # ------------------------------------------------------------------------
    # String methods interface
    _str_na_value = np.nan