123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- """
- Misc tools for implementing data structures
- Note: pandas.core.common is *not* part of the public API.
- """
- from __future__ import annotations
- import builtins
- from collections import (
- abc,
- defaultdict,
- )
- import contextlib
- from functools import partial
- import inspect
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Collection,
- Generator,
- Hashable,
- Iterable,
- Sequence,
- cast,
- overload,
- )
- import warnings
- import numpy as np
- from pandas._libs import lib
- from pandas._typing import (
- AnyArrayLike,
- ArrayLike,
- NpDtype,
- RandomState,
- T,
- )
- from pandas.core.dtypes.cast import construct_1d_object_array_from_listlike
- from pandas.core.dtypes.common import (
- is_array_like,
- is_bool_dtype,
- is_extension_array_dtype,
- is_integer,
- )
- from pandas.core.dtypes.generic import (
- ABCExtensionArray,
- ABCIndex,
- ABCSeries,
- )
- from pandas.core.dtypes.inference import iterable_not_string
- from pandas.core.dtypes.missing import isna
- if TYPE_CHECKING:
- from pandas import Index
def flatten(line):
    """
    Lazily flatten an arbitrarily nested sequence.

    Parameters
    ----------
    line : sequence
        The non-string sequence to flatten.

    Returns
    -------
    flattened : generator
        Yields the leaf elements of ``line`` in iteration order.

    Notes
    -----
    ``iterable_not_string`` decides whether an element is descended into;
    anything it rejects (strings included) is yielded unchanged.
    """
    for item in line:
        if not iterable_not_string(item):
            yield item
            continue
        # Nested iterable: recurse and forward its leaves one by one.
        yield from flatten(item)
def consensus_name_attr(objs):
    """
    Return the ``name`` shared by all of ``objs``, or None if they disagree.

    Parameters
    ----------
    objs : sequence
        Non-empty sequence of objects exposing a ``name`` attribute.

    Returns
    -------
    object or None
        The first object's name when every other name compares equal to it,
        otherwise None.
    """
    consensus = objs[0].name
    for other in objs[1:]:
        try:
            differs = other.name != consensus
            if differs:
                consensus = None
        except ValueError:
            # The comparison (or its truth value) raised ValueError;
            # treat that as a disagreement.
            consensus = None
    return consensus
def is_bool_indexer(key: Any) -> bool:
    """
    Decide whether `key` can be used as a boolean mask.

    Parameters
    ----------
    key : Any
        Candidate indexer. Only Series, Index, ndarrays, array-likes with an
        extension dtype, and lists can qualify; everything else is rejected.

    Returns
    -------
    bool
        True when `key` is a boolean indexer, False otherwise.

    Raises
    ------
    ValueError
        When `key` has object dtype, is inferred as "boolean" and contains
        missing values.

    See Also
    --------
    check_array_indexer : Check that `key` is a valid array to index,
        and convert to an ndarray.
    """
    has_dtype = isinstance(key, (ABCSeries, np.ndarray, ABCIndex)) or (
        is_array_like(key) and is_extension_array_dtype(key.dtype)
    )

    if has_dtype:
        if key.dtype == np.object_:
            values = np.asarray(key)
            if lib.is_bool_array(values):
                return True
            # Only complain about NA when the remaining values are booleans.
            # Don't raise on e.g. ["A", "B", np.nan], see
            # test_loc_getitem_list_of_labels_categoricalindex_with_na
            if lib.infer_dtype(values) == "boolean" and isna(values).any():
                raise ValueError(
                    "Cannot mask with non-boolean array containing NA / NaN values"
                )
            return False
        if is_bool_dtype(key.dtype):
            return True
        return False

    if isinstance(key, list) and len(key) > 0:
        # check if np.array(key).dtype would be bool
        # GH#42461 cython will raise TypeError if we pass a subclass
        plain = key if type(key) is list else list(key)
        return lib.is_bool_list(plain)

    return False
def cast_scalar_indexer(val):
    """
    Reject a float key that holds a round number; pass anything else through.

    Parameters
    ----------
    val : scalar

    Returns
    -------
    outval : scalar
        ``val`` itself, unchanged.

    Raises
    ------
    IndexError
        If ``val`` is a float whose ``is_integer()`` is true.
    """
    # assumes lib.is_scalar(val)
    round_float = lib.is_float(val) and val.is_integer()
    if not round_float:
        return val

    # GH#34193
    raise IndexError(
        "Indexing with a float is no longer supported. Manually convert "
        "to an integer key instead."
    )
