在 Python 中多处理一个文件,然后将结果写入磁盘

我想做以下事情:

  • 从 csv 文件中读取数据

  • 处理上述 csv 的每一行(假设这是一个长网络操作)

  • 将结果写入另一个文件

我曾尝试将这个这个答案粘合在一起,但收效甚微。第二个队列的代码永远不会被调用,因此不会发生写入磁盘的情况。我如何让进程知道有第二个队列?

请注意,我不一定是multiprocessing. 如果async/await效果更好,我完全赞成。

到目前为止我的代码


万千封印
浏览 256回答 2
2回答

RISEBY

import multiprocessingimport osimport timein_queue = multiprocessing.Queue()out_queue = multiprocessing.Queue()def worker_main(in_queue, out_queue):    print (os.getpid(), "working")    while True:        item = in_queue.get(True)        print (os.getpid(), "got", item)        time.sleep(1) #long network processing        print (os.getpid(), "done", item)        # put the processed items to be written to disl        out_queue.put("processed:" + str(item))pool = multiprocessing.Pool(3, worker_main,(in_queue,out_queue))for i in range(5): # let's assume this is the file reading part    in_queue.put(i)with open('out.txt', 'w') as file:    while not out_queue.empty():        try:            value = q.get(timeout = 1)            file.write(value + '\n')        except Exception as qe:            print ("Empty Queue or dead process")

宝慕林4294392

我在尝试执行您的代码时遇到的第一个问题是:An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module我必须在if __name__ == '__main__':习语中包装任何模块范围指令。在这里阅读更多。由于您的目标是遍历文件的行,因此Pool.imap()似乎很合适。该imap()文档参考map()文档,不同的是imap()懒洋洋地提取从可迭代的下一个项目(在你的情况将是CSV文件),这将是有益的,如果您的CSV文件较大。所以从map()文档:此方法将可迭代对象分成多个块,并将其作为单独的任务提交给进程池。imap() 返回一个迭代器,这样您就可以遍历流程工作人员产生的结果以执行您必须对它们执行的操作(在您的示例中,它将结果写入文件)。这是一个工作示例:import multiprocessingimport osimport timedef worker_main(item):    print(os.getpid(), "got", item)    time.sleep(1) #long network processing    print(os.getpid(), "done", item)    # put the processed items to be written to disl    return "processed:" + str(item)if __name__ == '__main__':    with multiprocessing.Pool(3) as pool:        with open('out.txt', 'w') as file:            # range(5) simulating a 5 row csv file.            for proc_row in pool.imap(worker_main, range(5)):                file.write(proc_row + '\n')# printed output:# 1368 got 0# 9228 got 1# 12632 got 2# 1368 done 0# 1368 got 3# 9228 done 1# 9228 got 4# 12632 done 2# 1368 done 3# 9228 done 4out.txt 看起来像这样:processed:0processed:1processed:2processed:3processed:4请注意,我也不必使用任何队列。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python