我有一个类,它有一个方法可以进行一些并行计算,并且经常被调用。因此,我希望我的池在类的构造函数中初始化一次,而不是每次调用此方法时都创建一个新池。在此方法中,我想使用 apply_async() 为所有工作进程启动一个任务,然后等待(阻塞)并聚合每个任务的结果。我的代码如下所示:
class Foo:
def __init__(self, ...):
# ...
self.pool = mp.Pool(mp.cpu_count())
def do_parallel_calculations(self, ...):
for _ in range(mp.cpu_count()):
self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)
# wait for results to be aggregated to a global var by the callback
self.pool.join() # <-- ValueError: Pool is still running
# do something with the aggregated result of all worker processes
但是,当我运行此命令时,我在 self.pool.join() 中收到错误消息:“ValueError:池仍在运行”。现在,在所有示例中,我都看到 self.pool.close() 在 self.pool.join() 之前被调用,我认为这就是我收到此错误的原因,但我不想关闭我的池,因为我想要它在那里下次调用此方法时!我不能不使用 self.pool.join(),因为我需要一种方法来等待所有进程完成,并且我不想浪费地手动旋转,例如使用“while not global_flag: pass”。
我可以做什么来实现我想要做的事情?为什么多重处理不允许我加入仍然开放的池?这似乎是一件完全合理的事情。
一只斗牛犬
千巷猫影
相关分类