123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953 |
- # cython: boundscheck=False, wraparound=False, cdivision=True
- from libc.math cimport (
- round,
- signbit,
- sqrt,
- )
- from libcpp.deque cimport deque
- from pandas._libs.algos cimport TiebreakEnumType
- import numpy as np
- cimport numpy as cnp
- from numpy cimport (
- float32_t,
- float64_t,
- int64_t,
- ndarray,
- )
- cnp.import_array()
- import cython
- from pandas._libs.algos import is_monotonic
- cdef extern from "../src/skiplist.h":
- ctypedef struct node_t:
- node_t **next
- int *width
- double value
- int is_nil
- int levels
- int ref_count
- ctypedef struct skiplist_t:
- node_t *head
- node_t **tmp_chain
- int *tmp_steps
- int size
- int maxlevels
- skiplist_t* skiplist_init(int) nogil
- void skiplist_destroy(skiplist_t*) nogil
- double skiplist_get(skiplist_t*, int, int*) nogil
- int skiplist_insert(skiplist_t*, double) nogil
- int skiplist_remove(skiplist_t*, double) nogil
- int skiplist_rank(skiplist_t*, double) nogil
- int skiplist_min_rank(skiplist_t*, double) nogil
- cdef:
- float32_t MINfloat32 = np.NINF
- float64_t MINfloat64 = np.NINF
- float32_t MAXfloat32 = np.inf
- float64_t MAXfloat64 = np.inf
- float64_t NaN = <float64_t>np.NaN
- cdef bint is_monotonic_increasing_start_end_bounds(
- ndarray[int64_t, ndim=1] start, ndarray[int64_t, ndim=1] end
- ):
- return is_monotonic(start, False)[0] and is_monotonic(end, False)[0]
- # ----------------------------------------------------------------------
- # Rolling sum
- cdef float64_t calc_sum(int64_t minp, int64_t nobs, float64_t sum_x,
- int64_t num_consecutive_same_value, float64_t prev_value
- ) nogil:
- cdef:
- float64_t result
- if nobs == 0 == minp:
- result = 0
- elif nobs >= minp:
- if num_consecutive_same_value >= nobs:
- result = prev_value * nobs
- else:
- result = sum_x
- else:
- result = NaN
- return result
- cdef void add_sum(float64_t val, int64_t *nobs, float64_t *sum_x,
- float64_t *compensation, int64_t *num_consecutive_same_value,
- float64_t *prev_value) nogil:
- """ add a value from the sum calc using Kahan summation """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] + 1
- y = val - compensation[0]
- t = sum_x[0] + y
- compensation[0] = t - sum_x[0] - y
- sum_x[0] = t
- # GH#42064, record num of same values to remove floating point artifacts
- if val == prev_value[0]:
- num_consecutive_same_value[0] += 1
- else:
- # reset to 1 (include current value itself)
- num_consecutive_same_value[0] = 1
- prev_value[0] = val
- cdef void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x,
- float64_t *compensation) nogil:
- """ remove a value from the sum calc using Kahan summation """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] - 1
- y = - val - compensation[0]
- t = sum_x[0] + y
- compensation[0] = t - sum_x[0] - y
- sum_x[0] = t
- def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- cdef:
- Py_ssize_t i, j
- float64_t sum_x, compensation_add, compensation_remove, prev_value
- int64_t s, e, num_consecutive_same_value
- int64_t nobs = 0, N = len(start)
- ndarray[float64_t] output
- bint is_monotonic_increasing_bounds
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- output = np.empty(N, dtype=np.float64)
- with nogil:
- for i in range(0, N):
- s = start[i]
- e = end[i]
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- # setup
- prev_value = values[s]
- num_consecutive_same_value = 0
- sum_x = compensation_add = compensation_remove = 0
- nobs = 0
- for j in range(s, e):
- add_sum(values[j], &nobs, &sum_x, &compensation_add,
- &num_consecutive_same_value, &prev_value)
- else:
- # calculate deletes
- for j in range(start[i - 1], s):
- remove_sum(values[j], &nobs, &sum_x, &compensation_remove)
- # calculate adds
- for j in range(end[i - 1], e):
- add_sum(values[j], &nobs, &sum_x, &compensation_add,
- &num_consecutive_same_value, &prev_value)
- output[i] = calc_sum(
- minp, nobs, sum_x, num_consecutive_same_value, prev_value
- )
- if not is_monotonic_increasing_bounds:
- nobs = 0
- sum_x = 0.0
- compensation_remove = 0.0
- return output
- # ----------------------------------------------------------------------
- # Rolling mean
- cdef float64_t calc_mean(int64_t minp, Py_ssize_t nobs, Py_ssize_t neg_ct,
- float64_t sum_x, int64_t num_consecutive_same_value,
- float64_t prev_value) nogil:
- cdef:
- float64_t result
- if nobs >= minp and nobs > 0:
- result = sum_x / <float64_t>nobs
- if num_consecutive_same_value >= nobs:
- result = prev_value
- elif neg_ct == 0 and result < 0:
- # all positive
- result = 0
- elif neg_ct == nobs and result > 0:
- # all negative
- result = 0
- else:
- pass
- else:
- result = NaN
- return result
- cdef void add_mean(
- float64_t val,
- Py_ssize_t *nobs,
- float64_t *sum_x,
- Py_ssize_t *neg_ct,
- float64_t *compensation,
- int64_t *num_consecutive_same_value,
- float64_t *prev_value
- ) nogil:
- """ add a value from the mean calc using Kahan summation """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] + 1
- y = val - compensation[0]
- t = sum_x[0] + y
- compensation[0] = t - sum_x[0] - y
- sum_x[0] = t
- if signbit(val):
- neg_ct[0] = neg_ct[0] + 1
- # GH#42064, record num of same values to remove floating point artifacts
- if val == prev_value[0]:
- num_consecutive_same_value[0] += 1
- else:
- # reset to 1 (include current value itself)
- num_consecutive_same_value[0] = 1
- prev_value[0] = val
- cdef void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x,
- Py_ssize_t *neg_ct, float64_t *compensation) nogil:
- """ remove a value from the mean calc using Kahan summation """
- cdef:
- float64_t y, t
- if val == val:
- nobs[0] = nobs[0] - 1
- y = - val - compensation[0]
- t = sum_x[0] + y
- compensation[0] = t - sum_x[0] - y
- sum_x[0] = t
- if signbit(val):
- neg_ct[0] = neg_ct[0] - 1
- def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- cdef:
- float64_t val, compensation_add, compensation_remove, sum_x, prev_value
- int64_t s, e, num_consecutive_same_value
- Py_ssize_t nobs, i, j, neg_ct, N = len(start)
- ndarray[float64_t] output
- bint is_monotonic_increasing_bounds
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- output = np.empty(N, dtype=np.float64)
- with nogil:
- for i in range(0, N):
- s = start[i]
- e = end[i]
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- # setup
- compensation_add = compensation_remove = sum_x = 0
- nobs = neg_ct = 0
- prev_value = values[s]
- num_consecutive_same_value = 0
- for j in range(s, e):
- val = values[j]
- add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add,
- &num_consecutive_same_value, &prev_value)
- else:
- # calculate deletes
- for j in range(start[i - 1], s):
- val = values[j]
- remove_mean(val, &nobs, &sum_x, &neg_ct, &compensation_remove)
- # calculate adds
- for j in range(end[i - 1], e):
- val = values[j]
- add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add,
- &num_consecutive_same_value, &prev_value)
- output[i] = calc_mean(
- minp, nobs, neg_ct, sum_x, num_consecutive_same_value, prev_value
- )
- if not is_monotonic_increasing_bounds:
- nobs = 0
- neg_ct = 0
- sum_x = 0.0
- compensation_remove = 0.0
- return output
- # ----------------------------------------------------------------------
- # Rolling variance
- cdef float64_t calc_var(
- int64_t minp,
- int ddof,
- float64_t nobs,
- float64_t ssqdm_x,
- int64_t num_consecutive_same_value
- ) nogil:
- cdef:
- float64_t result
- # Variance is unchanged if no observation is added or removed
- if (nobs >= minp) and (nobs > ddof):
- # pathological case & repeatedly same values case
- if nobs == 1 or num_consecutive_same_value >= nobs:
- result = 0
- else:
- result = ssqdm_x / (nobs - <float64_t>ddof)
- else:
- result = NaN
- return result
- cdef void add_var(
- float64_t val,
- float64_t *nobs,
- float64_t *mean_x,
- float64_t *ssqdm_x,
- float64_t *compensation,
- int64_t *num_consecutive_same_value,
- float64_t *prev_value,
- ) nogil:
- """ add a value from the var calc """
- cdef:
- float64_t delta, prev_mean, y, t
- # GH#21813, if msvc 2017 bug is resolved, we should be OK with != instead of `isnan`
- if val != val:
- return
- nobs[0] = nobs[0] + 1
- # GH#42064, record num of same values to remove floating point artifacts
- if val == prev_value[0]:
- num_consecutive_same_value[0] += 1
- else:
- # reset to 1 (include current value itself)
- num_consecutive_same_value[0] = 1
- prev_value[0] = val
- # Welford's method for the online variance-calculation
- # using Kahan summation
- # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- prev_mean = mean_x[0] - compensation[0]
- y = val - compensation[0]
- t = y - mean_x[0]
- compensation[0] = t + mean_x[0] - y
- delta = t
- if nobs[0]:
- mean_x[0] = mean_x[0] + delta / nobs[0]
- else:
- mean_x[0] = 0
- ssqdm_x[0] = ssqdm_x[0] + (val - prev_mean) * (val - mean_x[0])
- cdef void remove_var(
- float64_t val,
- float64_t *nobs,
- float64_t *mean_x,
- float64_t *ssqdm_x,
- float64_t *compensation
- ) nogil:
- """ remove a value from the var calc """
- cdef:
- float64_t delta, prev_mean, y, t
- if val == val:
- nobs[0] = nobs[0] - 1
- if nobs[0]:
- # Welford's method for the online variance-calculation
- # using Kahan summation
- # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- prev_mean = mean_x[0] - compensation[0]
- y = val - compensation[0]
- t = y - mean_x[0]
- compensation[0] = t + mean_x[0] - y
- delta = t
- mean_x[0] = mean_x[0] - delta / nobs[0]
- ssqdm_x[0] = ssqdm_x[0] - (val - prev_mean) * (val - mean_x[0])
- else:
- mean_x[0] = 0
- ssqdm_x[0] = 0
- def roll_var(const float64_t[:] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp, int ddof=1) -> np.ndarray:
- """
- Numerically stable implementation using Welford's method.
- """
- cdef:
- float64_t mean_x, ssqdm_x, nobs, compensation_add,
- float64_t compensation_remove, prev_value
- int64_t s, e, num_consecutive_same_value
- Py_ssize_t i, j, N = len(start)
- ndarray[float64_t] output
- bint is_monotonic_increasing_bounds
- minp = max(minp, 1)
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- output = np.empty(N, dtype=np.float64)
- with nogil:
- for i in range(0, N):
- s = start[i]
- e = end[i]
- # Over the first window, observations can only be added
- # never removed
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- prev_value = values[s]
- num_consecutive_same_value = 0
- mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0
- for j in range(s, e):
- add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add,
- &num_consecutive_same_value, &prev_value)
- else:
- # After the first window, observations can both be added
- # and removed
- # calculate deletes
- for j in range(start[i - 1], s):
- remove_var(values[j], &nobs, &mean_x, &ssqdm_x,
- &compensation_remove)
- # calculate adds
- for j in range(end[i - 1], e):
- add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add,
- &num_consecutive_same_value, &prev_value)
- output[i] = calc_var(minp, ddof, nobs, ssqdm_x, num_consecutive_same_value)
- if not is_monotonic_increasing_bounds:
- nobs = 0.0
- mean_x = 0.0
- ssqdm_x = 0.0
- compensation_remove = 0.0
- return output
- # ----------------------------------------------------------------------
- # Rolling skewness
- cdef float64_t calc_skew(int64_t minp, int64_t nobs,
- float64_t x, float64_t xx, float64_t xxx,
- int64_t num_consecutive_same_value
- ) nogil:
- cdef:
- float64_t result, dnobs
- float64_t A, B, C, R
- if nobs >= minp:
- dnobs = <float64_t>nobs
- A = x / dnobs
- B = xx / dnobs - A * A
- C = xxx / dnobs - A * A * A - 3 * A * B
- if nobs < 3:
- result = NaN
- # GH 42064 46431
- # uniform case, force result to be 0
- elif num_consecutive_same_value >= nobs:
- result = 0.0
- # #18044: with uniform distribution, floating issue will
- # cause B != 0. and cause the result is a very
- # large number.
- #
- # in core/nanops.py nanskew/nankurt call the function
- # _zero_out_fperr(m2) to fix floating error.
- # if the variance is less than 1e-14, it could be
- # treat as zero, here we follow the original
- # skew/kurt behaviour to check B <= 1e-14
- elif B <= 1e-14:
- result = NaN
- else:
- R = sqrt(B)
- result = ((sqrt(dnobs * (dnobs - 1.)) * C) /
- ((dnobs - 2) * R * R * R))
- else:
- result = NaN
- return result
- cdef void add_skew(float64_t val, int64_t *nobs,
- float64_t *x, float64_t *xx,
- float64_t *xxx,
- float64_t *compensation_x,
- float64_t *compensation_xx,
- float64_t *compensation_xxx,
- int64_t *num_consecutive_same_value,
- float64_t *prev_value,
- ) nogil:
- """ add a value from the skew calc """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] + 1
- y = val - compensation_x[0]
- t = x[0] + y
- compensation_x[0] = t - x[0] - y
- x[0] = t
- y = val * val - compensation_xx[0]
- t = xx[0] + y
- compensation_xx[0] = t - xx[0] - y
- xx[0] = t
- y = val * val * val - compensation_xxx[0]
- t = xxx[0] + y
- compensation_xxx[0] = t - xxx[0] - y
- xxx[0] = t
- # GH#42064, record num of same values to remove floating point artifacts
- if val == prev_value[0]:
- num_consecutive_same_value[0] += 1
- else:
- # reset to 1 (include current value itself)
- num_consecutive_same_value[0] = 1
- prev_value[0] = val
- cdef void remove_skew(float64_t val, int64_t *nobs,
- float64_t *x, float64_t *xx,
- float64_t *xxx,
- float64_t *compensation_x,
- float64_t *compensation_xx,
- float64_t *compensation_xxx) nogil:
- """ remove a value from the skew calc """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] - 1
- y = - val - compensation_x[0]
- t = x[0] + y
- compensation_x[0] = t - x[0] - y
- x[0] = t
- y = - val * val - compensation_xx[0]
- t = xx[0] + y
- compensation_xx[0] = t - xx[0] - y
- xx[0] = t
- y = - val * val * val - compensation_xxx[0]
- t = xxx[0] + y
- compensation_xxx[0] = t - xxx[0] - y
- xxx[0] = t
- def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- cdef:
- Py_ssize_t i, j
- float64_t val, min_val, mean_val, sum_val = 0
- float64_t compensation_xxx_add, compensation_xxx_remove
- float64_t compensation_xx_add, compensation_xx_remove
- float64_t compensation_x_add, compensation_x_remove
- float64_t x, xx, xxx
- float64_t prev_value
- int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
- int64_t s, e, num_consecutive_same_value
- ndarray[float64_t] output, values_copy
- bint is_monotonic_increasing_bounds
- minp = max(minp, 3)
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- output = np.empty(N, dtype=np.float64)
- min_val = np.nanmin(values)
- values_copy = np.copy(values)
- with nogil:
- for i in range(0, V):
- val = values_copy[i]
- if val == val:
- nobs_mean += 1
- sum_val += val
- mean_val = sum_val / nobs_mean
- # Other cases would lead to imprecision for smallest values
- if min_val - mean_val > -1e5:
- mean_val = round(mean_val)
- for i in range(0, V):
- values_copy[i] = values_copy[i] - mean_val
- for i in range(0, N):
- s = start[i]
- e = end[i]
- # Over the first window, observations can only be added
- # never removed
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- prev_value = values[s]
- num_consecutive_same_value = 0
- compensation_xxx_add = compensation_xxx_remove = 0
- compensation_xx_add = compensation_xx_remove = 0
- compensation_x_add = compensation_x_remove = 0
- x = xx = xxx = 0
- nobs = 0
- for j in range(s, e):
- val = values_copy[j]
- add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
- &compensation_xx_add, &compensation_xxx_add,
- &num_consecutive_same_value, &prev_value)
- else:
- # After the first window, observations can both be added
- # and removed
- # calculate deletes
- for j in range(start[i - 1], s):
- val = values_copy[j]
- remove_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_remove,
- &compensation_xx_remove, &compensation_xxx_remove)
- # calculate adds
- for j in range(end[i - 1], e):
- val = values_copy[j]
- add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
- &compensation_xx_add, &compensation_xxx_add,
- &num_consecutive_same_value, &prev_value)
- output[i] = calc_skew(minp, nobs, x, xx, xxx, num_consecutive_same_value)
- if not is_monotonic_increasing_bounds:
- nobs = 0
- x = 0.0
- xx = 0.0
- xxx = 0.0
- return output
- # ----------------------------------------------------------------------
- # Rolling kurtosis
- cdef float64_t calc_kurt(int64_t minp, int64_t nobs,
- float64_t x, float64_t xx,
- float64_t xxx, float64_t xxxx,
- int64_t num_consecutive_same_value,
- ) nogil:
- cdef:
- float64_t result, dnobs
- float64_t A, B, C, D, R, K
- if nobs >= minp:
- if nobs < 4:
- result = NaN
- # GH 42064 46431
- # uniform case, force result to be -3.
- elif num_consecutive_same_value >= nobs:
- result = -3.
- else:
- dnobs = <float64_t>nobs
- A = x / dnobs
- R = A * A
- B = xx / dnobs - R
- R = R * A
- C = xxx / dnobs - R - 3 * A * B
- R = R * A
- D = xxxx / dnobs - R - 6 * B * A * A - 4 * C * A
- # #18044: with uniform distribution, floating issue will
- # cause B != 0. and cause the result is a very
- # large number.
- #
- # in core/nanops.py nanskew/nankurt call the function
- # _zero_out_fperr(m2) to fix floating error.
- # if the variance is less than 1e-14, it could be
- # treat as zero, here we follow the original
- # skew/kurt behaviour to check B <= 1e-14
- if B <= 1e-14:
- result = NaN
- else:
- K = (dnobs * dnobs - 1.) * D / (B * B) - 3 * ((dnobs - 1.) ** 2)
- result = K / ((dnobs - 2.) * (dnobs - 3.))
- else:
- result = NaN
- return result
- cdef void add_kurt(float64_t val, int64_t *nobs,
- float64_t *x, float64_t *xx,
- float64_t *xxx, float64_t *xxxx,
- float64_t *compensation_x,
- float64_t *compensation_xx,
- float64_t *compensation_xxx,
- float64_t *compensation_xxxx,
- int64_t *num_consecutive_same_value,
- float64_t *prev_value
- ) nogil:
- """ add a value from the kurotic calc """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] + 1
- y = val - compensation_x[0]
- t = x[0] + y
- compensation_x[0] = t - x[0] - y
- x[0] = t
- y = val * val - compensation_xx[0]
- t = xx[0] + y
- compensation_xx[0] = t - xx[0] - y
- xx[0] = t
- y = val * val * val - compensation_xxx[0]
- t = xxx[0] + y
- compensation_xxx[0] = t - xxx[0] - y
- xxx[0] = t
- y = val * val * val * val - compensation_xxxx[0]
- t = xxxx[0] + y
- compensation_xxxx[0] = t - xxxx[0] - y
- xxxx[0] = t
- # GH#42064, record num of same values to remove floating point artifacts
- if val == prev_value[0]:
- num_consecutive_same_value[0] += 1
- else:
- # reset to 1 (include current value itself)
- num_consecutive_same_value[0] = 1
- prev_value[0] = val
- cdef void remove_kurt(float64_t val, int64_t *nobs,
- float64_t *x, float64_t *xx,
- float64_t *xxx, float64_t *xxxx,
- float64_t *compensation_x,
- float64_t *compensation_xx,
- float64_t *compensation_xxx,
- float64_t *compensation_xxxx) nogil:
- """ remove a value from the kurotic calc """
- cdef:
- float64_t y, t
- # Not NaN
- if val == val:
- nobs[0] = nobs[0] - 1
- y = - val - compensation_x[0]
- t = x[0] + y
- compensation_x[0] = t - x[0] - y
- x[0] = t
- y = - val * val - compensation_xx[0]
- t = xx[0] + y
- compensation_xx[0] = t - xx[0] - y
- xx[0] = t
- y = - val * val * val - compensation_xxx[0]
- t = xxx[0] + y
- compensation_xxx[0] = t - xxx[0] - y
- xxx[0] = t
- y = - val * val * val * val - compensation_xxxx[0]
- t = xxxx[0] + y
- compensation_xxxx[0] = t - xxxx[0] - y
- xxxx[0] = t
- def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- cdef:
- Py_ssize_t i, j
- float64_t val, mean_val, min_val, sum_val = 0
- float64_t compensation_xxxx_add, compensation_xxxx_remove
- float64_t compensation_xxx_remove, compensation_xxx_add
- float64_t compensation_xx_remove, compensation_xx_add
- float64_t compensation_x_remove, compensation_x_add
- float64_t x, xx, xxx, xxxx
- float64_t prev_value
- int64_t nobs, s, e, num_consecutive_same_value
- int64_t N = len(start), V = len(values), nobs_mean = 0
- ndarray[float64_t] output, values_copy
- bint is_monotonic_increasing_bounds
- minp = max(minp, 4)
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- output = np.empty(N, dtype=np.float64)
- values_copy = np.copy(values)
- min_val = np.nanmin(values)
- with nogil:
- for i in range(0, V):
- val = values_copy[i]
- if val == val:
- nobs_mean += 1
- sum_val += val
- mean_val = sum_val / nobs_mean
- # Other cases would lead to imprecision for smallest values
- if min_val - mean_val > -1e4:
- mean_val = round(mean_val)
- for i in range(0, V):
- values_copy[i] = values_copy[i] - mean_val
- for i in range(0, N):
- s = start[i]
- e = end[i]
- # Over the first window, observations can only be added
- # never removed
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- prev_value = values[s]
- num_consecutive_same_value = 0
- compensation_xxxx_add = compensation_xxxx_remove = 0
- compensation_xxx_remove = compensation_xxx_add = 0
- compensation_xx_remove = compensation_xx_add = 0
- compensation_x_remove = compensation_x_add = 0
- x = xx = xxx = xxxx = 0
- nobs = 0
- for j in range(s, e):
- add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx,
- &compensation_x_add, &compensation_xx_add,
- &compensation_xxx_add, &compensation_xxxx_add,
- &num_consecutive_same_value, &prev_value)
- else:
- # After the first window, observations can both be added
- # and removed
- # calculate deletes
- for j in range(start[i - 1], s):
- remove_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx,
- &compensation_x_remove, &compensation_xx_remove,
- &compensation_xxx_remove, &compensation_xxxx_remove)
- # calculate adds
- for j in range(end[i - 1], e):
- add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx,
- &compensation_x_add, &compensation_xx_add,
- &compensation_xxx_add, &compensation_xxxx_add,
- &num_consecutive_same_value, &prev_value)
- output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx,
- num_consecutive_same_value)
- if not is_monotonic_increasing_bounds:
- nobs = 0
- x = 0.0
- xx = 0.0
- xxx = 0.0
- xxxx = 0.0
- return output
- # ----------------------------------------------------------------------
- # Rolling median, min, max
- def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- cdef:
- Py_ssize_t i, j
- bint err = False, is_monotonic_increasing_bounds
- int midpoint, ret = 0
- int64_t nobs = 0, N = len(start), s, e, win
- float64_t val, res
- skiplist_t *sl
- ndarray[float64_t] output
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- # we use the Fixed/Variable Indexer here as the
- # actual skiplist ops outweigh any window computation costs
- output = np.empty(N, dtype=np.float64)
- if (end - start).max() == 0:
- output[:] = NaN
- return output
- win = (end - start).max()
- sl = skiplist_init(<int>win)
- if sl == NULL:
- raise MemoryError("skiplist_init failed")
- with nogil:
- for i in range(0, N):
- s = start[i]
- e = end[i]
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- if i != 0:
- skiplist_destroy(sl)
- sl = skiplist_init(<int>win)
- nobs = 0
- # setup
- for j in range(s, e):
- val = values[j]
- if val == val:
- nobs += 1
- err = skiplist_insert(sl, val) == -1
- if err:
- break
- else:
- # calculate adds
- for j in range(end[i - 1], e):
- val = values[j]
- if val == val:
- nobs += 1
- err = skiplist_insert(sl, val) == -1
- if err:
- break
- # calculate deletes
- for j in range(start[i - 1], s):
- val = values[j]
- if val == val:
- skiplist_remove(sl, val)
- nobs -= 1
- if nobs >= minp:
- midpoint = <int>(nobs / 2)
- if nobs % 2:
- res = skiplist_get(sl, midpoint, &ret)
- else:
- res = (skiplist_get(sl, midpoint, &ret) +
- skiplist_get(sl, (midpoint - 1), &ret)) / 2
- if ret == 0:
- res = NaN
- else:
- res = NaN
- output[i] = res
- if not is_monotonic_increasing_bounds:
- nobs = 0
- skiplist_destroy(sl)
- sl = skiplist_init(<int>win)
- skiplist_destroy(sl)
- if err:
- raise MemoryError("skiplist_insert failed")
- return output
- # ----------------------------------------------------------------------
- # Moving maximum / minimum code taken from Bottleneck under the terms
- # of its Simplified BSD license
- # https://github.com/pydata/bottleneck
- cdef float64_t init_mm(float64_t ai, Py_ssize_t *nobs, bint is_max) nogil:
- if ai == ai:
- nobs[0] = nobs[0] + 1
- elif is_max:
- ai = MINfloat64
- else:
- ai = MAXfloat64
- return ai
- cdef void remove_mm(float64_t aold, Py_ssize_t *nobs) nogil:
- """ remove a value from the mm calc """
- if aold == aold:
- nobs[0] = nobs[0] - 1
- cdef float64_t calc_mm(int64_t minp, Py_ssize_t nobs,
- float64_t value) nogil:
- cdef:
- float64_t result
- if nobs >= minp:
- result = value
- else:
- result = NaN
- return result
- def roll_max(ndarray[float64_t] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- """
- Moving max of 1d array of any numeric type along axis=0 ignoring NaNs.
- Parameters
- ----------
- values : np.ndarray[np.float64]
- window : int, size of rolling window
- minp : if number of observations in window
- is below this, output a NaN
- index : ndarray, optional
- index for window computation
- closed : 'right', 'left', 'both', 'neither'
- make the interval closed on the right, left,
- both or neither endpoints
- Returns
- -------
- np.ndarray[float]
- """
- return _roll_min_max(values, start, end, minp, is_max=1)
- def roll_min(ndarray[float64_t] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp) -> np.ndarray:
- """
- Moving min of 1d array of any numeric type along axis=0 ignoring NaNs.
- Parameters
- ----------
- values : np.ndarray[np.float64]
- window : int, size of rolling window
- minp : if number of observations in window
- is below this, output a NaN
- index : ndarray, optional
- index for window computation
- Returns
- -------
- np.ndarray[float]
- """
- return _roll_min_max(values, start, end, minp, is_max=0)
- cdef _roll_min_max(ndarray[float64_t] values,
- ndarray[int64_t] starti,
- ndarray[int64_t] endi,
- int64_t minp,
- bint is_max):
- cdef:
- float64_t ai
- int64_t curr_win_size, start
- Py_ssize_t i, k, nobs = 0, N = len(starti)
- deque Q[int64_t] # min/max always the front
- deque W[int64_t] # track the whole window for nobs compute
- ndarray[float64_t, ndim=1] output
- output = np.empty(N, dtype=np.float64)
- Q = deque[int64_t]()
- W = deque[int64_t]()
- with nogil:
- # This is using a modified version of the C++ code in this
- # SO post: https://stackoverflow.com/a/12239580
- # The original impl didn't deal with variable window sizes
- # So the code was optimized for that
- # first window's size
- curr_win_size = endi[0] - starti[0]
- # GH 32865
- # Anchor output index to values index to provide custom
- # BaseIndexer support
- for i in range(N):
- curr_win_size = endi[i] - starti[i]
- if i == 0:
- start = starti[i]
- else:
- start = endi[i - 1]
- for k in range(start, endi[i]):
- ai = init_mm(values[k], &nobs, is_max)
- # Discard previous entries if we find new min or max
- if is_max:
- while not Q.empty() and ((ai >= values[Q.back()]) or
- values[Q.back()] != values[Q.back()]):
- Q.pop_back()
- else:
- while not Q.empty() and ((ai <= values[Q.back()]) or
- values[Q.back()] != values[Q.back()]):
- Q.pop_back()
- Q.push_back(k)
- W.push_back(k)
- # Discard entries outside and left of current window
- while not Q.empty() and Q.front() <= starti[i] - 1:
- Q.pop_front()
- while not W.empty() and W.front() <= starti[i] - 1:
- remove_mm(values[W.front()], &nobs)
- W.pop_front()
- # Save output based on index in input value array
- if not Q.empty() and curr_win_size > 0:
- output[i] = calc_mm(minp, nobs, values[Q.front()])
- else:
- output[i] = NaN
- return output
- cdef enum InterpolationType:
- LINEAR,
- LOWER,
- HIGHER,
- NEAREST,
- MIDPOINT
- interpolation_types = {
- "linear": LINEAR,
- "lower": LOWER,
- "higher": HIGHER,
- "nearest": NEAREST,
- "midpoint": MIDPOINT,
- }
- def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp,
- float64_t quantile, str interpolation) -> np.ndarray:
- """
- O(N log(window)) implementation using skip list
- """
- cdef:
- Py_ssize_t i, j, s, e, N = len(start), idx
- int ret = 0
- int64_t nobs = 0, win
- float64_t val, idx_with_fraction
- float64_t vlow, vhigh
- skiplist_t *skiplist
- InterpolationType interpolation_type
- ndarray[float64_t] output
- if quantile <= 0.0 or quantile >= 1.0:
- raise ValueError(f"quantile value {quantile} not in [0, 1]")
- try:
- interpolation_type = interpolation_types[interpolation]
- except KeyError:
- raise ValueError(f"Interpolation '{interpolation}' is not supported")
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- # we use the Fixed/Variable Indexer here as the
- # actual skiplist ops outweigh any window computation costs
- output = np.empty(N, dtype=np.float64)
- win = (end - start).max()
- if win == 0:
- output[:] = NaN
- return output
- skiplist = skiplist_init(<int>win)
- if skiplist == NULL:
- raise MemoryError("skiplist_init failed")
- with nogil:
- for i in range(0, N):
- s = start[i]
- e = end[i]
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- if i != 0:
- nobs = 0
- skiplist_destroy(skiplist)
- skiplist = skiplist_init(<int>win)
- # setup
- for j in range(s, e):
- val = values[j]
- if val == val:
- nobs += 1
- skiplist_insert(skiplist, val)
- else:
- # calculate adds
- for j in range(end[i - 1], e):
- val = values[j]
- if val == val:
- nobs += 1
- skiplist_insert(skiplist, val)
- # calculate deletes
- for j in range(start[i - 1], s):
- val = values[j]
- if val == val:
- skiplist_remove(skiplist, val)
- nobs -= 1
- if nobs >= minp:
- if nobs == 1:
- # Single value in skip list
- output[i] = skiplist_get(skiplist, 0, &ret)
- else:
- idx_with_fraction = quantile * (nobs - 1)
- idx = <int>idx_with_fraction
- if idx_with_fraction == idx:
- # no need to interpolate
- output[i] = skiplist_get(skiplist, idx, &ret)
- continue
- if interpolation_type == LINEAR:
- vlow = skiplist_get(skiplist, idx, &ret)
- vhigh = skiplist_get(skiplist, idx + 1, &ret)
- output[i] = ((vlow + (vhigh - vlow) *
- (idx_with_fraction - idx)))
- elif interpolation_type == LOWER:
- output[i] = skiplist_get(skiplist, idx, &ret)
- elif interpolation_type == HIGHER:
- output[i] = skiplist_get(skiplist, idx + 1, &ret)
- elif interpolation_type == NEAREST:
- # the same behaviour as round()
- if idx_with_fraction - idx == 0.5:
- if idx % 2 == 0:
- output[i] = skiplist_get(skiplist, idx, &ret)
- else:
- output[i] = skiplist_get(
- skiplist, idx + 1, &ret)
- elif idx_with_fraction - idx < 0.5:
- output[i] = skiplist_get(skiplist, idx, &ret)
- else:
- output[i] = skiplist_get(skiplist, idx + 1, &ret)
- elif interpolation_type == MIDPOINT:
- vlow = skiplist_get(skiplist, idx, &ret)
- vhigh = skiplist_get(skiplist, idx + 1, &ret)
- output[i] = <float64_t>(vlow + vhigh) / 2
- if ret == 0:
- output[i] = NaN
- else:
- output[i] = NaN
- skiplist_destroy(skiplist)
- return output
- rolling_rank_tiebreakers = {
- "average": TiebreakEnumType.TIEBREAK_AVERAGE,
- "min": TiebreakEnumType.TIEBREAK_MIN,
- "max": TiebreakEnumType.TIEBREAK_MAX,
- }
- def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
- ndarray[int64_t] end, int64_t minp, bint percentile,
- str method, bint ascending) -> np.ndarray:
- """
- O(N log(window)) implementation using skip list
- derived from roll_quantile
- """
- cdef:
- Py_ssize_t i, j, s, e, N = len(start)
- float64_t rank_min = 0, rank = 0
- int64_t nobs = 0, win
- float64_t val
- skiplist_t *skiplist
- float64_t[::1] output
- TiebreakEnumType rank_type
- try:
- rank_type = rolling_rank_tiebreakers[method]
- except KeyError:
- raise ValueError(f"Method '{method}' is not supported")
- is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
- start, end
- )
- # we use the Fixed/Variable Indexer here as the
- # actual skiplist ops outweigh any window computation costs
- output = np.empty(N, dtype=np.float64)
- win = (end - start).max()
- if win == 0:
- output[:] = NaN
- return np.asarray(output)
- skiplist = skiplist_init(<int>win)
- if skiplist == NULL:
- raise MemoryError("skiplist_init failed")
- with nogil:
- for i in range(N):
- s = start[i]
- e = end[i]
- if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
- if i != 0:
- nobs = 0
- skiplist_destroy(skiplist)
- skiplist = skiplist_init(<int>win)
- # setup
- for j in range(s, e):
- val = values[j] if ascending else -values[j]
- if val == val:
- nobs += 1
- rank = skiplist_insert(skiplist, val)
- if rank == -1:
- raise MemoryError("skiplist_insert failed")
- if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
- # The average rank of `val` is the sum of the ranks of all
- # instances of `val` in the skip list divided by the number
- # of instances. The sum of consecutive integers from 1 to N
- # is N * (N + 1) / 2.
- # The sum of the ranks is the sum of integers from the
- # lowest rank to the highest rank, which is the sum of
- # integers from 1 to the highest rank minus the sum of
- # integers from 1 to one less than the lowest rank.
- rank_min = skiplist_min_rank(skiplist, val)
- rank = (((rank * (rank + 1) / 2)
- - ((rank_min - 1) * rank_min / 2))
- / (rank - rank_min + 1))
- elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
- rank = skiplist_min_rank(skiplist, val)
- else:
- rank = NaN
- else:
- # calculate deletes
- for j in range(start[i - 1], s):
- val = values[j] if ascending else -values[j]
- if val == val:
- skiplist_remove(skiplist, val)
- nobs -= 1
- # calculate adds
- for j in range(end[i - 1], e):
- val = values[j] if ascending else -values[j]
- if val == val:
- nobs += 1
- rank = skiplist_insert(skiplist, val)
- if rank == -1:
- raise MemoryError("skiplist_insert failed")
- if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
- rank_min = skiplist_min_rank(skiplist, val)
- rank = (((rank * (rank + 1) / 2)
- - ((rank_min - 1) * rank_min / 2))
- / (rank - rank_min + 1))
- elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
- rank = skiplist_min_rank(skiplist, val)
- else:
- rank = NaN
- if nobs >= minp:
- output[i] = rank / nobs if percentile else rank
- else:
- output[i] = NaN
- skiplist_destroy(skiplist)
- return np.asarray(output)
- def roll_apply(object obj,
- ndarray[int64_t] start, ndarray[int64_t] end,
- int64_t minp,
- object function, bint raw,
- tuple args, dict kwargs) -> np.ndarray:
- cdef:
- ndarray[float64_t] output, counts
- ndarray[float64_t, cast=True] arr
- Py_ssize_t i, s, e, N = len(start), n = len(obj)
- if n == 0:
- return np.array([], dtype=np.float64)
- arr = np.asarray(obj)
- # ndarray input
- if raw and not arr.flags.c_contiguous:
- arr = arr.copy("C")
- counts = roll_sum(np.isfinite(arr).astype(float), start, end, minp)
- output = np.empty(N, dtype=np.float64)
- for i in range(N):
- s = start[i]
- e = end[i]
- if counts[i] >= minp:
- if raw:
- output[i] = function(arr[s:e], *args, **kwargs)
- else:
- output[i] = function(obj.iloc[s:e], *args, **kwargs)
- else:
- output[i] = NaN
- return output
- # ----------------------------------------------------------------------
- # Rolling sum and mean for weighted window
- def roll_weighted_sum(
- const float64_t[:] values, const float64_t[:] weights, int minp
- ) -> np.ndarray:
- return _roll_weighted_sum_mean(values, weights, minp, avg=0)
- def roll_weighted_mean(
- const float64_t[:] values, const float64_t[:] weights, int minp
- ) -> np.ndarray:
- return _roll_weighted_sum_mean(values, weights, minp, avg=1)
- cdef float64_t[:] _roll_weighted_sum_mean(const float64_t[:] values,
- const float64_t[:] weights,
- int minp, bint avg):
- """
- Assume len(weights) << len(values)
- """
- cdef:
- float64_t[:] output, tot_wgt, counts
- Py_ssize_t in_i, win_i, win_n, in_n
- float64_t val_in, val_win, c, w
- in_n = len(values)
- win_n = len(weights)
- output = np.zeros(in_n, dtype=np.float64)
- counts = np.zeros(in_n, dtype=np.float64)
- if avg:
- tot_wgt = np.zeros(in_n, dtype=np.float64)
- elif minp > in_n:
- minp = in_n + 1
- minp = max(minp, 1)
- with nogil:
- if avg:
- for win_i in range(win_n):
- val_win = weights[win_i]
- if val_win != val_win:
- continue
- for in_i in range(in_n - (win_n - win_i) + 1):
- val_in = values[in_i]
- if val_in == val_in:
- output[in_i + (win_n - win_i) - 1] += val_in * val_win
- counts[in_i + (win_n - win_i) - 1] += 1
- tot_wgt[in_i + (win_n - win_i) - 1] += val_win
- for in_i in range(in_n):
- c = counts[in_i]
- if c < minp:
- output[in_i] = NaN
- else:
- w = tot_wgt[in_i]
- if w == 0:
- output[in_i] = NaN
- else:
- output[in_i] /= tot_wgt[in_i]
- else:
- for win_i in range(win_n):
- val_win = weights[win_i]
- if val_win != val_win:
- continue
- for in_i in range(in_n - (win_n - win_i) + 1):
- val_in = values[in_i]
- if val_in == val_in:
- output[in_i + (win_n - win_i) - 1] += val_in * val_win
- counts[in_i + (win_n - win_i) - 1] += 1
- for in_i in range(in_n):
- c = counts[in_i]
- if c < minp:
- output[in_i] = NaN
- return output
- # ----------------------------------------------------------------------
- # Rolling var for weighted window
- cdef float64_t calc_weighted_var(float64_t t,
- float64_t sum_w,
- Py_ssize_t win_n,
- unsigned int ddof,
- float64_t nobs,
- int64_t minp) nogil:
- """
- Calculate weighted variance for a window using West's method.
- Paper: https://dl.acm.org/citation.cfm?id=359153
- Parameters
- ----------
- t: float64_t
- sum of weighted squared differences
- sum_w: float64_t
- sum of weights
- win_n: Py_ssize_t
- window size
- ddof: unsigned int
- delta degrees of freedom
- nobs: float64_t
- number of observations
- minp: int64_t
- minimum number of observations
- Returns
- -------
- result : float64_t
- weighted variance of the window
- """
- cdef:
- float64_t result
- # Variance is unchanged if no observation is added or removed
- if (nobs >= minp) and (nobs > ddof):
- # pathological case
- if nobs == 1:
- result = 0
- else:
- result = t * win_n / ((win_n - ddof) * sum_w)
- if result < 0:
- result = 0
- else:
- result = NaN
- return result
- cdef void add_weighted_var(float64_t val,
- float64_t w,
- float64_t *t,
- float64_t *sum_w,
- float64_t *mean,
- float64_t *nobs) nogil:
- """
- Update weighted mean, sum of weights and sum of weighted squared
- differences to include value and weight pair in weighted variance
- calculation using West's method.
- Paper: https://dl.acm.org/citation.cfm?id=359153
- Parameters
- ----------
- val: float64_t
- window values
- w: float64_t
- window weights
- t: float64_t
- sum of weighted squared differences
- sum_w: float64_t
- sum of weights
- mean: float64_t
- weighted mean
- nobs: float64_t
- number of observations
- """
- cdef:
- float64_t temp, q, r
- if val != val:
- return
- nobs[0] = nobs[0] + 1
- q = val - mean[0]
- temp = sum_w[0] + w
- r = q * w / temp
- mean[0] = mean[0] + r
- t[0] = t[0] + r * sum_w[0] * q
- sum_w[0] = temp
- cdef void remove_weighted_var(float64_t val,
- float64_t w,
- float64_t *t,
- float64_t *sum_w,
- float64_t *mean,
- float64_t *nobs) nogil:
- """
- Update weighted mean, sum of weights and sum of weighted squared
- differences to remove value and weight pair from weighted variance
- calculation using West's method.
- Paper: https://dl.acm.org/citation.cfm?id=359153
- Parameters
- ----------
- val: float64_t
- window values
- w: float64_t
- window weights
- t: float64_t
- sum of weighted squared differences
- sum_w: float64_t
- sum of weights
- mean: float64_t
- weighted mean
- nobs: float64_t
- number of observations
- """
- cdef:
- float64_t temp, q, r
- if val == val:
- nobs[0] = nobs[0] - 1
- if nobs[0]:
- q = val - mean[0]
- temp = sum_w[0] - w
- r = q * w / temp
- mean[0] = mean[0] - r
- t[0] = t[0] - r * sum_w[0] * q
- sum_w[0] = temp
- else:
- t[0] = 0
- sum_w[0] = 0
- mean[0] = 0
- def roll_weighted_var(const float64_t[:] values, const float64_t[:] weights,
- int64_t minp, unsigned int ddof):
- """
- Calculates weighted rolling variance using West's online algorithm.
- Paper: https://dl.acm.org/citation.cfm?id=359153
- Parameters
- ----------
- values: float64_t[:]
- values to roll window over
- weights: float64_t[:]
- array of weights whose length is window size
- minp: int64_t
- minimum number of observations to calculate
- variance of a window
- ddof: unsigned int
- the divisor used in variance calculations
- is the window size - ddof
- Returns
- -------
- output: float64_t[:]
- weighted variances of windows
- """
- cdef:
- float64_t t = 0, sum_w = 0, mean = 0, nobs = 0
- float64_t val, pre_val, w, pre_w
- Py_ssize_t i, n, win_n
- float64_t[:] output
- n = len(values)
- win_n = len(weights)
- output = np.empty(n, dtype=np.float64)
- with nogil:
- for i in range(min(win_n, n)):
- add_weighted_var(values[i], weights[i], &t,
- &sum_w, &mean, &nobs)
- output[i] = calc_weighted_var(t, sum_w, win_n,
- ddof, nobs, minp)
- for i in range(win_n, n):
- val = values[i]
- pre_val = values[i - win_n]
- w = weights[i % win_n]
- pre_w = weights[(i - win_n) % win_n]
- if val == val:
- if pre_val == pre_val:
- remove_weighted_var(pre_val, pre_w, &t,
- &sum_w, &mean, &nobs)
- add_weighted_var(val, w, &t, &sum_w, &mean, &nobs)
- elif pre_val == pre_val:
- remove_weighted_var(pre_val, pre_w, &t,
- &sum_w, &mean, &nobs)
- output[i] = calc_weighted_var(t, sum_w, win_n,
- ddof, nobs, minp)
- return output
- # ----------------------------------------------------------------------
- # Exponentially weighted moving
- @cython.cpow(True)
- def ewm(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,
- int minp, float64_t com, bint adjust, bint ignore_na,
- const float64_t[:] deltas=None, bint normalize=True) -> np.ndarray:
- """
- Compute exponentially-weighted moving average or sum using center-of-mass.
- Parameters
- ----------
- vals : ndarray (float64 type)
- start: ndarray (int64 type)
- end: ndarray (int64 type)
- minp : int
- com : float64
- adjust : bool
- ignore_na : bool
- deltas : ndarray (float64 type), optional. If None, implicitly assumes equally
- spaced points (used when `times` is not passed)
- normalize : bool, optional.
- If True, calculate the mean. If False, calculate the sum.
- Returns
- -------
- np.ndarray[float64_t]
- """
- cdef:
- Py_ssize_t i, j, s, e, nobs, win_size, N = len(vals), M = len(start)
- const float64_t[:] sub_vals
- const float64_t[:] sub_deltas=None
- ndarray[float64_t] sub_output, output = np.empty(N, dtype=np.float64)
- float64_t alpha, old_wt_factor, new_wt, weighted, old_wt, cur
- bint is_observation, use_deltas
- if N == 0:
- return output
- use_deltas = deltas is not None
- alpha = 1. / (1. + com)
- old_wt_factor = 1. - alpha
- new_wt = 1. if adjust else alpha
- for j in range(M):
- s = start[j]
- e = end[j]
- sub_vals = vals[s:e]
- # note that len(deltas) = len(vals) - 1 and deltas[i] is to be used in
- # conjunction with vals[i+1]
- if use_deltas:
- sub_deltas = deltas[s:e - 1]
- win_size = len(sub_vals)
- sub_output = np.empty(win_size, dtype=np.float64)
- weighted = sub_vals[0]
- is_observation = weighted == weighted
- nobs = int(is_observation)
- sub_output[0] = weighted if nobs >= minp else NaN
- old_wt = 1.
- with nogil:
- for i in range(1, win_size):
- cur = sub_vals[i]
- is_observation = cur == cur
- nobs += is_observation
- if weighted == weighted:
- if is_observation or not ignore_na:
- if normalize:
- if use_deltas:
- old_wt *= old_wt_factor ** sub_deltas[i - 1]
- else:
- old_wt *= old_wt_factor
- else:
- weighted = old_wt_factor * weighted
- if is_observation:
- if normalize:
- # avoid numerical errors on constant series
- if weighted != cur:
- weighted = old_wt * weighted + new_wt * cur
- weighted /= (old_wt + new_wt)
- if adjust:
- old_wt += new_wt
- else:
- old_wt = 1.
- else:
- weighted += cur
- elif is_observation:
- weighted = cur
- sub_output[i] = weighted if nobs >= minp else NaN
- output[s:e] = sub_output
- return output
- def ewmcov(const float64_t[:] input_x, const int64_t[:] start, const int64_t[:] end,
- int minp, const float64_t[:] input_y, float64_t com, bint adjust,
- bint ignore_na, bint bias) -> np.ndarray:
- """
- Compute exponentially-weighted moving variance using center-of-mass.
- Parameters
- ----------
- input_x : ndarray (float64 type)
- start: ndarray (int64 type)
- end: ndarray (int64 type)
- minp : int
- input_y : ndarray (float64 type)
- com : float64
- adjust : bool
- ignore_na : bool
- bias : bool
- Returns
- -------
- np.ndarray[float64_t]
- """
- cdef:
- Py_ssize_t i, j, s, e, win_size, nobs
- Py_ssize_t N = len(input_x), M = len(input_y), L = len(start)
- float64_t alpha, old_wt_factor, new_wt, mean_x, mean_y, cov
- float64_t sum_wt, sum_wt2, old_wt, cur_x, cur_y, old_mean_x, old_mean_y
- float64_t numerator, denominator
- const float64_t[:] sub_x_vals, sub_y_vals
- ndarray[float64_t] sub_out, output = np.empty(N, dtype=np.float64)
- bint is_observation
- if M != N:
- raise ValueError(f"arrays are of different lengths ({N} and {M})")
- if N == 0:
- return output
- alpha = 1. / (1. + com)
- old_wt_factor = 1. - alpha
- new_wt = 1. if adjust else alpha
- for j in range(L):
- s = start[j]
- e = end[j]
- sub_x_vals = input_x[s:e]
- sub_y_vals = input_y[s:e]
- win_size = len(sub_x_vals)
- sub_out = np.empty(win_size, dtype=np.float64)
- mean_x = sub_x_vals[0]
- mean_y = sub_y_vals[0]
- is_observation = (mean_x == mean_x) and (mean_y == mean_y)
- nobs = int(is_observation)
- if not is_observation:
- mean_x = NaN
- mean_y = NaN
- sub_out[0] = (0. if bias else NaN) if nobs >= minp else NaN
- cov = 0.
- sum_wt = 1.
- sum_wt2 = 1.
- old_wt = 1.
- with nogil:
- for i in range(1, win_size):
- cur_x = sub_x_vals[i]
- cur_y = sub_y_vals[i]
- is_observation = (cur_x == cur_x) and (cur_y == cur_y)
- nobs += is_observation
- if mean_x == mean_x:
- if is_observation or not ignore_na:
- sum_wt *= old_wt_factor
- sum_wt2 *= (old_wt_factor * old_wt_factor)
- old_wt *= old_wt_factor
- if is_observation:
- old_mean_x = mean_x
- old_mean_y = mean_y
- # avoid numerical errors on constant series
- if mean_x != cur_x:
- mean_x = ((old_wt * old_mean_x) +
- (new_wt * cur_x)) / (old_wt + new_wt)
- # avoid numerical errors on constant series
- if mean_y != cur_y:
- mean_y = ((old_wt * old_mean_y) +
- (new_wt * cur_y)) / (old_wt + new_wt)
- cov = ((old_wt * (cov + ((old_mean_x - mean_x) *
- (old_mean_y - mean_y)))) +
- (new_wt * ((cur_x - mean_x) *
- (cur_y - mean_y)))) / (old_wt + new_wt)
- sum_wt += new_wt
- sum_wt2 += (new_wt * new_wt)
- old_wt += new_wt
- if not adjust:
- sum_wt /= old_wt
- sum_wt2 /= (old_wt * old_wt)
- old_wt = 1.
- elif is_observation:
- mean_x = cur_x
- mean_y = cur_y
- if nobs >= minp:
- if not bias:
- numerator = sum_wt * sum_wt
- denominator = numerator - sum_wt2
- if denominator > 0:
- sub_out[i] = (numerator / denominator) * cov
- else:
- sub_out[i] = NaN
- else:
- sub_out[i] = cov
- else:
- sub_out[i] = NaN
- output[s:e] = sub_out
- return output
|