PIPIONE
就像其他人说的,你必须使用多个进程来实现真正的并行而不是线程,因为 GIL 限制阻止线程并发运行。如果你想使用标准的多处理库(它基于启动多个进程),我建议使用一个worker 池。如果我理解正确,您想启动 100 多个并行实例。在一台主机上启动 100 多个进程会产生过多的开销。相反,创建一个 P 工人池,其中 P 是例如您机器中的核心数,并将 100 多个作业提交到池中。这很简单,网上有很多例子。此外,当您向池提交作业时,您可以提供回调函数来接收错误。这可能足以满足您的需求(有例子在这里)。但是,上次我查看时,多处理中的池无法在多个主机(例如机器集群)之间分配工作。所以,如果你需要这样做,或者如果你需要一个更灵活的通信方案,比如能够在工作人员运行时向控制进程发送更新,我的建议是使用char4py(请注意,我是一个 char4py 开发人员,所以这是我有经验的地方)。使用charm4py,您可以创建N 个工作进程,这些工作进程由运行时分布在P 个进程中(跨多个主机工作),并且工作进程只需通过远程方法调用即可与控制器通信。这是一个小例子:from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threadedimport timeWORKER_ITERATIONS = 100class Worker(Chare): def __init__(self, controller): self.controller = controller @threaded def work(self, x, done_future): result = -1 try: for i in range(WORKER_ITERATIONS): if i % 20 == 0: # send status update to controller self.controller.progressUpdate(self.thisIndex, i, ret=True).get() if i == 5 and self.thisIndex[0] % 2 == 0: # trigger NameError on even-numbered workers test[3] = 3 time.sleep(0.01) result = x**2 except Exception as e: # send error to controller self.controller.collectError(self.thisIndex, e) # send result to controller self.contribute(result, Reducer.gather, done_future)# This custom map is used to prevent workers from being created on process 0# (where the controller is). Not strictly needed, but allows more timely# controller outputclass WorkerMap(ArrayMap): def procNum(self, index): return (index[0] % (charm.numPes() - 1)) + 1class Controller(Chare): def __init__(self, args): self.startTime = time.time() done_future = charm.createFuture() # create 12 workers, which are distributed by charm4py among processes workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap)) # start work for i in range(12): workers[i].work(i, done_future) print('Results are', done_future.get()) # wait for result exit() def progressUpdate(self, worker_id, current_step): print(round(time.time() - self.startTime, 3), ': Worker', worker_id, 'progress', current_step * 100 / WORKER_ITERATIONS, '%') # the controller can return a value here and the worker would receive it def collectError(self, worker_id, error): print(round(time.time() - self.startTime, 3), ': Got error', error, 'from worker', worker_id)charm.start(Controller)在此示例中,控制器将在发生状态更新和错误时打印它们。完成所有工作后,它将打印所有工人的最终结果。失败的工作人员的结果将为 -1。进程数 P 在启动时给出。运行时将在可用进程之间分配 N 个工作程序。这发生在创建工作程序并且在此特定示例中没有动态负载平衡时。另外,请注意,在charm4py 模型中,远程方法调用是异步的,并返回调用者可以阻塞的未来,但只有调用线程会阻塞(而不是整个过程)。希望这会有所帮助。