def not_none(*args):
    """
    Return a generator over the arguments that are not None, in order.
    """

    def _present():
        for candidate in args:
            if candidate is not None:
                yield candidate

    return _present()
def any_none(*args) -> bool:
    """
    Return True if at least one argument is None.
    """
    for candidate in args:
        if candidate is None:
            return True
    return False
def all_none(*args) -> bool:
    """
    Return True if every argument is None (vacuously True with no arguments).
    """
    for candidate in args:
        if candidate is not None:
            return False
    return True
def any_not_none(*args) -> bool:
    """
    Return True if at least one argument is not None.
    """
    for candidate in args:
        if candidate is not None:
            return True
    return False
def all_not_none(*args) -> bool:
    """
    Return True if no argument is None (vacuously True with no arguments).
    """
    for candidate in args:
        if candidate is None:
            return False
    return True
def count_not_none(*args) -> int:
    """
    Return how many of the arguments are not None.
    """
    return sum(1 for candidate in args if candidate is not None)
@overload
def asarray_tuplesafe(
    values: ArrayLike | list | tuple | zip, dtype: NpDtype | None = ...
) -> np.ndarray:
    # ExtensionArray can only be returned when values is an Index, all other iterables
    # will return np.ndarray. Unfortunately "all other" cannot be encoded in a type
    # signature, so instead we special-case some common types.
    ...


@overload
def asarray_tuplesafe(values: Iterable, dtype: NpDtype | None = ...) -> ArrayLike:
    ...


def asarray_tuplesafe(values: Iterable, dtype: NpDtype | None = None) -> ArrayLike:
    """
    Convert an iterable to an array without expanding tuples into a 2-D array.

    Parameters
    ----------
    values : Iterable
        Values to convert. Anything that is not a list or tuple and has no
        ``__array__`` attribute is first materialized with ``list``.
    dtype : NpDtype or None, default None
        Passed to ``np.asarray``. For a list input with an object dtype the
        array is built by ``construct_1d_object_array_from_listlike`` instead.

    Returns
    -------
    ArrayLike
        ``values._values`` when ``values`` is an Index; otherwise an ndarray.
        A conversion that would be 2-D is replaced by an object array whose
        elements are the rows converted to tuples.
    """
    if not (isinstance(values, (list, tuple)) or hasattr(values, "__array__")):
        # Generic iterables (e.g. zip, generators) must be materialized first.
        values = list(values)
    elif isinstance(values, ABCIndex):
        # An Index already wraps an array; hand it back directly.
        return values._values

    if isinstance(values, list) and dtype in [np.object_, object]:
        # Object dtype requested for a list: skip np.asarray entirely.
        return construct_1d_object_array_from_listlike(values)

    try:
        with warnings.catch_warnings():
            # Can remove warning filter once NumPy 1.24 is min version
            warnings.simplefilter("ignore", np.VisibleDeprecationWarning)
            result = np.asarray(values, dtype=dtype)
    except ValueError:
        # Using try/except since it's more performant than checking is_list_like
        # over each element
        # error: Argument 1 to "construct_1d_object_array_from_listlike"
        # has incompatible type "Iterable[Any]"; expected "Sized"
        return construct_1d_object_array_from_listlike(values)  # type: ignore[arg-type]

    if issubclass(result.dtype.type, str):
        # A numpy string dtype was inferred; redo the conversion as object.
        result = np.asarray(values, dtype=object)

    if result.ndim == 2:
        # Avoid building an array of arrays:
        values = [tuple(x) for x in values]
        result = construct_1d_object_array_from_listlike(values)

    return result
