123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- import numpy as np
- import pytest
- from pandas import Series
- import pandas._testing as tm
def no_nans(x):
    """Return a truthy value when ``x`` contains no missing entries.

    ``all`` is applied twice so that a two-level reduction result (as
    produced for frame-like input) is collapsed to a single flag.
    """
    present = x.notna()
    return present.all().all()
def all_na(x):
    """Return a truthy value when every entry of ``x`` is missing.

    ``all`` is applied twice so that a two-level reduction result (as
    produced for frame-like input) is collapsed to a single flag.
    """
    missing = x.isna()
    return missing.all().all()
@pytest.fixture(
    params=[
        (1, 0),
        (5, 1),
    ]
)
def rolling_consistency_cases(request):
    """Parametrized ``(window, min_periods)`` pair used by the rolling tests."""
    window_and_min_periods = request.param
    return window_and_min_periods
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum])
def test_rolling_apply_consistency_sum(
    request, all_data, rolling_consistency_cases, center, f
):
    """Rolling ``sum()`` must equal rolling ``apply(f, raw=True)``.

    ``f`` is each of the summing callables from the parametrization. When
    ``f`` is ``np.sum`` and the data contain NaNs the test is marked as an
    expected failure, except for non-empty all-NaN data with a positive
    ``min_periods``.
    """
    window, min_periods = rolling_consistency_cases
    rolling_kwargs = {
        "window": window,
        "min_periods": min_periods,
        "center": center,
    }

    if f is np.sum and not no_nans(all_data):
        # The one NaN-containing case that is still expected to pass:
        # non-empty, entirely-NaN data with min_periods > 0.
        all_nan_case = all_na(all_data) and not all_data.empty and min_periods > 0
        if not all_nan_case:
            xfail_marker = pytest.mark.xfail(
                reason="np.sum has different behavior with NaNs"
            )
            request.node.add_marker(xfail_marker)

    builtin_sum = all_data.rolling(**rolling_kwargs).sum()
    applied_sum = all_data.rolling(**rolling_kwargs).apply(func=f, raw=True)
    tm.assert_equal(builtin_sum, applied_sum)
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var(all_data, rolling_consistency_cases, center, ddof):
    """Rolling variance is never negative and, when biased, matches moments.

    For ``ddof == 0`` the rolling variance must equal
    ``mean(x^2) - mean(x)^2`` computed with the same window settings.
    """
    window, min_periods = rolling_consistency_cases

    def rolled(obj):
        # Fresh rolling object with the settings under test.
        return obj.rolling(window=window, min_periods=min_periods, center=center)

    variance = rolled(all_data).var(ddof=ddof)
    assert not (variance < 0).any().any()

    if ddof != 0:
        # The moment identity below only holds for the biased estimator.
        return

    # check that biased var(x) == mean(x^2) - mean(x)^2
    mean_of_x = rolled(all_data).mean()
    mean_of_squares = rolled(all_data * all_data).mean()
    tm.assert_equal(variance, mean_of_squares - (mean_of_x * mean_of_x))
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var_constant(
    consistent_data, rolling_consistency_cases, center, ddof
):
    """Rolling variance of constant data is 0 wherever it is defined.

    The expected result is NaN everywhere except where the rolling count
    reaches ``max(min_periods, 1)``; with ``ddof == 1`` windows holding
    fewer than two observations are NaN as well.
    """
    window, min_periods = rolling_consistency_cases
    rolling_kwargs = {
        "window": window,
        "min_periods": min_periods,
        "center": center,
    }

    n_obs = consistent_data.rolling(**rolling_kwargs).count()
    variance = consistent_data.rolling(**rolling_kwargs).var(ddof=ddof)

    # check that variance of constant series is identically 0
    assert not (variance > 0).any().any()

    # Start from an all-NaN object shaped like the input, then fill in zeros.
    expected = consistent_data * np.nan
    enough_observations = n_obs >= max(min_periods, 1)
    expected[enough_observations] = 0.0
    if ddof == 1:
        too_few_for_unbiased = n_obs < 2
        expected[too_few_for_unbiased] = np.nan
    tm.assert_equal(variance, expected)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_var_std_cov(
    all_data, rolling_consistency_cases, center, ddof
):
    """Rolling ``var``, ``std`` and self-``cov`` must agree with each other.

    Checks that none of the three is negative, that ``var(x) == std(x)^2``
    and that ``var(x) == cov(x, x)``.
    """
    window, min_periods = rolling_consistency_cases

    def rolled():
        # Fresh rolling object over ``all_data`` with the settings under test.
        return all_data.rolling(
            window=window, min_periods=min_periods, center=center
        )

    variance = rolled().var(ddof=ddof)
    assert not (variance < 0).any().any()

    std_dev = rolled().std(ddof=ddof)
    assert not (std_dev < 0).any().any()

    # check that var(x) == std(x)^2
    tm.assert_equal(variance, std_dev * std_dev)

    self_cov = rolled().cov(all_data, ddof=ddof)
    assert not (self_cov < 0).any().any()

    # check that var(x) == cov(x, x)
    tm.assert_equal(variance, self_cov)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_series_cov_corr(
    series_data, rolling_consistency_cases, center, ddof
):
    """Rolling ``cov``/``corr`` must satisfy the usual moment identities.

    Both ``x`` and ``y`` are ``series_data`` here. The identities checked are
    ``cov(x, y) == (var(x+y) - var(x) - var(y)) / 2``,
    ``corr(x, y) == cov(x, y) / (std(x) * std(y))`` and, for ``ddof == 0``,
    ``cov(x, y) == mean(x*y) - mean(x)*mean(y)``.
    """
    window, min_periods = rolling_consistency_cases

    def rolled(obj):
        # Fresh rolling object with the settings under test.
        return obj.rolling(window=window, min_periods=min_periods, center=center)

    var_of_sum = rolled(series_data + series_data).var(ddof=ddof)
    var_of_x = rolled(series_data).var(ddof=ddof)
    var_of_y = rolled(series_data).var(ddof=ddof)
    cov_xy = rolled(series_data).cov(series_data, ddof=ddof)

    # check that cov(x, y) == (var(x+y) - var(x) - var(y)) / 2
    tm.assert_equal(cov_xy, 0.5 * (var_of_sum - var_of_x - var_of_y))

    # check that corr(x, y) == cov(x, y) / (std(x) * std(y))
    corr_xy = rolled(series_data).corr(series_data)
    std_of_x = rolled(series_data).std(ddof=ddof)
    std_of_y = rolled(series_data).std(ddof=ddof)
    tm.assert_equal(corr_xy, cov_xy / (std_of_x * std_of_y))

    if ddof != 0:
        # The remaining identity only holds for the biased estimator.
        return

    # check that biased cov(x, y) == mean(x*y) - mean(x)*mean(y)
    mean_of_x = rolled(series_data).mean()
    mean_of_y = rolled(series_data).mean()
    mean_of_product = rolled(series_data * series_data).mean()
    tm.assert_equal(cov_xy, mean_of_product - (mean_of_x * mean_of_y))
