如何实现Python的多处理池来转换数千个文件

我有一个Python脚本,可以在包含超过10,000个DBF文件的目录中读取,以便将它们转换为CSV。我想将此任务视为一般化,而不是单独转换每个文件。我已经阅读了Python的多处理模块,尽管我在实现此任务时遇到了一些麻烦。具体来说,我想使用 Pool 类在 CPU 内核之间分配工作负载。


这是我到目前为止的代码:


import os

from dbfread import DBF

import pandas as pd

import multiprocessing


directory = 'C:\\Path_to_DBF_Files' #define file directory 


files_in = os.listdir(directory) #store files in directory to list 


def convert():


    for file in files_in:


        if file.startswith('D') and file.endswith('.DBF'): #define parameters of filenames to convert

            file_path = os.path.join(files_in, file)

            print(f'\nReading in {file}...')

            dbf = DBF(file_path) #create DBF object 

            dbf.encoding = 'utf-8' #set encoding attribute to utf-8 instead of acsii 

            dbf.char_decode_errors = 'ignore' #set decoding errors attribute to ignore any errors and read in DBF file as is 

            print('\nConverting to DataFrame...')

            df = pd.DataFrame(iter(dbf)) #convert to Pandas dataframe 

            df.columns.astype(str) #convert column datatypes to string

            print(df)

            print('\nWriting to CSV...')

            dest_directory = 'C:\\Path_to_output_directory\\%s.csv' % ('D' + file.strip('.DBF')) #define destination directory and names for output files 

            df.to_csv(dest_directory, index = False)

            print(f'\nConverted {file} to CSV. Moving to next file...')



        elif file.startswith('B') and file.endswith('.DBF'): #define parameters for unnecessary files 

            print('\nB file not needed.')

            continue


        elif file.endswith('.FPT'): #skip FPT files 

            print('Skipping FPT file.')

            continue


我在StackOverflow上读到了一些与我的问题有些相似的答案;但是,我没有看到任何适用于我的特定任务的内容。如何改进代码,使脚本同时处理多个文件,而不是一次只读取和转换一个文件?


感谢您提供的任何帮助。


蝴蝶不菲
浏览 104回答 1
1回答

MM们

一些一般指导:您正在创建一个池。池大小应取决于计算机,而不是作业的大小。例如,您希望池中有 4 个进程而不是 10000 个进程,即使您有 10000 个文件要处理在每个进程上运行的作业应该简单但已参数化。在您的例子中,创建一个函数来获取文件名作为输入并执行转换。然后将输入文件映射到其中。过滤应在调用之前完成。map因此,我会将您的代码转换为如下所示的内容:import osfrom dbfread import DBFimport pandas as pdimport multiprocessingdirectory = 'C:\\Path_to_DBF_Files' #define file directory files_in = os.listdir(directory) #store files in directory to list def convert(file):    file_path = os.path.join(files_in, file)    print(f'\nReading in {file}...')    dbf = DBF(file_path) #create DBF object     dbf.encoding = 'utf-8' #set encoding attribute to utf-8 instead of acsii     dbf.char_decode_errors = 'ignore' #set decoding errors attribute to ignore any errors and read in DBF file as is     print('\nConverting to DataFrame...')    df = pd.DataFrame(iter(dbf)) #convert to Pandas dataframe     df.columns.astype(str) #convert column datatypes to string    print(df)    print('\nWriting to CSV...')    dest_directory = 'C:\\Path_to_output_directory\\%s.csv' % ('D' + file.strip('.DBF')) #define destination directory and names for output files     df.to_csv(dest_directory, index = False)    print(f'\nConverted {file} to CSV. Moving to next file...')pool = multiprocessing.Pool(processes = 4)pool.map(convert, [file for file in files_in if file.startswith('D') and file.endswith('.DBF')])
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python