def index_labels_to_array(
    labels: np.ndarray | Iterable, dtype: NpDtype | None = None
) -> np.ndarray:
    """
    Turn a single label or an iterable of labels into an array for an Index.

    Parameters
    ----------
    labels : label, ndarray or iterable
        A str or tuple is treated as one label. Any other object that is not
        a list or ndarray is expanded with ``list``; if that fails because it
        is not iterable, it is treated as one label.
    dtype : dtype
        If specified, use as dtype of the resulting array, otherwise infer.

    Returns
    -------
    array
    """
    if isinstance(labels, (str, tuple)):
        # A str or tuple is one label, not a collection of labels.
        labels = [labels]
    elif not isinstance(labels, (list, np.ndarray)):
        try:
            labels = list(labels)
        except TypeError:  # non-iterable
            labels = [labels]

    return asarray_tuplesafe(labels, dtype=dtype)
def maybe_make_list(obj):
    """
    Wrap ``obj`` in a list unless it is None, a tuple or a list.
    """
    if obj is None or isinstance(obj, (tuple, list)):
        return obj
    return [obj]
def maybe_iterable_to_list(obj: Iterable[T] | T) -> Collection[T] | T:
    """
    Consume ``obj`` into a list if it is iterable but has no length.

    Sized iterables and non-iterables are returned as they are.
    """
    unsized_iterable = isinstance(obj, abc.Iterable) and not isinstance(
        obj, abc.Sized
    )
    if unsized_iterable:
        return list(obj)
    return cast(Collection, obj)
def is_null_slice(obj) -> bool:
    """
    Return True if ``obj`` is a slice whose start, stop and step are all None.
    """
    if not isinstance(obj, slice):
        return False
    return obj.start is None and obj.stop is None and obj.step is None
def is_empty_slice(obj) -> bool:
    """
    Return whether ``obj`` is a slice with explicit, equal start and stop.

    Such a slice selects no values.
    """
    if not isinstance(obj, slice):
        return False
    if obj.start is None or obj.stop is None:
        return False
    return obj.start == obj.stop
def is_true_slices(line) -> list[bool]:
    """
    Flag the non-null slices in ``line``.

    Returns a list of the same length as ``line`` holding True where the
    element is a slice other than ``slice(None)`` and False everywhere else.
    """
    flags = []
    for entry in line:
        flags.append(isinstance(entry, slice) and not is_null_slice(entry))
    return flags
# TODO: used only once in indexing; belongs elsewhere?
def is_full_slice(obj, line: int) -> bool:
    """
    Return whether ``obj`` is the slice ``0:line`` with no step.

    ``line`` is the length the slice is compared against.
    """
    if not isinstance(obj, slice):
        return False
    return obj.start == 0 and obj.stop == line and obj.step is None
def get_callable_name(obj):
    """
    Return a display name for a callable, or None when there is none.

    ``functools.partial`` objects without a ``__name__`` are unwrapped to the
    function they wrap. Other callables without a ``__name__`` fall back to
    their class name.
    """
    # Peel nameless partial layers until something named (or a non-partial)
    # is reached.
    while not hasattr(obj, "__name__") and isinstance(obj, partial):
        obj = obj.func

    # typical case has name
    if hasattr(obj, "__name__"):
        return obj.__name__

    # fall back to class name
    if callable(obj):
        return type(obj).__name__

    # Nothing worked, most likely because the argument is not callable.
    # None (rather than "") lets callers tell "no name" from an empty name.
    return None
def apply_if_callable(maybe_callable, obj, **kwargs):
    """
    Call ``maybe_callable`` with ``obj`` and ``kwargs`` if it is callable.

    Parameters
    ----------
    maybe_callable : possibly a callable
        Returned untouched when it is not callable.
    obj : NDFrame
        Passed as the first positional argument of the call.
    **kwargs
        Forwarded to the call as keyword arguments.
    """
    if not callable(maybe_callable):
        return maybe_callable
    return maybe_callable(obj, **kwargs)
def standardize_mapping(into):
    """
    Normalize a mapping class or instance into a mapping constructor.

    Parameters
    ----------
    into : instance or subclass of collections.abc.Mapping
        Must be a class, an initialized collections.defaultdict,
        or an instance of a collections.abc.Mapping subclass.

    Returns
    -------
    mapping : a collections.abc.Mapping subclass or other constructor
        A callable that accepts an iterator to build the desired Mapping.
        For an initialized defaultdict this is a ``functools.partial`` of
        ``defaultdict`` bound to the same ``default_factory``.

    Raises
    ------
    TypeError
        If the resolved class is not a Mapping subclass, or is the bare
        ``defaultdict`` class.

    See Also
    --------
    DataFrame.to_dict
    Series.to_dict
    """
    if isinstance(into, defaultdict):
        # Instances carry their default_factory; keep it in the constructor.
        return partial(defaultdict, into.default_factory)

    mapping_cls = into if inspect.isclass(into) else type(into)

    if not issubclass(mapping_cls, abc.Mapping):
        raise TypeError(f"unsupported type: {mapping_cls}")
    if mapping_cls == defaultdict:
        raise TypeError("to_dict() only accepts initialized defaultdicts")
    return mapping_cls