def test_rolling_consistency_mean(all_data, rolling_consistency_cases, center):
    """Rolling ``mean`` must equal rolling ``sum`` divided by rolling ``count``.

    The quotient is cast to ``float64`` before comparison.
    """
    window, min_periods = rolling_consistency_cases
    rolling_kwargs = {
        "window": window,
        "min_periods": min_periods,
        "center": center,
    }

    rolling_mean = all_data.rolling(**rolling_kwargs).mean()

    rolling_sum = all_data.rolling(**rolling_kwargs).sum()
    rolling_count = all_data.rolling(**rolling_kwargs).count()
    sum_over_count = rolling_sum.divide(rolling_count)

    tm.assert_equal(rolling_mean, sum_over_count.astype("float64"))
def test_rolling_consistency_constant(
    consistent_data, rolling_consistency_cases, center
):
    """Rolling ``mean`` and self-``corr`` of constant data give known values.

    The mean must equal the data's maximum wherever the rolling count
    reaches ``max(min_periods, 1)`` and be NaN elsewhere; the correlation of
    the data with itself must be NaN everywhere.
    """
    window, min_periods = rolling_consistency_cases

    def rolled():
        # Fresh rolling object over ``consistent_data`` with the test settings.
        return consistent_data.rolling(
            window=window, min_periods=min_periods, center=center
        )

    n_obs = rolled().count()
    rolling_mean = rolled().mean()
    self_corr = rolled().corr(consistent_data)

    # Reduce to a single scalar: one ``max`` for a Series, two otherwise.
    if isinstance(consistent_data, Series):
        constant_value = consistent_data.max()
    else:
        constant_value = consistent_data.max().max()

    # check mean of constant series
    expected = consistent_data * np.nan
    enough_observations = n_obs >= max(min_periods, 1)
    expected[enough_observations] = constant_value
    tm.assert_equal(rolling_mean, expected)

    # check correlation of constant series with itself is NaN
    expected[:] = np.nan
    tm.assert_equal(self_corr, expected)
def test_rolling_consistency_var_debiasing_factors(
    all_data, rolling_consistency_cases, center
):
    """Default rolling ``var`` equals biased ``var`` times ``n / (n - 1)``.

    ``n`` is the rolling count; a denominator of zero is replaced by NaN
    before dividing.
    """
    window, min_periods = rolling_consistency_cases

    def rolled():
        # Fresh rolling object over ``all_data`` with the settings under test.
        return all_data.rolling(
            window=window, min_periods=min_periods, center=center
        )

    # check variance debiasing factors
    unbiased_var = rolled().var()
    biased_var = rolled().var(ddof=0)

    numerator = rolled().count()
    # Turn n - 1 == 0 into NaN so that the factor is NaN rather than infinite.
    denominator = (rolled().count() - 1.0).replace(0.0, np.nan)
    debiasing_factors = numerator.divide(denominator)

    tm.assert_equal(unbiased_var, biased_var * debiasing_factors)
|