123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- from __future__ import annotations
- from typing import TYPE_CHECKING
- import numpy as np
- from pandas.compat._optional import import_optional_dependency
def generate_online_numba_ewma_func(
    nopython: bool,
    nogil: bool,
    parallel: bool,
):
    """
    Generate a numba jitted online ewma function specified by values
    from engine_kwargs.

    The three flags are forwarded unchanged to ``numba.jit`` when the inner
    kernel is decorated.

    Parameters
    ----------
    nopython : bool
        nopython to be passed into numba.jit
    nogil : bool
        nogil to be passed into numba.jit
    parallel : bool
        parallel to be passed into numba.jit

    Returns
    -------
    Numba function
        The jitted ``online_ewma`` kernel defined below.
    """
    # Static type checkers see a plain import; at runtime numba is resolved
    # through pandas' optional-dependency helper when this factory is called.
    if TYPE_CHECKING:
        import numba
    else:
        numba = import_optional_dependency("numba")

    @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
    def online_ewma(
        values: np.ndarray,
        deltas: np.ndarray,
        minimum_periods: int,
        old_wt_factor: float,
        new_wt: float,
        old_wt: np.ndarray,
        adjust: bool,
        ignore_na: bool,
    ):
        """
        Compute online exponentially weighted mean per column over 2D values.

        Takes the first observation as is, then computes the subsequent
        exponentially weighted mean accounting minimum periods.

        Parameters
        ----------
        values : np.ndarray
            Rows are processed in order; each row holds one value per column.
        deltas : np.ndarray
            Exponents applied to ``old_wt_factor`` when decaying ``old_wt``.
        minimum_periods : int
            An output entry is NaN while the column's count of non-NaN
            observations is below this value.
        old_wt_factor : float
            Base of the decay multiplied into ``old_wt``.
        new_wt : float
            Weight given to the current observation.
        old_wt : np.ndarray
            Per-column weight of the running average; updated in place.
        adjust : bool
            If true, ``new_wt`` is added to ``old_wt`` after an observation;
            otherwise ``old_wt`` is reset to 1.0.
        ignore_na : bool
            If false, ``old_wt`` is decayed even when the current value is NaN.

        Returns
        -------
        tuple
            ``(result, old_wt)``: the array of means, with the same shape as
            ``values``, and the updated weights.
        """
        result = np.empty(values.shape)
        # NOTE(review): values[0] is not copied, so the in-place updates to
        # weighted_avg[j] below appear to write through to the first row of
        # the caller's array -- confirm this is intended.
        weighted_avg = values[0]
        # Per-column count of non-NaN observations seen so far.
        nobs = (~np.isnan(weighted_avg)).astype(np.int64)
        result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

        for i in range(1, len(values)):
            cur = values[i]
            is_observations = ~np.isnan(cur)
            nobs += is_observations.astype(np.int64)
            # Columns are updated independently of one another.
            for j in numba.prange(len(cur)):
                if not np.isnan(weighted_avg[j]):
                    if is_observations[j] or not ignore_na:
                        # note that len(deltas) = len(vals) - 1 and deltas[i] is to be
                        # used in conjunction with vals[i+1]
                        # NOTE(review): the exponent below is indexed by the
                        # column index j, not the row index i, which does not
                        # match the note above. Callers may size ``deltas``
                        # around the current indexing -- confirm before
                        # changing it.
                        old_wt[j] *= old_wt_factor ** deltas[j - 1]
                        if is_observations[j]:
                            # avoid numerical errors on constant series
                            if weighted_avg[j] != cur[j]:
                                weighted_avg[j] = (
                                    (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j])
                                ) / (old_wt[j] + new_wt)
                            if adjust:
                                old_wt[j] += new_wt
                            else:
                                old_wt[j] = 1.0
                elif is_observations[j]:
                    # The running average is still NaN: seed it with the
                    # first non-NaN value seen in this column.
                    weighted_avg[j] = cur[j]

            result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

        return result, old_wt

    return online_ewma
class EWMMeanState:
    """
    Mutable state carried between successive online EWM mean computations.

    Holds the weighting constants derived from ``com`` and ``adjust``, the
    per-column ``old_wt`` array handed to the kernel, and the last row of the
    most recent result (``last_ewm``).
    """

    def __init__(self, com, adjust, ignore_na, axis, shape) -> None:
        """
        Derive the weighting constants and initialise the running state.

        ``old_wt`` starts as an array of ones whose length is
        ``shape[axis - 1]``; ``last_ewm`` starts as ``None``.
        """
        smoothing = 1.0 / (1.0 + com)

        self.axis = axis
        self.shape = shape
        self.adjust = adjust
        self.ignore_na = ignore_na

        # With adjust the new observation gets unit weight; otherwise it is
        # weighted by the smoothing factor itself.
        if adjust:
            self.new_wt = 1.0
        else:
            self.new_wt = smoothing
        self.old_wt_factor = 1.0 - smoothing

        n_weights = self.shape[self.axis - 1]
        self.old_wt = np.ones(n_weights)
        self.last_ewm = None

    def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func):
        """
        Call ``ewm_func`` with the stored state and record its outputs.

        ``ewm_func`` must return ``(result, old_wt)``. The returned weights
        replace ``self.old_wt`` and the final row of ``result`` is kept as
        ``self.last_ewm``. ``result`` is returned to the caller.
        """
        kernel_args = (
            weighted_avg,
            deltas,
            min_periods,
            self.old_wt_factor,
            self.new_wt,
            self.old_wt,
            self.adjust,
            self.ignore_na,
        )
        result, self.old_wt = ewm_func(*kernel_args)
        self.last_ewm = result[-1]
        return result

    def reset(self) -> None:
        """Restore ``old_wt`` to ones and clear ``last_ewm``."""
        n_weights = self.shape[self.axis - 1]
        self.old_wt = np.ones(n_weights)
        self.last_ewm = None
|