123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- from __future__ import annotations
- import functools
- import re
- import sys
- import textwrap
- from typing import (
- TYPE_CHECKING,
- Callable,
- Literal,
- )
- import unicodedata
- import numpy as np
- from pandas._libs import lib
- import pandas._libs.missing as libmissing
- import pandas._libs.ops as libops
- from pandas._typing import (
- NpDtype,
- Scalar,
- )
- from pandas.core.dtypes.common import is_scalar
- from pandas.core.dtypes.missing import isna
- from pandas.core.strings.base import BaseStringArrayMethods
- if TYPE_CHECKING:
- from pandas import Series
class ObjectStringArrayMixin(BaseStringArrayMethods):
    """
    String Methods operating on object-dtype ndarrays.

    Almost every method builds a per-element callable and hands it to
    ``_str_map``, which applies it to the non-NA elements of the array.
    """

    _str_na_value = np.nan

    def __len__(self) -> int:
        # For typing, _str_map relies on the object being sized.
        raise NotImplementedError

    def _str_map(
        self, f, na_value=None, dtype: NpDtype | None = None, convert: bool = True
    ):
        """
        Map a callable over valid elements of the array.

        Parameters
        ----------
        f : Callable
            A function to call on each non-NA element.
        na_value : Scalar, optional
            The value to set for NA values. Might also be used for the
            fill value if the callable `f` raises an exception.
            This defaults to ``self._str_na_value`` which is ``np.nan``
            for object-dtype and Categorical and ``pd.NA`` for StringArray.
        dtype : Dtype, optional
            The dtype of the result array.
        convert : bool, default True
            Whether to call `maybe_convert_objects` on the resulting ndarray

        Returns
        -------
        np.ndarray
            The mapped values. If ``lib.map_infer_mask`` returns something
            that is not an ndarray, that object is returned unchanged.

        Raises
        ------
        TypeError or AttributeError
            Re-raised only when the message shows that `f` was called with
            the wrong number of positional arguments. Any other TypeError or
            AttributeError raised by `f` makes that element `na_value`.
        """
        if dtype is None:
            dtype = np.dtype("object")
        if na_value is None:
            na_value = self._str_na_value

        if not len(self):
            return np.array([], dtype=dtype)

        arr = np.asarray(self, dtype=object)
        mask = isna(arr)
        # Skip conversion when every element is NA.
        map_convert = convert and not np.all(mask)
        try:
            result = lib.map_infer_mask(
                arr, f, mask.view(np.uint8), convert=map_convert
            )
        except (TypeError, AttributeError) as err:
            # Reraise the exception if callable `f` got wrong number of args.
            # The user may want to be warned by this, instead of getting NaN
            p_err = (
                r"((takes)|(missing)) (?(2)from \d+ to )?\d+ "
                r"(?(3)required )positional arguments?"
            )

            # Only string messages can be searched; a non-string first
            # argument (e.g. ``TypeError(42)``) would make ``re.search``
            # itself raise and bypass the NA fallback below.
            if (
                len(err.args) >= 1
                and isinstance(err.args[0], str)
                and re.search(p_err, err.args[0])
            ):
                # FIXME: this should be totally avoidable
                raise err

            def g(x):
                # This type of fallback behavior can be removed once
                # we remove object-dtype .str accessor.
                try:
                    return f(x)
                except (TypeError, AttributeError):
                    return na_value

            # NOTE(review): `convert` is not forwarded here, so the retry
            # always uses the default ``convert=True`` -- confirm intended.
            return self._str_map(g, na_value=na_value, dtype=dtype)
        if not isinstance(result, np.ndarray):
            return result
        if na_value is not np.nan:
            np.putmask(result, mask, na_value)
            if convert and result.dtype == object:
                result = lib.maybe_convert_objects(result)
        return result

    def _str_count(self, pat, flags: int = 0):
        """Count the non-overlapping matches of regex `pat` in each element."""
        regex = re.compile(pat, flags=flags)
        f = lambda x: len(regex.findall(x))
        return self._str_map(f, dtype="int64")

    def _str_pad(
        self,
        width,
        side: Literal["left", "right", "both"] = "left",
        fillchar: str = " ",
    ):
        """
        Pad each element to `width` with `fillchar`.

        ``side="left"`` pads on the left (``str.rjust``), ``"right"`` pads on
        the right (``str.ljust``) and ``"both"`` centers the text.

        Raises
        ------
        ValueError
            If `side` is not one of the three accepted values.
        """
        if side == "left":
            f = lambda x: x.rjust(width, fillchar)
        elif side == "right":
            f = lambda x: x.ljust(width, fillchar)
        elif side == "both":
            f = lambda x: x.center(width, fillchar)
        else:  # pragma: no cover
            raise ValueError("Invalid side")
        return self._str_map(f)

    def _str_contains(
        self, pat, case: bool = True, flags: int = 0, na=np.nan, regex: bool = True
    ):
        """
        Test whether each element contains `pat`.

        With ``regex=True`` the pattern is compiled (adding ``re.IGNORECASE``
        when `case` is False) and searched for. With ``regex=False`` a plain
        substring test is used; the case-insensitive variant compares the
        upper-cased pattern against the upper-cased element.
        """
        if regex:
            if not case:
                flags |= re.IGNORECASE

            pat = re.compile(pat, flags=flags)

            f = lambda x: pat.search(x) is not None
        else:
            if case:
                f = lambda x: pat in x
            else:
                upper_pat = pat.upper()
                f = lambda x: upper_pat in x.upper()
        return self._str_map(f, na, dtype=np.dtype("bool"))

    def _str_startswith(self, pat, na=None):
        """Test whether each element starts with `pat`; `na` fills NA slots."""
        f = lambda x: x.startswith(pat)
        return self._str_map(f, na_value=na, dtype=np.dtype(bool))

    def _str_endswith(self, pat, na=None):
        """Test whether each element ends with `pat`; `na` fills NA slots."""
        f = lambda x: x.endswith(pat)
        return self._str_map(f, na_value=na, dtype=np.dtype(bool))

    def _str_replace(
        self,
        pat: str | re.Pattern,
        repl: str | Callable,
        n: int = -1,
        case: bool = True,
        flags: int = 0,
        regex: bool = True,
    ):
        """
        Replace occurrences of `pat` in each element with `repl`.

        The regex engine is used when `regex` is true, when any flags are in
        effect (including the one implied by ``case=False``) or when `repl`
        is callable; a literal `pat` is escaped first in that case. Otherwise
        plain ``str.replace`` is used. A negative `n` means "replace all".
        """
        if case is False:
            # add case flag, if provided
            flags |= re.IGNORECASE

        if regex or flags or callable(repl):
            if not isinstance(pat, re.Pattern):
                if regex is False:
                    pat = re.escape(pat)
                pat = re.compile(pat, flags=flags)

            # ``Pattern.sub`` uses count=0 for "no limit".
            n = n if n >= 0 else 0
            f = lambda x: pat.sub(repl=repl, string=x, count=n)
        else:
            f = lambda x: x.replace(pat, repl, n)

        return self._str_map(f, dtype=str)

    def _str_repeat(self, repeats):
        """
        Repeat each element.

        A scalar `repeats` is applied to every element through ``_str_map``.
        A non-scalar `repeats` is combined with the array through
        ``libops.vec_binop`` (presumably element by element -- confirm
        against that helper); ``pd.NA`` elements are returned as they are.
        """
        if is_scalar(repeats):

            def scalar_rep(x):
                # Try the bytes implementation first, then fall back to str.
                try:
                    return bytes.__mul__(x, repeats)
                except TypeError:
                    return str.__mul__(x, repeats)

            return self._str_map(scalar_rep, dtype=str)
        else:
            from pandas.core.arrays.string_ import BaseStringArray

            def rep(x, r):
                if x is libmissing.NA:
                    return x
                try:
                    return bytes.__mul__(x, r)
                except TypeError:
                    return str.__mul__(x, r)

            repeats = np.asarray(repeats, dtype=object)
            result = libops.vec_binop(np.asarray(self), repeats, rep)
            if isinstance(self, BaseStringArray):
                # Not going through map, so we have to do this here.
                result = type(self)._from_sequence(result)
            return result

    def _str_match(
        self, pat: str, case: bool = True, flags: int = 0, na: Scalar | None = None
    ):
        """Test whether regex `pat` matches at the start of each element."""
        if not case:
            flags |= re.IGNORECASE

        regex = re.compile(pat, flags=flags)

        f = lambda x: regex.match(x) is not None
        return self._str_map(f, na_value=na, dtype=np.dtype(bool))

    def _str_fullmatch(
        self,
        pat: str | re.Pattern,
        case: bool = True,
        flags: int = 0,
        na: Scalar | None = None,
    ):
        """Test whether regex `pat` matches the whole of each element."""
        if not case:
            flags |= re.IGNORECASE

        regex = re.compile(pat, flags=flags)

        f = lambda x: regex.fullmatch(x) is not None
        return self._str_map(f, na_value=na, dtype=np.dtype(bool))

    def _str_encode(self, encoding, errors: str = "strict"):
        """Encode each element with ``x.encode(encoding, errors=errors)``."""
        f = lambda x: x.encode(encoding, errors=errors)
        return self._str_map(f, dtype=object)

    def _str_find(self, sub, start: int = 0, end=None):
        """Lowest index of `sub` in each element (``str.find`` semantics)."""
        return self._str_find_(sub, start, end, side="left")

    def _str_rfind(self, sub, start: int = 0, end=None):
        """Highest index of `sub` in each element (``str.rfind`` semantics)."""
        return self._str_find_(sub, start, end, side="right")

    def _str_find_(self, sub, start, end, side):
        """
        Shared implementation of ``_str_find`` and ``_str_rfind``.

        `side` selects the element method: ``"left"`` -> ``find``,
        ``"right"`` -> ``rfind``. Any other value raises ValueError.
        """
        if side == "left":
            method = "find"
        elif side == "right":
            method = "rfind"
        else:  # pragma: no cover
            raise ValueError("Invalid side")

        if end is None:
            f = lambda x: getattr(x, method)(sub, start)
        else:
            f = lambda x: getattr(x, method)(sub, start, end)
        return self._str_map(f, dtype="int64")

    def _str_findall(self, pat, flags: int = 0):
        """Return ``re.findall`` results for regex `pat` on each element."""
        regex = re.compile(pat, flags=flags)
        return self._str_map(regex.findall, dtype="object")

    def _str_get(self, i):
        """
        Extract item `i` from each element.

        Dict elements are looked up with ``dict.get``. Other elements are
        indexed when `i` is within bounds (negative indices included);
        out-of-bounds positions give ``self._str_na_value``.
        """

        def f(x):
            if isinstance(x, dict):
                return x.get(i)
            elif len(x) > i >= -len(x):
                return x[i]
            return self._str_na_value

        return self._str_map(f)

    def _str_index(self, sub, start: int = 0, end=None):
        """
        Lowest index of `sub` in each element (``str.index`` semantics).

        Unlike ``_str_find``, the underlying ``index`` call raises
        ValueError when `sub` is absent.
        """
        # ``end=None`` is accepted by ``index`` and means "to the end", so a
        # single callable covers both the bounded and the unbounded case.
        f = lambda x: x.index(sub, start, end)
        return self._str_map(f, dtype="int64")

    def _str_rindex(self, sub, start: int = 0, end=None):
        """
        Highest index of `sub` in each element (``str.rindex`` semantics).

        Unlike ``_str_rfind``, the underlying ``rindex`` call raises
        ValueError when `sub` is absent.
        """
        # ``end=None`` is accepted by ``rindex`` and means "to the end".
        f = lambda x: x.rindex(sub, start, end)
        return self._str_map(f, dtype="int64")

    def _str_join(self, sep):
        """Join the items of each element with `sep` (``sep.join``)."""
        return self._str_map(sep.join)

    def _str_partition(self, sep, expand):
        """Apply ``x.partition(sep)`` to each element; `expand` is unused here."""
        return self._str_map(lambda x: x.partition(sep), dtype="object")

    def _str_rpartition(self, sep, expand):
        """Apply ``x.rpartition(sep)`` to each element; `expand` is unused here."""
        return self._str_map(lambda x: x.rpartition(sep), dtype="object")

    def _str_len(self):
        """Return ``len`` of each element."""
        return self._str_map(len, dtype="int64")

    def _str_slice(self, start=None, stop=None, step=None):
        """Slice each element with ``slice(start, stop, step)``."""
        obj = slice(start, stop, step)
        return self._str_map(lambda x: x[obj])

    def _str_slice_replace(self, start=None, stop=None, repl=None):
        """
        Replace the slice ``[start:stop]`` of each element with `repl`.

        `repl` defaults to the empty string. When the slice selects nothing,
        the tail resumes at `start` rather than `stop`, so `repl` is inserted
        without dropping any characters.
        """
        if repl is None:
            repl = ""

        def f(x):
            if x[start:stop] == "":
                local_stop = start
            else:
                local_stop = stop
            y = ""
            if start is not None:
                y += x[:start]
            y += repl
            if stop is not None:
                y += x[local_stop:]
            return y

        return self._str_map(f)

    def _str_split(
        self,
        pat: str | re.Pattern | None = None,
        n=-1,
        expand: bool = False,
        regex: bool | None = None,
    ):
        """
        Split each element around `pat`.

        ``pat=None`` splits on whitespace via ``str.split``. Otherwise `pat`
        is treated as a regex when ``regex=True`` or `pat` is a compiled
        pattern, as a literal when ``regex=False``, and, when `regex` is
        None, as a literal only if it is a single character. `n` limits the
        number of splits; ``None``, ``0`` and ``-1`` all mean "no limit".
        `expand` is not used here.
        """
        if pat is None:
            if n is None or n == 0:
                n = -1
            f = lambda x: x.split(pat, n)
        else:
            new_pat: str | re.Pattern
            if regex is True or isinstance(pat, re.Pattern):
                new_pat = re.compile(pat)
            elif regex is False:
                new_pat = pat
            # regex is None so link to old behavior #43563
            else:
                if len(pat) == 1:
                    new_pat = pat
                else:
                    new_pat = re.compile(pat)

            if isinstance(new_pat, re.Pattern):
                # ``Pattern.split`` uses maxsplit=0 for "no limit".
                if n is None or n == -1:
                    n = 0
                f = lambda x: new_pat.split(x, maxsplit=n)
            else:
                # ``str.split`` uses maxsplit=-1 for "no limit".
                if n is None or n == 0:
                    n = -1
                f = lambda x: x.split(pat, n)
        return self._str_map(f, dtype=object)

    def _str_rsplit(self, pat=None, n=-1):
        """Split each element from the right; `n` of None or 0 means no limit."""
        if n is None or n == 0:
            n = -1
        f = lambda x: x.rsplit(pat, n)
        return self._str_map(f, dtype="object")

    def _str_translate(self, table):
        """Apply ``x.translate(table)`` to each element."""
        return self._str_map(lambda x: x.translate(table))

    def _str_wrap(self, width, **kwargs):
        """
        Wrap each element to `width` columns.

        `kwargs` are forwarded to ``textwrap.TextWrapper``; the wrapped lines
        of an element are joined with newlines.
        """
        kwargs["width"] = width

        tw = textwrap.TextWrapper(**kwargs)

        return self._str_map(lambda s: "\n".join(tw.wrap(s)))

    def _str_get_dummies(self, sep: str = "|"):
        """
        Build indicator columns for the `sep`-separated tags of each element.

        Returns
        -------
        tuple
            ``(dummies, tags)`` where ``tags`` is the sorted list of distinct
            non-empty tags and ``dummies`` is an int64 array of shape
            ``(len(self), len(tags))`` marking which tags each row contains.
        """
        from pandas import Series

        arr = Series(self).fillna("")
        # Surround every value with separators so each tag can be located as
        # ``sep + tag + sep`` without matching inside a longer tag.
        try:
            arr = sep + arr + sep
        except (TypeError, NotImplementedError):
            arr = sep + arr.astype(str) + sep

        tags: set[str] = set()
        for ts in Series(arr, copy=False).str.split(sep):
            tags.update(ts)
        tags2 = sorted(tags - {""})

        dummies = np.empty((len(arr), len(tags2)), dtype=np.int64)

        def _isin(test_elements: str, element: str) -> bool:
            return element in test_elements

        # The ndarray view of the padded values is the same for every tag.
        values = arr.to_numpy()
        for i, t in enumerate(tags2):
            pat = sep + t + sep
            dummies[:, i] = lib.map_infer(
                values, functools.partial(_isin, element=pat)
            )
        return dummies, tags2

    def _str_upper(self):
        """Upper-case each element."""
        return self._str_map(lambda x: x.upper())

    def _str_isalnum(self):
        """Apply ``str.isalnum`` to each element."""
        return self._str_map(str.isalnum, dtype="bool")

    def _str_isalpha(self):
        """Apply ``str.isalpha`` to each element."""
        return self._str_map(str.isalpha, dtype="bool")

    def _str_isdecimal(self):
        """Apply ``str.isdecimal`` to each element."""
        return self._str_map(str.isdecimal, dtype="bool")

    def _str_isdigit(self):
        """Apply ``str.isdigit`` to each element."""
        return self._str_map(str.isdigit, dtype="bool")

    def _str_islower(self):
        """Apply ``str.islower`` to each element."""
        return self._str_map(str.islower, dtype="bool")

    def _str_isnumeric(self):
        """Apply ``str.isnumeric`` to each element."""
        return self._str_map(str.isnumeric, dtype="bool")

    def _str_isspace(self):
        """Apply ``str.isspace`` to each element."""
        return self._str_map(str.isspace, dtype="bool")

    def _str_istitle(self):
        """Apply ``str.istitle`` to each element."""
        return self._str_map(str.istitle, dtype="bool")

    def _str_isupper(self):
        """Apply ``str.isupper`` to each element."""
        return self._str_map(str.isupper, dtype="bool")

    def _str_capitalize(self):
        """Apply ``str.capitalize`` to each element."""
        return self._str_map(str.capitalize)

    def _str_casefold(self):
        """Apply ``str.casefold`` to each element."""
        return self._str_map(str.casefold)

    def _str_title(self):
        """Apply ``str.title`` to each element."""
        return self._str_map(str.title)

    def _str_swapcase(self):
        """Apply ``str.swapcase`` to each element."""
        return self._str_map(str.swapcase)

    def _str_lower(self):
        """Apply ``str.lower`` to each element."""
        return self._str_map(str.lower)

    def _str_normalize(self, form):
        """Apply ``unicodedata.normalize(form, x)`` to each element."""
        f = lambda x: unicodedata.normalize(form, x)
        return self._str_map(f)

    def _str_strip(self, to_strip=None):
        """Apply ``x.strip(to_strip)`` to each element."""
        return self._str_map(lambda x: x.strip(to_strip))

    def _str_lstrip(self, to_strip=None):
        """Apply ``x.lstrip(to_strip)`` to each element."""
        return self._str_map(lambda x: x.lstrip(to_strip))

    def _str_rstrip(self, to_strip=None):
        """Apply ``x.rstrip(to_strip)`` to each element."""
        return self._str_map(lambda x: x.rstrip(to_strip))

    def _str_removeprefix(self, prefix: str) -> Series:
        """Drop `prefix` from the start of each element that begins with it."""
        # outstanding question on whether to use native methods for users on
        # Python 3.9+
        # https://github.com/pandas-dev/pandas/pull/39226#issuecomment-836719770,
        # in which case we could do return self._str_map(str.removeprefix)

        def removeprefix(text: str) -> str:
            if text.startswith(prefix):
                return text[len(prefix) :]
            return text

        return self._str_map(removeprefix)

    def _str_removesuffix(self, suffix: str) -> Series:
        """Drop `suffix` from the end of each element that ends with it."""
        if sys.version_info < (3, 9):
            # NOTE pyupgrade will remove this when we run it with --py39-plus
            # so don't remove the unnecessary `else` statement below
            from pandas.util._str_methods import removesuffix

            return self._str_map(functools.partial(removesuffix, suffix=suffix))
        else:
            return self._str_map(lambda x: x.removesuffix(suffix))

    def _str_extract(self, pat: str, flags: int = 0, expand: bool = True):
        """
        Extract the capture groups of the first match of regex `pat`.

        With ``expand=False`` only the first group is returned per element,
        through ``_str_map`` without dtype conversion. With ``expand=True`` a
        list of rows is returned, one list of group values per element;
        unmatched groups, non-matching elements and non-string elements are
        filled with ``self._str_na_value``.
        """
        regex = re.compile(pat, flags=flags)
        na_value = self._str_na_value

        if not expand:

            def g(x):
                m = regex.search(x)
                return m.groups()[0] if m else na_value

            return self._str_map(g, convert=False)

        # NOTE: this one list object is returned for every row without a
        # match, so callers must not mutate the rows in place.
        empty_row = [na_value] * regex.groups

        def f(x):
            if not isinstance(x, str):
                return empty_row
            m = regex.search(x)
            if m:
                return [na_value if item is None else item for item in m.groups()]
            else:
                return empty_row

        return [f(val) for val in np.asarray(self)]
|