猿问

将任务添加到父级的多处理池

如何将新任务添加到multiprocessing在父进程中初始化的池中?以下不起作用:


from multiprocessing import Pool



def child_task(x):

    # the child task spawns new tasks

    results = p.map(grandchild_task, [x])

    return results[0]



def grandchild_task(x):

    return x



if __name__ == '__main__':

    p = Pool(2)

    print(p.map(child_task, [0]))

    # Result: NameError: name 'p' is not defined

动机:我需要并行化一个由各种子任务组成的程序,这些子任务本身也有子任务(即孙任务)。仅并行化子任务或孙任务不会利用我所有的 CPU 内核。


在我的用例中,我有各种子任务(可能有 1-50 个),每个子任务有很多孙子任务(可能有 100-1000 个)。


替代方案:如果使用 Python 的多处理包无法做到这一点,我很乐意切换到另一个支持它的库。


紫衣仙女
浏览 144回答 2
2回答

鸿蒙传说

有一个最小的可重现示例这样的东西,然后除此之外还删除了太多代码,最终得到的东西 (1) 可能过于简单化了,有危险而不是答案可能会错过标记,并且 (2)不可能如图所示运行(您需要将创建 Pool 和提交任务的代码包含在由语句控制的块中if __name__ == '__main__':。但是根据您所展示的内容,我不认为 Pool 是适合您的解决方案;您应该根据需要创建 Process 实例。从进程中获取结果的一种方法是将它们存储在可共享的托管字典中,其键例如是创建结果的进程的进程 ID。为了扩展您的示例,向子任务传递了两个参数,x并且y需要作为结果返回x**2 + 'y**2。子任务将产生两个孙任务实例,每个实例计算其参数的平方。然后,子任务将使用加法组合这些进程的返回值:from multiprocessing import Process, Managerimport osdef child_task(results_dict, x, y):    # the child task spawns new tasks    p1 = Process(target=grandchild_task, args=(results_dict, x))    p1.start()    pid1 = p1.pid    p2 = Process(target=grandchild_task, args=(results_dict, y))    p2.start()    pid2 = p2.pid    p1.join()    p2.join()    pid = os.getpid()    results_dict[pid] = results_dict[pid1] + results_dict[pid2]def grandchild_task(results_dict, n):    pid = os.getpid()    results_dict[pid] = n * ndef main():    manager = Manager()    results_dict = manager.dict()    p = Process(target=child_task, args=(results_dict, 2, 3))    p.start()    pid = p.pid    p.join()    # results will be stored with key p.pid:    print(results_dict[pid])if __name__ == '__main__':    main()印刷:13更新例如,如果您确实遇到这样一种情况,child_task需要处理 N 个相同的调用,只是参数不同,但它必须产生一两个子进程,那么像以前一样使用 Pool,但另外传递一个要使用的托管child_task字典用于产生额外的进程(不尝试为此使用池)并检索它们的结果。更新 2我能弄清楚子进程本身使用池的唯一方法是使用模块ProcessPoolExecutor中的类concurrent.futures。当我试图用 做同样的事情时multiprocessing.Pool,我得到了一个错误,因为我们有守护进程试图创建自己的进程。但即使在这里,唯一的方法是池中的每个进程都有自己的进程池。您的计算机上只有有限数量的处理器/内核,因此除非在处理中混合了一些 I/O,否则您可以创建所有这些池,但进程将等待运行的机会。因此,尚不清楚将实现什么样的性能提升。还有关闭为child_task子进程创建的所有池的问题。通常一个ProcessPoolExecutor实例是使用with块,当该块终止时,将清理创建的池。但是child_task被重复调用并且显然不能使用with块,因为我们不希望不断地创建和销毁池。我来到这里有点麻烦:传递了第三个参数,True 或 False,指示是否child_task应该启动其池的关闭。此参数的默认值为 False,我们甚至懒得传递它。在检索到所有实际结果并且child_task进程现在空闲之后,我们提交 N 个具有虚拟值但shutdown设置为 True 的新任务。请注意,该ProcessPoolExecutor函数的map工作方式与类中的相同函数有很大不同Pool(阅读文档):from concurrent.futures import ProcessPoolExecutorimport timechild_executor = Nonedef child_task(x, y, shutdown=False):    global child_executor    if child_executor is None:        child_executor = ProcessPoolExecutor(max_workers=1)    if shutdown:        if child_executor:            child_executor.shutdown(False)            child_executor = None            time.sleep(.2) # make sure another process in the pool gets the next task        return None    # the child task spawns new task(s)    future = child_executor.submit(grandchild_task, y)    # we can compute one of the results using the current process:    return grandchild_task(x) + future.result()def grandchild_task(n):    return n * ndef main():    N_WORKERS = 2    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:        # first call is (1, 2), second call is (3, 4):        results = [result for result in executor.map(child_task, (1, 3), (2, 4))]        print(results)        # force a shutdown        # need N_WORKERS invocations:        [result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]if __name__ == '__main__':    main()印刷:[5, 25]

holdtom

检查此解决方案:#!/usr/bin/python# requires Python version 3.8 or higherfrom multiprocessing import Queue, Processimport timefrom random import randrangeimport osimport psutil# function to be run by each child processdef square(number):&nbsp; &nbsp; sleep = randrange(5)&nbsp; &nbsp; time.sleep(sleep)&nbsp; &nbsp; print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')# create a queue where all tasks will be placedqueue = Queue()# indicate how many number of children you want the system to create to run the tasksnumber_of_child_proceses = 5# put all tasks in the queue abovefor task in range(19):&nbsp; &nbsp; queue.put(task)# this the main entry/start of the program when you rundef main():&nbsp; &nbsp; number_of_task = queue.qsize()&nbsp; &nbsp; print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')&nbsp; &nbsp; # don't create more number of children than the number of tasks. Also, in the last round, wait for all child process&nbsp; &nbsp; # to complete so as to wrap up everything&nbsp; &nbsp; if number_of_task <= number_of_child_proceses:&nbsp; &nbsp; &nbsp; &nbsp; processes = [Process(target=square, args=(queue.get(),)) for _ in&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;range(number_of_task)]&nbsp; &nbsp; &nbsp; &nbsp; for p in processes:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; p.start()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; p.join()&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]&nbsp; &nbsp; &nbsp; &nbsp; for p in processes:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; p.start()&nbsp; &nbsp; # update count of remaining task&nbsp; &nbsp; number_of_task = queue.qsize()&nbsp; &nbsp; # run the program in a loop until no more task remains in the queue&nbsp; &nbsp; while number_of_task:&nbsp; &nbsp; &nbsp; &nbsp; current_process = psutil.Process()&nbsp; &nbsp; &nbsp; &nbsp; children = current_process.children()&nbsp; &nbsp; &nbsp; &nbsp; # if children process have completed assigned task but there is still more remaining tasks in the queue,&nbsp; &nbsp; &nbsp; &nbsp; # assign them more tasks&nbsp; &nbsp; &nbsp; &nbsp; if not len(children) and number_of_task:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(f'\nAssigned tasks completed... reasigning the remaining {number_of_task} task(s) in the queue\n')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; main()&nbsp; &nbsp; # exit the loop if no more task in the queue to work on&nbsp; &nbsp; print('\nAll tasks completed!!')&nbsp; &nbsp; exit()if __name__ == "__main__":&nbsp; &nbsp; main()

梵蒂冈之花

我环顾四周,找到了Ray,它使用嵌套的远程函数解决了这个确切的用例。
随时随地看视频慕课网APP

相关分类

Python
我要回答