How do I do parallel processing in Python?

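I am trying to do parallel processing in Python. I have a huge dataframe with more than 4M rows. So, as in the example given below, I would like to divide the dataframe (df will be divided into df1, df2) and apply the same set of transpose operations to each of the resulting dataframes. Thanks to Jezrael for helping me get this far. Please find my input dataframe below: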


import pandas as pd

df = pd.DataFrame({
    'subject_id': [1,1,1,1,2,2,2,2,3,3,4,4,4,4,4],
    'readings': ['READ_1','READ_2','READ_1','READ_3','READ_1','READ_5','READ_6','READ_8','READ_10','READ_12','READ_11','READ_14','READ_09','READ_08','READ_07'],
    'val': [5,6,7,11,5,7,16,12,13,56,32,13,45,43,46],
})

Code to split the dataframe:


N=2  # dividing into two dataframes.
dfs = [x for _,x in df.groupby(pd.factorize(df['subject_id'])[0] // N)] # dfs is an iterable which will have two dataframes
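To make the split concrete, here is a small self-contained sketch that uses the sample data from the question. Note that `N` acts as the number of subjects per chunk rather than the number of dataframes; the sample has four subjects, so `N=2` happens to give two chunks. The helper name `split_by_subject` is made up for this illustration.

```python
import pandas as pd

df = pd.DataFrame({
    'subject_id': [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 4, 4, 4, 4, 4],
    'readings': ['READ_1', 'READ_2', 'READ_1', 'READ_3', 'READ_1', 'READ_5',
                 'READ_6', 'READ_8', 'READ_10', 'READ_12', 'READ_11',
                 'READ_14', 'READ_09', 'READ_08', 'READ_07'],
    'val': [5, 6, 7, 11, 5, 7, 16, 12, 13, 56, 32, 13, 45, 43, 46],
})


def split_by_subject(frame, subjects_per_chunk):
    """Hypothetical helper: group whole subjects into chunks.

    pd.factorize numbers the subjects 0, 1, 2, ... in order of first
    appearance; integer division then maps every `subjects_per_chunk`
    consecutive subjects to the same chunk id.
    """
    codes = pd.factorize(frame['subject_id'])[0]
    return [chunk for _, chunk in frame.groupby(codes // subjects_per_chunk)]


dfs = split_by_subject(df, 2)
for i, chunk in enumerate(dfs):
    # Show which subjects landed in each chunk and how many rows it has.
    print(i, chunk['subject_id'].unique().tolist(), len(chunk))
```

With this data, chunk 0 holds subjects 1 and 2 (8 rows) and chunk 1 holds subjects 3 and 4 (7 rows), so no subject is split across chunks.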

Parallel processing code:


import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
results = []

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()

results.append(pool.map(transpose_ope, [df for df in dfs])) # am I storing the output correctly here?

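Basically, I would like to append the output from each stage to a main dataframe.

Can you help me do this? My code just keeps running and never finishes, even with only 10-15 records.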


慕哥6287543
156 views, 1 answer
1 Answer

波斯汪

The function you pass to map needs to return the object you want. I would also use the more idiomatic context manager that is available for pools.

EDIT: fixed the import.

import multiprocessing as mp

def transpose_ope(df):                      #this function does the transformation like I want
    df_op = (df.groupby(['subject_id','readings'])['val']
            .describe()
            .unstack()
            .swaplevel(0,1,axis=1)
            .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    df_op = df_op.reset_index()
    return df_op

def main():
    with mp.Pool(mp.cpu_count()) as pool:
        res = pool.map(transpose_ope, [df for df in dfs])

if __name__=='__main__':
    main()

I'm not sure why you are appending a single list to another list... but if you just want the final list [transformed(df) for df in dfs], map returns exactly that.
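Since the question asks for the per-chunk outputs to end up in one main dataframe, here is a minimal end-to-end sketch that concatenates the list returned by `pool.map` with `pd.concat`. It is an illustration under assumptions, not the answerer's code: the helper names `build_example_df`, `split_by_subject` and `run_parallel` are made up here, and capping the worker count at the number of chunks is my own choice. One plausible contributor to the original script never finishing is that the pool was created at module level, before the worker function was defined and without the `__main__` guard, so this sketch defines everything first and starts the pool only inside the guard.

```python
import multiprocessing as mp

import pandas as pd


def build_example_df():
    """Sample data copied from the question."""
    return pd.DataFrame({
        'subject_id': [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 4, 4, 4, 4, 4],
        'readings': ['READ_1', 'READ_2', 'READ_1', 'READ_3', 'READ_1',
                     'READ_5', 'READ_6', 'READ_8', 'READ_10', 'READ_12',
                     'READ_11', 'READ_14', 'READ_09', 'READ_08', 'READ_07'],
        'val': [5, 6, 7, 11, 5, 7, 16, 12, 13, 56, 32, 13, 45, 43, 46],
    })


def split_by_subject(frame, subjects_per_chunk):
    """Hypothetical helper: keep whole subjects together in each chunk."""
    codes = pd.factorize(frame['subject_id'])[0]
    return [chunk for _, chunk in frame.groupby(codes // subjects_per_chunk)]


def transpose_ope(df):
    """Per-chunk transformation from the question, now returning its result."""
    df_op = (df.groupby(['subject_id', 'readings'])['val']
               .describe()
               .unstack()
               .swaplevel(0, 1, axis=1)
               .reindex(df['readings'].unique(), axis=1, level=0))
    df_op.columns = df_op.columns.map('_'.join)
    return df_op.reset_index()


def run_parallel(chunks):
    """Transform each chunk in a worker process and stack the results."""
    # Assumption: there is no point starting more workers than chunks.
    n_workers = min(mp.cpu_count(), len(chunks))
    with mp.Pool(n_workers) as pool:
        results = pool.map(transpose_ope, chunks)  # list of dataframes
    # Chunks can have different reading columns; concat takes their union
    # and fills the gaps with NaN.
    return pd.concat(results, ignore_index=True, sort=False)


if __name__ == '__main__':
    chunks = split_by_subject(build_example_df(), 2)
    combined = run_parallel(chunks)
    print(combined[['subject_id', 'READ_1_mean']])
```

The combined frame has one row per subject. Columns that exist only in one chunk (for example the `READ_10_*` columns, which come from subject 3) are NaN for subjects in the other chunk. For a dataframe as small as the sample, the cost of starting processes will likely outweigh any gain; the approach is aimed at the multi-million-row case described in the question.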