Pandas: speeding up a rolling window (applying a custom function)

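I am using the code below to apply a function (funcX) to my DataFrame over a rolling window. The main problem is that this DataFrame (data) is very large, and I am looking for a faster way to do this.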


import numpy as np

def funcX(x):
    x = np.sort(x)
    xd = np.delete(x, 25)    # drop the middle element of the sorted 51-value window
    med = np.median(xd)      # median of the remaining 50 values
    return (np.abs(x - med)).mean() + med

med_out = data.var1.rolling(window=51, center=True).apply(funcX, raw=True)

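The only reason for using this function is that the median is computed after the middle value has been removed. So it is not the same as appending .median() to the rolling window.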
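To make the difference concrete, here is a minimal pure-Python sketch (standard library only, so it runs without NumPy). The helper func_x_py and the sample window are hypothetical, made up for illustration; the helper restates funcX for any odd-length window instead of hardcoding index 25.

```python
from statistics import median


def func_x_py(window):
    """Pure-Python restatement of funcX for any odd-length window (illustrative helper)."""
    xs = sorted(window)
    mid = len(xs) // 2                  # 25 for a 51-element window
    rest = xs[:mid] + xs[mid + 1:]      # drop the middle element
    med = median(rest)                  # median of what is left
    return sum(abs(v - med) for v in xs) / len(xs) + med


window = [1, 2, 10, 11, 12]               # made-up sample data
xs = sorted(window)
plain_median = median(xs)                 # ordinary median of the window
trimmed_median = median(xs[:2] + xs[3:])  # median after dropping the middle value
result = func_x_py(window)
```

For this sample the ordinary median is 10, while the median without the middle value is 6.5, so a plain rolling median would not give the same result.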


红颜莎娜
1 Answer

慕村9548890

To be efficient, a windowed algorithm has to reuse the result of the previous window when computing the next, overlapping one.

Here, take the sorted window and write med0 for its median, med for the median of x \ med0, xl for the elements before med0 and xg for the elements after it. Then funcX(x) can be written as:

<|x-med|> + med = [sum(xg) - sum(xl) + |med0-med|] / windowsize + med

Every element of xg is >= med and every element of xl is <= med, and both halves have the same size, so the med terms cancel and the middle element contributes |med0-med|.

So one idea is to maintain a buffer that holds the current window in sorted order, together with sum(xg) and sum(xl). With Numba just-in-time compilation, this gives very good performance.

First, the buffer management: init sorts the first window and computes the left (xls) and right (xgs) sums.

import numpy as np
import numba

windowsize = 51  # odd, >1
halfsize = windowsize // 2

@numba.njit
def init(firstwindow):
    buffer = np.sort(firstwindow)
    xls = buffer[:halfsize].sum()    # sum of the elements below the middle one
    xgs = buffer[-halfsize:].sum()   # sum of the elements above the middle one
    return buffer, xls, xgs

shift is the linear part. It updates the buffer while keeping it sorted. np.searchsorted finds the insertion and deletion positions in O(log(windowsize)). It is fiddly because xin<xout and xout<xin are not symmetrical cases.

@numba.njit
def shift(buffer, xin, xout):
    i_in = np.searchsorted(buffer, xin)
    i_out = np.searchsorted(buffer, xout)
    if xin <= xout:
        # move the block one slot to the right, overwriting xout
        buffer[i_in+1:i_out+1] = buffer[i_in:i_out]
        buffer[i_in] = xin
    else:
        # move the block one slot to the left, overwriting xout
        buffer[i_out:i_in-1] = buffer[i_out+1:i_in]
        buffer[i_in-1] = xin
    return i_in, i_out

update updates the buffer and the sums of the left and right parts. It is fiddly for the same reason: xin<xout and xout<xin are not symmetrical cases.

@numba.njit
def update(buffer, xls, xgs, xin, xout):
    # values around the middle before the buffer is modified
    xl, x0, xg = buffer[halfsize-1:halfsize+2]
    i_in, i_out = shift(buffer, xin, xout)
    if i_out < halfsize:
        xls -= xout
        if i_in <= halfsize:
            xls += xin
        else:
            xls += x0
    elif i_in < halfsize:
        xls += xin - xl
    if i_out > halfsize:
        xgs -= xout
        if i_in > halfsize:
            xgs += xin
        else:
            xgs += x0
    elif i_in > halfsize+1:
        xgs += xin - xg
    return buffer, xls, xgs

func is equivalent to the original funcX, evaluated on buffer. O(1).

@numba.njit
def func(buffer, xls, xgs):
    med0 = buffer[halfsize]
    med = (buffer[halfsize-1] + buffer[halfsize+1]) / 2
    if med0 > med:
        return (xgs - xls + med0 - med) / windowsize + med
    else:
        return (xgs - xls + med - med0) / windowsize + med

med is the global function. O(data.size * windowsize).

@numba.njit
def med(data):
    res = np.full_like(data, np.nan)
    state = init(data[:windowsize])
    res[halfsize] = func(*state)
    for i in range(windowsize, data.size):
        xin, xout = data[i], data[i - windowsize]
        state = update(*state, xin, xout)
        res[i-halfsize] = func(*state)
    return res

Performance:

import pandas
data = pandas.DataFrame(np.random.rand(10**5))

%time res1 = data[0].rolling(window=windowsize, center=True).apply(funcX, raw=True)
Wall time: 10.8 s

res2 = med(data[0].values)

np.allclose((res1-res2)[halfsize:-halfsize], 0)
Out[112]: True

%timeit res2 = med(data[0].values)
40.4 ms ± 462 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

It is about 250 times faster, with window size = 51. An hour becomes 15 seconds.
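The sum identity and the sorted-buffer idea can be checked without Numba. The following is a simplified pure-Python sketch, not the Numba code above: the names func_x_ref, func_from_buffer and rolling_func_x are hypothetical, the buffer is maintained with the standard bisect module, and the two sums are recomputed from the buffer on each step rather than updated in O(1). It assumes an odd windowsize of at least 3 and at least windowsize data points.

```python
import bisect
import math
import random


def func_x_ref(window):
    """Direct evaluation, mirroring funcX for an odd-length window."""
    xs = sorted(window)
    h = len(xs) // 2
    med = (xs[h - 1] + xs[h + 1]) / 2           # median once xs[h] is removed
    return sum(abs(v - med) for v in xs) / len(xs) + med


def func_from_buffer(buf):
    """Same value from a sorted buffer, using the sum identity."""
    n = len(buf)
    h = n // 2
    med0 = buf[h]
    med = (buf[h - 1] + buf[h + 1]) / 2
    xls = sum(buf[:h])                          # elements below the middle one
    xgs = sum(buf[h + 1:])                      # elements above the middle one
    return (xgs - xls + abs(med0 - med)) / n + med


def rolling_func_x(data, windowsize):
    """Centered rolling funcX; NaN where the window is incomplete."""
    h = windowsize // 2
    res = [math.nan] * len(data)
    buf = sorted(data[:windowsize])
    res[h] = func_from_buffer(buf)
    for i in range(windowsize, len(data)):
        xout = data[i - windowsize]
        del buf[bisect.bisect_left(buf, xout)]  # remove the value leaving the window
        bisect.insort(buf, data[i])             # insert the entering value, keeping order
        res[i - h] = func_from_buffer(buf)
    return res


random.seed(0)
data = [random.random() for _ in range(500)]    # made-up test data
out = rolling_func_x(data, 51)
```

Each centered position of out can then be compared against func_x_ref applied to the corresponding 51-value slice of data. This sketch only illustrates the bookkeeping; the speed reported above comes from the compiled Numba version.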
