猿问

如何有效地转置 67 gb 文件/Dask 数据帧而不将其完全加载到内存中?

我有 3 个相当大的文件(67gb、36gb、30gb)需要训练模型。但是,特征是行,样本是列。由于 Dask 尚未实现转置并存储按行拆分的 DataFrame,因此我需要自己编写一些东西来执行此操作。有没有一种方法可以有效地转置而不加载到内存中?


我有 16 GB 的内存可供我使用,并且正在使用 jupyter notebook。我写了一些相当慢的代码,但真的很感激更快的解决方案。以下代码的速度将需要一个月才能完成所有文件。几个数量级的最慢步骤是 awk。


import dask.dataframe as dd

import subprocess

from IPython.display import clear_output


df = dd.read_csv('~/VeryLarge.tsv')

with open('output.csv','wb') as fout:

    for i in range(1, len(df.columns)+1):

        print('AWKing')

        #read a column from the original data and store it elsewhere

        x = "awk '{print $"+str(i)+"}' ~/VeryLarge.tsv > ~/file.temp"

        subprocess.check_call([x], shell=True)


        print('Reading')

        #load and transpose the column

        col = pd.read_csv('~/file.temp')

        row = col.T

        display(row)


        print('Deleting')

        #remove the temporary file created

        !rm ../file.temp


        print('Storing')

        #store the row in its own csv just to be safe. not entirely necessary

        row.to_csv('~/columns/col_{:09d}'.format(i), header=False)


        print('Appending')

        #append the row (transposed column) to the new file

        with open('~/columns/col_{:09d}', 'rb') as fin:

            for line in fin:

                fout.write(line)


        clear_output()

        #Just a measure of progress

        print(i/len(df.columns))

数据本身有 1000 万行(特征)和 2000 列(样本)。它只需要转置。目前,它看起来像这样:

小唯快跑啊
浏览 154回答 2
2回答

慕标琳琳

我修改了我的原始脚本以部署在任意数量的 CPU 上。它运行得更快,因为我可以使用多个线程并部署在 aws 上。我用了一台96核的机器,8小时左右就完成了任务。我很惊讶,因为这几乎是线性缩放!这个想法是使一些重复的任务可分发。然后你就可以将任务分配给 CPU。这里的并行化是通过命令完成的pool.map()。从命令行使用此脚本非常简单:python3 transposer.py -i largeFile.tsv如果需要,您也可以指定其他参数。import argparse, subprocessimport numpy as npimport pandas as pdimport dask.dataframe as ddfrom IPython.display import clear_outputfrom contextlib import closingfrom os import cpu_countfrom multiprocessing import Poolparser = argparse.ArgumentParser(description='Transpose csv')parser.add_argument('-i', '--infile', help='Path to input folder',                    default=None)parser.add_argument('-s', '--sep', help='input separator',                    default='\t')args = parser.parse_args()infile = args.infilesep = args.sep    df = pd.read_csv(infile, sep='\t', nrows=3)    def READ_COL(item):    print(item)    outfile = 'outfile{}.temp'.format(item)    if item !=0:                x = "awk '{print $"+str(item)+"}' "+infile+" > "+outfile                subprocess.check_call([x], shell=True)                col = pd.read_csv(outfile)                row = col.T                display(row)                row.to_csv('col_{:09d}.csv'.format(item), header=False)                subprocess.check_call(['rm '+outfile], shell=True)                print(item/len(df.columns))with closing(Pool(processes=cpu_count())) as pool:    pool.map(READ_COL, list(range(1, len(df.columns)+1)))在此之后,您应该有许多转置列的文件。您只需要使用cat或其他命令行工具将它们连接在一起。我刚跑cat col_* > full_file_transposed.csv
随时随地看视频慕课网APP

相关分类

Python
我要回答