| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 | from __future__ import annotationsfrom dataclasses import dataclassfrom typing import Any, Callableimport numpy as npfrom numpy import ndarrayimport pandas as pdfrom pandas import DataFrametry:    from scipy.stats import gaussian_kde    _no_scipy = Falseexcept ImportError:    from seaborn.external.kde import gaussian_kde    _no_scipy = Truefrom seaborn._core.groupby import GroupByfrom seaborn._core.scales import Scalefrom seaborn._stats.base import Stat@dataclassclass KDE(Stat):    """    Compute a univariate kernel density estimate.    Parameters    ----------    bw_adjust : float        Factor that multiplicatively scales the value chosen using        `bw_method`. Increasing will make the curve smoother. See Notes.    bw_method : string, scalar, or callable        Method for determining the smoothing bandwidth to use. Passed directly        to :class:`scipy.stats.gaussian_kde`; see there for options.    common_norm : bool or list of variables        If `True`, normalize so that the areas of all curves sums to 1.        If `False`, normalize each curve independently. If a list, defines        variable(s) to group by and normalize within.    common_grid : bool or list of variables        If `True`, all curves will share the same evaluation grid.        If `False`, each evaluation grid is independent. If a list, defines        variable(s) to group by and share a grid within.    gridsize : int or None        Number of points in the evaluation grid. If None, the density is        evaluated at the original datapoints.    cut : float        Factor, multiplied by the kernel bandwidth, that determines how far        the evaluation grid extends past the extreme datapoints. When set to 0,        the curve is truncated at the data limits.    cumulative : bool        If True, estimate a cumulative distribution function. Requires scipy.    Notes    -----    The *bandwidth*, or standard deviation of the smoothing kernel, is an    important parameter. Much like histogram bin width, using the wrong    bandwidth can produce a distorted representation. Over-smoothing can erase    true features, while under-smoothing can create false ones. The default    uses a rule-of-thumb that works best for distributions that are roughly    bell-shaped. It is a good idea to check the default by varying `bw_adjust`.    Because the smoothing is performed with a Gaussian kernel, the estimated    density curve can extend to values that may not make sense. For example, the    curve may be drawn over negative values when data that are naturally    positive. The `cut` parameter can be used to control the evaluation range,    but datasets that have many observations close to a natural boundary may be    better served by a different method.    Similar distortions may arise when a dataset is naturally discrete or "spiky"    (containing many repeated observations of the same value). KDEs will always    produce a smooth curve, which could be misleading.    The units on the density axis are a common source of confusion. While kernel    density estimation produces a probability distribution, the height of the curve    at each point gives a density, not a probability. A probability can be obtained    only by integrating the density across a range. The curve is normalized so    that the integral over all possible values is 1, meaning that the scale of    the density axis depends on the data values.    If scipy is installed, its cython-accelerated implementation will be used.    Examples    --------    .. include:: ../docstrings/objects.KDE.rst    """    bw_adjust: float = 1    bw_method: str | float | Callable[[gaussian_kde], float] = "scott"    common_norm: bool | list[str] = True    common_grid: bool | list[str] = True    gridsize: int | None = 200    cut: float = 3    cumulative: bool = False    def __post_init__(self):        if self.cumulative and _no_scipy:            raise RuntimeError("Cumulative KDE evaluation requires scipy")    def _check_var_list_or_boolean(self, param: str, grouping_vars: Any) -> None:        """Do input checks on grouping parameters."""        value = getattr(self, param)        if not (            isinstance(value, bool)            or (isinstance(value, list) and all(isinstance(v, str) for v in value))        ):            param_name = f"{self.__class__.__name__}.{param}"            raise TypeError(f"{param_name} must be a boolean or list of strings.")        self._check_grouping_vars(param, grouping_vars, stacklevel=3)    def _fit(self, data: DataFrame, orient: str) -> gaussian_kde:        """Fit and return a KDE object."""        # TODO need to handle singular data        fit_kws: dict[str, Any] = {"bw_method": self.bw_method}        if "weight" in data:            fit_kws["weights"] = data["weight"]        kde = gaussian_kde(data[orient], **fit_kws)        kde.set_bandwidth(kde.factor * self.bw_adjust)        return kde    def _get_support(self, data: DataFrame, orient: str) -> ndarray:        """Define the grid that the KDE will be evaluated on."""        if self.gridsize is None:            return data[orient].to_numpy()        kde = self._fit(data, orient)        bw = np.sqrt(kde.covariance.squeeze())        gridmin = data[orient].min() - bw * self.cut        gridmax = data[orient].max() + bw * self.cut        return np.linspace(gridmin, gridmax, self.gridsize)    def _fit_and_evaluate(        self, data: DataFrame, orient: str, support: ndarray    ) -> DataFrame:        """Transform single group by fitting a KDE and evaluating on a support grid."""        empty = pd.DataFrame(columns=[orient, "weight", "density"], dtype=float)        if len(data) < 2:            return empty        try:            kde = self._fit(data, orient)        except np.linalg.LinAlgError:            return empty        if self.cumulative:            s_0 = support[0]            density = np.array([kde.integrate_box_1d(s_0, s_i) for s_i in support])        else:            density = kde(support)        weight = data["weight"].sum()        return pd.DataFrame({orient: support, "weight": weight, "density": density})    def _transform(        self, data: DataFrame, orient: str, grouping_vars: list[str]    ) -> DataFrame:        """Transform multiple groups by fitting KDEs and evaluating."""        empty = pd.DataFrame(columns=[*data.columns, "density"], dtype=float)        if len(data) < 2:            return empty        try:            support = self._get_support(data, orient)        except np.linalg.LinAlgError:            return empty        grouping_vars = [x for x in grouping_vars if data[x].nunique() > 1]        if not grouping_vars:            return self._fit_and_evaluate(data, orient, support)        groupby = GroupBy(grouping_vars)        return groupby.apply(data, self._fit_and_evaluate, orient, support)    def __call__(        self, data: DataFrame, groupby: GroupBy, orient: str, scales: dict[str, Scale],    ) -> DataFrame:        if "weight" not in data:            data = data.assign(weight=1)        data = data.dropna(subset=[orient, "weight"])        # Transform each group separately        grouping_vars = [str(v) for v in data if v in groupby.order]        if not grouping_vars or self.common_grid is True:            res = self._transform(data, orient, grouping_vars)        else:            if self.common_grid is False:                grid_vars = grouping_vars            else:                self._check_var_list_or_boolean("common_grid", grouping_vars)                grid_vars = [v for v in self.common_grid if v in grouping_vars]            res = (                GroupBy(grid_vars)                .apply(data, self._transform, orient, grouping_vars)            )        # Normalize, potentially within groups        if not grouping_vars or self.common_norm is True:            res = res.assign(group_weight=data["weight"].sum())        else:            if self.common_norm is False:                norm_vars = grouping_vars            else:                self._check_var_list_or_boolean("common_norm", grouping_vars)                norm_vars = [v for v in self.common_norm if v in grouping_vars]            res = res.join(                data.groupby(norm_vars)["weight"].sum().rename("group_weight"),                on=norm_vars,            )        res["density"] *= res.eval("weight / group_weight")        value = {"x": "y", "y": "x"}[orient]        res[value] = res["density"]        return res.drop(["weight", "group_weight"], axis=1)
 |