123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import numpy as np
- import pytest
- from pandas import (
- DataFrame,
- Series,
- concat,
- )
- import pandas._testing as tm
- def create_mock_weights(obj, com, adjust, ignore_na):
- if isinstance(obj, DataFrame):
- if not len(obj.columns):
- return DataFrame(index=obj.index, columns=obj.columns)
- w = concat(
- [
- create_mock_series_weights(
- obj.iloc[:, i], com=com, adjust=adjust, ignore_na=ignore_na
- )
- for i in range(len(obj.columns))
- ],
- axis=1,
- )
- w.index = obj.index
- w.columns = obj.columns
- return w
- else:
- return create_mock_series_weights(obj, com, adjust, ignore_na)
- def create_mock_series_weights(s, com, adjust, ignore_na):
- w = Series(np.nan, index=s.index, name=s.name)
- alpha = 1.0 / (1.0 + com)
- if adjust:
- count = 0
- for i in range(len(s)):
- if s.iat[i] == s.iat[i]:
- w.iat[i] = pow(1.0 / (1.0 - alpha), count)
- count += 1
- elif not ignore_na:
- count += 1
- else:
- sum_wts = 0.0
- prev_i = -1
- count = 0
- for i in range(len(s)):
- if s.iat[i] == s.iat[i]:
- if prev_i == -1:
- w.iat[i] = 1.0
- else:
- w.iat[i] = alpha * sum_wts / pow(1.0 - alpha, count - prev_i)
- sum_wts += w.iat[i]
- prev_i = count
- count += 1
- elif not ignore_na:
- count += 1
- return w
- def test_ewm_consistency_mean(all_data, adjust, ignore_na, min_periods):
- com = 3.0
- result = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).mean()
- weights = create_mock_weights(all_data, com=com, adjust=adjust, ignore_na=ignore_na)
- expected = (
- all_data.multiply(weights)
- .cumsum()
- .divide(weights.cumsum())
- .fillna(method="ffill")
- )
- expected[
- all_data.expanding().count() < (max(min_periods, 1) if min_periods else 1)
- ] = np.nan
- tm.assert_equal(result, expected.astype("float64"))
- def test_ewm_consistency_consistent(consistent_data, adjust, ignore_na, min_periods):
- com = 3.0
- count_x = consistent_data.expanding().count()
- mean_x = consistent_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).mean()
- # check that correlation of a series with itself is either 1 or NaN
- corr_x_x = consistent_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).corr(consistent_data)
- exp = (
- consistent_data.max()
- if isinstance(consistent_data, Series)
- else consistent_data.max().max()
- )
- # check mean of constant series
- expected = consistent_data * np.nan
- expected[count_x >= max(min_periods, 1)] = exp
- tm.assert_equal(mean_x, expected)
- # check correlation of constant series with itself is NaN
- expected[:] = np.nan
- tm.assert_equal(corr_x_x, expected)
- def test_ewm_consistency_var_debiasing_factors(
- all_data, adjust, ignore_na, min_periods
- ):
- com = 3.0
- # check variance debiasing factors
- var_unbiased_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=False)
- var_biased_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=True)
- weights = create_mock_weights(all_data, com=com, adjust=adjust, ignore_na=ignore_na)
- cum_sum = weights.cumsum().fillna(method="ffill")
- cum_sum_sq = (weights * weights).cumsum().fillna(method="ffill")
- numerator = cum_sum * cum_sum
- denominator = numerator - cum_sum_sq
- denominator[denominator <= 0.0] = np.nan
- var_debiasing_factors_x = numerator / denominator
- tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)
- @pytest.mark.parametrize("bias", [True, False])
- def test_moments_consistency_var(all_data, adjust, ignore_na, min_periods, bias):
- com = 3.0
- mean_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).mean()
- var_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=bias)
- assert not (var_x < 0).any().any()
- if bias:
- # check that biased var(x) == mean(x^2) - mean(x)^2
- mean_x2 = (
- (all_data * all_data)
- .ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
- .mean()
- )
- tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))
- @pytest.mark.parametrize("bias", [True, False])
- def test_moments_consistency_var_constant(
- consistent_data, adjust, ignore_na, min_periods, bias
- ):
- com = 3.0
- count_x = consistent_data.expanding(min_periods=min_periods).count()
- var_x = consistent_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=bias)
- # check that variance of constant series is identically 0
- assert not (var_x > 0).any().any()
- expected = consistent_data * np.nan
- expected[count_x >= max(min_periods, 1)] = 0.0
- if not bias:
- expected[count_x < 2] = np.nan
- tm.assert_equal(var_x, expected)
- @pytest.mark.parametrize("bias", [True, False])
- def test_ewm_consistency_std(all_data, adjust, ignore_na, min_periods, bias):
- com = 3.0
- var_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=bias)
- assert not (var_x < 0).any().any()
- std_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).std(bias=bias)
- assert not (std_x < 0).any().any()
- # check that var(x) == std(x)^2
- tm.assert_equal(var_x, std_x * std_x)
- cov_x_x = all_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).cov(all_data, bias=bias)
- assert not (cov_x_x < 0).any().any()
- # check that var(x) == cov(x, x)
- tm.assert_equal(var_x, cov_x_x)
- @pytest.mark.parametrize("bias", [True, False])
- def test_ewm_consistency_series_cov_corr(
- series_data, adjust, ignore_na, min_periods, bias
- ):
- com = 3.0
- var_x_plus_y = (
- (series_data + series_data)
- .ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
- .var(bias=bias)
- )
- var_x = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=bias)
- var_y = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).var(bias=bias)
- cov_x_y = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).cov(series_data, bias=bias)
- # check that cov(x, y) == (var(x+y) - var(x) -
- # var(y)) / 2
- tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))
- # check that corr(x, y) == cov(x, y) / (std(x) *
- # std(y))
- corr_x_y = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).corr(series_data)
- std_x = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).std(bias=bias)
- std_y = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).std(bias=bias)
- tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))
- if bias:
- # check that biased cov(x, y) == mean(x*y) -
- # mean(x)*mean(y)
- mean_x = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).mean()
- mean_y = series_data.ewm(
- com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
- ).mean()
- mean_x_times_y = (
- (series_data * series_data)
- .ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
- .mean()
- )
- tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))
|