123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- from __future__ import annotations
- import numpy as np
- from pandas._typing import (
- ArrayLike,
- Scalar,
- npt,
- )
- from pandas.compat.numpy import np_percentile_argname
- from pandas.core.dtypes.missing import (
- isna,
- na_value_for_dtype,
- )
- def quantile_compat(
- values: ArrayLike, qs: npt.NDArray[np.float64], interpolation: str
- ) -> ArrayLike:
- """
- Compute the quantiles of the given values for each quantile in `qs`.
- Parameters
- ----------
- values : np.ndarray or ExtensionArray
- qs : np.ndarray[float64]
- interpolation : str
- Returns
- -------
- np.ndarray or ExtensionArray
- """
- if isinstance(values, np.ndarray):
- fill_value = na_value_for_dtype(values.dtype, compat=False)
- mask = isna(values)
- return quantile_with_mask(values, mask, fill_value, qs, interpolation)
- else:
- return values._quantile(qs, interpolation)
- def quantile_with_mask(
- values: np.ndarray,
- mask: npt.NDArray[np.bool_],
- fill_value,
- qs: npt.NDArray[np.float64],
- interpolation: str,
- ) -> np.ndarray:
- """
- Compute the quantiles of the given values for each quantile in `qs`.
- Parameters
- ----------
- values : np.ndarray
- For ExtensionArray, this is _values_for_factorize()[0]
- mask : np.ndarray[bool]
- mask = isna(values)
- For ExtensionArray, this is computed before calling _value_for_factorize
- fill_value : Scalar
- The value to interpret fill NA entries with
- For ExtensionArray, this is _values_for_factorize()[1]
- qs : np.ndarray[float64]
- interpolation : str
- Type of interpolation
- Returns
- -------
- np.ndarray
- Notes
- -----
- Assumes values is already 2D. For ExtensionArray this means np.atleast_2d
- has been called on _values_for_factorize()[0]
- Quantile is computed along axis=1.
- """
- assert values.shape == mask.shape
- if values.ndim == 1:
- # unsqueeze, operate, re-squeeze
- values = np.atleast_2d(values)
- mask = np.atleast_2d(mask)
- res_values = quantile_with_mask(values, mask, fill_value, qs, interpolation)
- return res_values[0]
- assert values.ndim == 2
- is_empty = values.shape[1] == 0
- if is_empty:
- # create the array of na_values
- # 2d len(values) * len(qs)
- flat = np.array([fill_value] * len(qs))
- result = np.repeat(flat, len(values)).reshape(len(values), len(qs))
- else:
- result = _nanpercentile(
- values,
- qs * 100.0,
- na_value=fill_value,
- mask=mask,
- interpolation=interpolation,
- )
- result = np.array(result, copy=False)
- result = result.T
- return result
- def _nanpercentile_1d(
- values: np.ndarray,
- mask: npt.NDArray[np.bool_],
- qs: npt.NDArray[np.float64],
- na_value: Scalar,
- interpolation: str,
- ) -> Scalar | np.ndarray:
- """
- Wrapper for np.percentile that skips missing values, specialized to
- 1-dimensional case.
- Parameters
- ----------
- values : array over which to find quantiles
- mask : ndarray[bool]
- locations in values that should be considered missing
- qs : np.ndarray[float64] of quantile indices to find
- na_value : scalar
- value to return for empty or all-null values
- interpolation : str
- Returns
- -------
- quantiles : scalar or array
- """
- # mask is Union[ExtensionArray, ndarray]
- values = values[~mask]
- if len(values) == 0:
- # Can't pass dtype=values.dtype here bc we might have na_value=np.nan
- # with values.dtype=int64 see test_quantile_empty
- # equiv: 'np.array([na_value] * len(qs))' but much faster
- return np.full(len(qs), na_value)
- return np.percentile(
- values,
- qs,
- # error: No overload variant of "percentile" matches argument
- # types "ndarray[Any, Any]", "ndarray[Any, dtype[floating[_64Bit]]]"
- # , "Dict[str, str]" [call-overload]
- **{np_percentile_argname: interpolation}, # type: ignore[call-overload]
- )
- def _nanpercentile(
- values: np.ndarray,
- qs: npt.NDArray[np.float64],
- *,
- na_value,
- mask: npt.NDArray[np.bool_],
- interpolation: str,
- ):
- """
- Wrapper for np.percentile that skips missing values.
- Parameters
- ----------
- values : np.ndarray[ndim=2] over which to find quantiles
- qs : np.ndarray[float64] of quantile indices to find
- na_value : scalar
- value to return for empty or all-null values
- mask : np.ndarray[bool]
- locations in values that should be considered missing
- interpolation : str
- Returns
- -------
- quantiles : scalar or array
- """
- if values.dtype.kind in ["m", "M"]:
- # need to cast to integer to avoid rounding errors in numpy
- result = _nanpercentile(
- values.view("i8"),
- qs=qs,
- na_value=na_value.view("i8"),
- mask=mask,
- interpolation=interpolation,
- )
- # Note: we have to do `astype` and not view because in general we
- # have float result at this point, not i8
- return result.astype(values.dtype)
- if mask.any():
- # Caller is responsible for ensuring mask shape match
- assert mask.shape == values.shape
- result = [
- _nanpercentile_1d(val, m, qs, na_value, interpolation=interpolation)
- for (val, m) in zip(list(values), list(mask))
- ]
- if values.dtype.kind == "f":
- # preserve itemsize
- result = np.array(result, dtype=values.dtype, copy=False).T
- else:
- result = np.array(result, copy=False).T
- if (
- result.dtype != values.dtype
- and not mask.all()
- and (result == result.astype(values.dtype, copy=False)).all()
- ):
- # mask.all() will never get cast back to int
- # e.g. values id integer dtype and result is floating dtype,
- # only cast back to integer dtype if result values are all-integer.
- result = result.astype(values.dtype, copy=False)
- return result
- else:
- return np.percentile(
- values,
- qs,
- axis=1,
- # error: No overload variant of "percentile" matches argument types
- # "ndarray[Any, Any]", "ndarray[Any, dtype[floating[_64Bit]]]",
- # "int", "Dict[str, str]" [call-overload]
- **{np_percentile_argname: interpolation}, # type: ignore[call-overload]
- )
|