了解 Python 多处理 apply_aync 并获取

我正在挖掘我前段时间编写的一些代码,它具有以下结构。我试图在需要的地方添加解释性评论。


# Create a reader and a writer process

reader_proc = Process(target=self.reader)

reader_proc.start()


writer_proc = Process(target=self.writer, args=(pfin,))

writer_proc.start()


# start a pool of workers

with Pool(n_workers, maxtasksperchild=max_tasks_per_child) as pool:

    # a list to keep track of workers

    worker_jobs = []

    # a list to keep track of return values

    return_vals = []


    # get input chunks from the reader

    # reader writes input chunks to a work_q (queue)

    while True:

        work = work_q.get()

        if work == 'done':

            break


        # process_chunk is a function that ... processes the given chunk

        # this function will do some computations and write those to a results_q (queue)

        # which the writer will then write to a file

        # the function also returns another type of value that is processed below

        job = pool.apply_async(process_chunk, (work,))

        worker_jobs.append(job)


    print('Done reading chunks!')

    # reader is done reading

    reader_proc.join()

    reader_proc.terminate()


    # When a worker has finished its job, get its information back

    for job_idx, job in enumerate(worker_jobs, 1):

        print(f"Processing job {job_idx}")

        res1, res2 = job.get()

        return_vals.append((res1, res2))


    # process results in main process

    process_results(return_vals)


    # Notify the writer that we're done

    results_q.put('done')

Tl; dr 一个池处理来自队列的块apply_async。队列用完后,我们.get()将结果返回并处理它们。


我不确定作业在应用到池时是否立即执行,还是等到.get()被调用?这很重要,因为如果他们等待执行直到队列耗尽,那么对于长队列来说可能需要很长时间。


另一方面,如果它们不等待并立即执行,那么这些函数的结果存储在哪里?由于我们正在等待直到.get()获取结果,这是否意味着子进程被阻塞直到.get()被调用?


我问的原因是因为第一个打印语句(读取完成)和后续语句(处理作业 x)之间有很长的延迟,我不知道为什么。



月关宝盒
浏览 177回答 1
1回答

吃鸡游戏

一旦工作人员空闲,任务就会执行。得到结果或根本没有得到结果对此没有影响。您的工作人员结果存储在AsyncResult对象中,在您的情况下job是其中之一并且worker_jobs拥有它们。然后你做正确的事情并遍历你的结果对象并得到结果。池在内部存储结果,直到您获得它们 - 即使您根本没有获得结果,它也不会阻止工作人员 - 在许多并行处理的情况下,如果工作人员,您甚至可能对来自工作人员的“结果”不感兴趣只是根据输入执行特定任务。一旦工作人员完成并将结果(或异常!)存储在此对象中,就可以自由地从池中接受另一个作业。这也意味着您必须在关闭池之前获得结果 - 就像现在一样。如果您将“处理作业”循环移到with Pool...结构之外,那么当您尝试获取结果时,结果就会丢失。有关可用的对象 方法,请参阅https://docs.python.org/3.4/library/multiprocessing.html?highlight=process 。AsyncResult如果您的工作人员提出一个异常,该AsyncResult对象也会存储一个异常。当工作人员遇到异常时不会立即触发它,而是存储在那里并在您 get() 结果时引发。如果您的工作人员可以引发异常,您应该为您的 get 循环而不是工作人员构建异常处理。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python