新手-ish python/pandas 用户在这里。我一直在尝试在 read_fwf 中使用 chunksize arg 并迭代 value_counts 变量。我编写了一个函数来传递参数,例如文件迭代器和变量来解析和计数。我希望并行化这个函数,并能够同时将 2 个文件读入同一个函数。它似乎确实有效……但是,我的速度出乎意料地变慢了。线程在同一时间完成,但一个似乎正在减慢另一个(IO 瓶颈?)。通过按顺序而不是并行运行函数(324 秒对 172 秒),我得到了更快的时间。想法?我在执行这个错误吗?我试过多进程但启动映射错误,我无法腌制文件迭代器(read_fwf 的输出)。
testdf1=pd.read_fwf(filepath_or_buffer='200k.dat',header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],chunksize=1000)
testdf2=pd.read_fwf(filepath_or_buffer='200k2.dat',header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],chunksize=1000)
def tfuncth(df,varn,q,*args):
td={}
for key in varn.keys():
td[key]=pd.Series()
for rdf in df:
if args is not None:
for arg in args:
rdf=eval(f"rdf.query(\"{arg}\")")
for key in varn.keys():
ecode=f'rdf.{varn[key]}.value_counts()'
td[key]=pd.concat([td[key],eval(ecode)])
td[key]=td[key].groupby(td[key].index).sum()
for key in varn.keys():
td[key]=pd.DataFrame(td[key].reset_index()).rename(columns={'index':'Value',0:'Counts'}).assign(Var=key,PCT=lambda x:round(x.Counts/x.Counts.sum()*100,2))[['Var','Value','Counts','PCT']]
q.put(td)
bands={
'1':'A',
'2':'B',
'3':'C',
'4':'D',
'5':'E',
'6':'F',
'7':'G',
'8':'H',
'9':'I'
}
vdict={
'var1':'e1270.str.slice(0,2)',
'var2':'e1270.str.slice(2,3)',
'band':'e7641.str.slice(0,1).replace(bands)'
}
更新:
经过大量阅读,这也是我得出的结论。这是非常简单的结论,我敢肯定,所以如果有人知道,请告诉我。
Pandas 不是一个完全多线程友好的包
显然有一个名为 'dask' 的包,它复制了很多 Pandas 函数。所以我会调查一下。
在许多情况下,Python 并不是真正的多线程兼容语言
因此,诸如“dask”之类的包可以利用多线程。
可以分离多个线程,但只能并行非 CPU 绑定函数。
我的代码是用 IO 和 CPU 包装的。简单的 IO 可能是并行运行的,但会等待处理器执行。
我计划通过编写仅 IO 操作并尝试线程来测试这一点。
Python 受其编译器的约束。在纯 python 中,它被 GIL 解释和绑定,一次只执行一个线程
Python 可以使用在线程上没有全局解释器锁 (GIL) 的不同编译器进行编译。
眼眸繁星
相关分类