@overload
def random_state(state: np.random.Generator) -> np.random.Generator:
    ...


@overload
def random_state(
    state: int | ArrayLike | np.random.BitGenerator | np.random.RandomState | None,
) -> np.random.RandomState:
    ...


def random_state(state: RandomState | None = None):
    """
    Normalize a ``random_state`` argument into a numpy random source.

    Parameters
    ----------
    state : int, array-like, BitGenerator, Generator, np.random.RandomState, None.
        An int, array-like or BitGenerator is used as the seed of a new
        np.random.RandomState.
        An np.random.RandomState or Generator is returned unchanged.
        ``None`` returns the np.random module.
        Anything else raises ValueError.

        .. versionchanged:: 1.1.0

            array-like and BitGenerator object now passed to
            np.random.RandomState() as seed

        Default None.

    Returns
    -------
    np.random.RandomState or np.random.Generator. If state is None, returns np.random

    Raises
    ------
    ValueError
        If ``state`` is none of the accepted kinds.
    """
    if state is None:
        return np.random

    if isinstance(state, (np.random.RandomState, np.random.Generator)):
        # Already a random source: hand it back untouched.
        return state

    usable_as_seed = (
        is_integer(state)
        or is_array_like(state)
        or isinstance(state, np.random.BitGenerator)
    )
    if usable_as_seed:
        # mypy cannot reconcile the union accepted here with the seed type
        # declared by RandomState, hence the ignore.
        return np.random.RandomState(state)  # type: ignore[arg-type]

    raise ValueError(
        "random_state must be an integer, array-like, a BitGenerator, Generator, "
        "a numpy RandomState, or None"
    )
def pipe(
    obj, func: Callable[..., T] | tuple[Callable[..., T], str], *args, **kwargs
) -> T:
    """
    Apply ``func`` to ``obj``, passing ``obj`` positionally or by keyword.

    When ``func`` is a plain callable, ``obj`` becomes its first positional
    argument. When ``func`` is a ``(callable, data_keyword)`` tuple, ``obj``
    is passed to the callable as the keyword argument named ``data_keyword``.

    Parameters
    ----------
    obj : object
        The object handed to ``func``.
    func : callable or tuple of (callable, str)
        Function to apply to this object or, alternatively, a
        ``(callable, data_keyword)`` tuple where ``data_keyword`` is a
        string indicating the keyword of ``callable`` that expects the
        object.
    *args : iterable, optional
        Positional arguments passed into ``func``.
    **kwargs : dict, optional
        A dictionary of keyword arguments passed into ``func``.

    Returns
    -------
    object : the return type of ``func``.

    Raises
    ------
    ValueError
        If ``data_keyword`` is also supplied in ``kwargs``.
    """
    if not isinstance(func, tuple):
        return func(obj, *args, **kwargs)

    callee, data_keyword = func
    if data_keyword in kwargs:
        raise ValueError(
            f"{data_keyword} is both the pipe target and a keyword argument"
        )
    kwargs[data_keyword] = obj
    return callee(*args, **kwargs)
def get_rename_function(mapper):
    """
    Return a function that maps names/labels according to ``mapper``.

    A Mapping or Series is wrapped in a lookup that leaves labels it does not
    contain unchanged; any other ``mapper`` is returned as-is.
    """
    if not isinstance(mapper, (abc.Mapping, ABCSeries)):
        return mapper

    def f(x):
        # Labels missing from the mapper are kept as they are.
        return mapper[x] if x in mapper else x

    return f
