猿问

从单个脚本运行多个 python 脚本,并在它们之间来回通信?

我有一个我编写的脚本,我可以将参数传递给它,并且我想使用唯一参数启动多个同时迭代(可能是 100+)。我的计划是编写另一个 python 脚本,然后启动这些下标/进程,但是为了有效,我需要该脚本能够监视下标是否有任何错误。

有没有什么直接的方法可以做到这一点,或者有一个提供这个功能的库?我一直在寻找一段时间,但没有找到任何好运。创建子进程和多线程似乎很简单,但我真的找不到任何关于如何与这些线程/子进程通信的指南或教程。


慕后森
浏览 288回答 3
3回答

PIPIONE

就像其他人说的,你必须使用多个进程来实现真正的并行而不是线程,因为 GIL 限制阻止线程并发运行。如果你想使用标准的多处理库(它基于启动多个进程),我建议使用一个worker 池。如果我理解正确,您想启动 100 多个并行实例。在一台主机上启动 100 多个进程会产生过多的开销。相反,创建一个 P 工人池,其中 P 是例如您机器中的核心数,并将 100 多个作业提交到池中。这很简单,网上有很多例子。此外,当您向池提交作业时,您可以提供回调函数来接收错误。这可能足以满足您的需求(有例子在这里)。但是,上次我查看时,多处理中的池无法在多个主机(例如机器集群)之间分配工作。所以,如果你需要这样做,或者如果你需要一个更灵活的通信方案,比如能够在工作人员运行时向控制进程发送更新,我的建议是使用char4py(请注意,我是一个 char4py 开发人员,所以这是我有经验的地方)。使用charm4py,您可以创建N 个工作进程,这些工作进程由运行时分布在P 个进程中(跨多个主机工作),并且工作进程只需通过远程方法调用即可与控制器通信。这是一个小例子:from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threadedimport timeWORKER_ITERATIONS = 100class Worker(Chare):    def __init__(self, controller):        self.controller = controller    @threaded    def work(self, x, done_future):        result = -1        try:            for i in range(WORKER_ITERATIONS):                if i % 20 == 0:                    # send status update to controller                    self.controller.progressUpdate(self.thisIndex, i, ret=True).get()                if i == 5 and self.thisIndex[0] % 2 == 0:                    # trigger NameError on even-numbered workers                    test[3] = 3                time.sleep(0.01)            result = x**2        except Exception as e:            # send error to controller            self.controller.collectError(self.thisIndex, e)        # send result to controller        self.contribute(result, Reducer.gather, done_future)# This custom map is used to prevent workers from being created on process 0# (where the controller is). Not strictly needed, but allows more timely# controller outputclass WorkerMap(ArrayMap):    def procNum(self, index):        return (index[0] % (charm.numPes() - 1)) + 1class Controller(Chare):    def __init__(self, args):        self.startTime = time.time()        done_future = charm.createFuture()        # create 12 workers, which are distributed by charm4py among processes        workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))        # start work        for i in range(12):            workers[i].work(i, done_future)        print('Results are', done_future.get())  # wait for result        exit()    def progressUpdate(self, worker_id, current_step):        print(round(time.time() - self.startTime, 3), ': Worker', worker_id,              'progress', current_step * 100 / WORKER_ITERATIONS, '%')        # the controller can return a value here and the worker would receive it    def collectError(self, worker_id, error):        print(round(time.time() - self.startTime, 3), ': Got error', error,              'from worker', worker_id)charm.start(Controller)在此示例中,控制器将在发生状态更新和错误时打印它们。完成所有工作后,它将打印所有工人的最终结果。失败的工作人员的结果将为 -1。进程数 P 在启动时给出。运行时将在可用进程之间分配 N 个工作程序。这发生在创建工作程序并且在此特定示例中没有动态负载平衡时。另外,请注意,在charm4py 模型中,远程方法调用是异步的,并返回调用者可以阻塞的未来,但只有调用线程会阻塞(而不是整个过程)。希望这会有所帮助。

鸿蒙传说

更好的方法是使用线程。如果您将要调用的脚本放入这个更大的脚本中的函数中,您可以让主函数根据需要多次调用此脚本,并让线程根据需要报告信息。您可以在此处阅读一些有关线程如何工作的信息。
随时随地看视频慕课网APP

相关分类

Python
我要回答