def convert_to_list_like(
    values: Hashable | Iterable | AnyArrayLike,
) -> list | AnyArrayLike:
    """
    Convert list-like or scalar input to list-like.

    Lists, ndarrays, Index, Series and ExtensionArray inputs are returned
    unmodified. Other non-string iterables are consumed into a list, and
    everything else (strings included) is wrapped in a one-element list.
    """
    returned_as_is = (list, np.ndarray, ABCIndex, ABCSeries, ABCExtensionArray)
    if isinstance(values, returned_as_is):
        return values

    if isinstance(values, str) or not isinstance(values, abc.Iterable):
        return [values]
    return list(values)
@contextlib.contextmanager
def temp_setattr(obj, attr: str, value) -> Generator[Any, None, None]:
    """
    Temporarily set an attribute on an object.

    Parameters
    ----------
    obj : object
        Object whose attribute will be modified.
    attr : str
        Name of the attribute to modify. It is read before being replaced,
        so it must already exist on ``obj``.
    value : object
        Value the attribute holds for the duration of the ``with`` block.

    Yields
    ------
    object
        ``obj`` itself, with ``attr`` set to ``value``.

    Raises
    ------
    AttributeError
        If ``obj`` has no attribute ``attr``.
    """
    old_value = getattr(obj, attr)
    setattr(obj, attr, value)
    try:
        yield obj
    finally:
        # Restore the previous value even when the with-body raises.
        setattr(obj, attr, old_value)
def require_length_match(data, index: Index) -> None:
    """
    Raise if ``data`` and ``index`` do not have the same length.

    Raises
    ------
    ValueError
        When ``len(data) != len(index)``; the message reports both lengths.
    """
    n_values, n_index = len(data), len(index)
    if n_values == n_index:
        return
    raise ValueError(
        f"Length of values ({n_values}) "
        f"does not match length of index ({n_index})"
    )
# Lookup table read by ``is_builtin_func``: Python builtin reductions mapped
# to the numpy callables that replace them.
# the ufuncs np.maximum.reduce and np.minimum.reduce default to axis=0,
# whereas np.min and np.max (which directly call obj.min and obj.max)
# default to axis=None.
_builtin_table = {
    builtins.sum: np.sum,
    builtins.max: np.maximum.reduce,
    builtins.min: np.minimum.reduce,
}

# Lookup table read by ``get_cython_func``: builtin and numpy callables mapped
# to a string name. Each numpy function and its nan-aware variant share the
# same name (e.g. np.sum and np.nansum both map to "sum").
# NOTE(review): presumably these strings name pandas' own optimized
# implementations -- confirm against the callers of get_cython_func.
_cython_table = {
    builtins.sum: "sum",
    builtins.max: "max",
    builtins.min: "min",
    np.all: "all",
    np.any: "any",
    np.sum: "sum",
    np.nansum: "sum",
    np.mean: "mean",
    np.nanmean: "mean",
    np.prod: "prod",
    np.nanprod: "prod",
    np.std: "std",
    np.nanstd: "std",
    np.var: "var",
    np.nanvar: "var",
    np.median: "median",
    np.nanmedian: "median",
    np.max: "max",
    np.nanmax: "max",
    np.min: "min",
    np.nanmin: "min",
    np.cumprod: "cumprod",
    np.nancumprod: "cumprod",
    np.cumsum: "cumsum",
    np.nancumsum: "cumsum",
}
def get_cython_func(arg: Callable) -> str | None:
    """
    Return the name registered for ``arg`` in ``_cython_table``, if any.

    Returns None when ``arg`` has no entry in the table.
    """
    try:
        return _cython_table[arg]
    except KeyError:
        return None
def is_builtin_func(arg):
    """
    Return the replacement registered for ``arg`` in ``_builtin_table``.

    When ``arg`` has no entry in the table, ``arg`` itself is returned.
    """
    try:
        return _builtin_table[arg]
    except KeyError:
        return arg
def fill_missing_names(names: Sequence[Hashable | None]) -> list[Hashable]:
    """
    Replace each missing name by ``level_n``, where n is its position.

    .. versionadded:: 1.4.0

    Parameters
    ----------
    names : list-like
        list of column names or None values.

    Returns
    -------
    list
        list of column names with the None values replaced.
    """
    filled: list[Hashable] = []
    for position, name in enumerate(names):
        filled.append(name if name is not None else f"level_{position}")
    return filled
|