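How do I retrieve values from a function run in parallel processes?

The multiprocessing module is quite confusing for Python beginners, especially for those who have just migrated from MATLAB and have been made lazy by its Parallel Computing Toolbox. I have the following function, which takes about 80 seconds to run, and I want to shorten this time by using Python's multiprocessing module.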


from time import time

xmax   = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end  = time()
tt   = end-start  # total time
print('Each iteration took: ', tt/xmax)
print('Total time:          ', tt)

This outputs, as expected:

Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:           86.67453265190125

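Since no iteration of the loop depends on any other, I tried to adapt this server process example from the official documentation to scan chunks of the range in separate processes. Finally, with the help of vartec's answer to this question, I was able to prepare the following code. I also updated the code based on Darkonaut's answer to the current question.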


from time import time
import multiprocessing as mp


def chunker(rng, t):  # this function makes t chunks out of rng
    L  = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h  = rng[0]-1
    chunks = []
    for i in range(0, t):
        c  = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks


def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

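This reduces the run time considerably, to about 17 seconds. However, my shared variable cannot retrieve any values. Please help me find out which part of the code is going wrong.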




POPMUISE

达令说

The issue in your example is that modifications to standard mutable structures stored inside a Manager.dict are not propagated. I'll first show you how to fix it with a manager, and then show you better options.

multiprocessing.Manager is a bit heavy, since it uses a separate process just for the Manager, and working on a shared object requires locks to keep the data consistent. If you run this on a single machine, multiprocessing.Pool is the better option as long as you don't have to run customized Process classes. If you do, multiprocessing.Process together with multiprocessing.Queue is the common way of doing it.

The quoted parts are from the multiprocessing docs.

Manager

"If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy..."

In your case this would look like:

def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

The lock here is a manager.Lock instance that you have to pass along as an argument, because the whole (now locked) operation is not atomic by itself. (Here is an easier example with Manager using Lock.)

"This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization."

Since Python 3.6, proxy objects are nestable:

"Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager."

So from Python 3.6 on, you can fill your manager.dict with manager.list values before starting the processes, and then append directly in the worker without having to reassign.

return_dict['x'] = manager.list()
return_dict['y'] = manager.list()

EDIT: Here is the full example with Manager:

import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py from the first link in the "Pool" section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below

def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)

Pool

Most often a multiprocessing.Pool will just do it. Your example has an additional challenge, since you want to distribute the iteration over a range. Your chunker function doesn't manage to divide the range evenly, so that every process has about the same amount of work to do:

chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

For the code below, please grab the code snippet for mp_utils.py from my answer here. It provides two functions to chunk ranges as evenly as possible.

With multiprocessing.Pool, your worker function just has to return the result, and Pool takes care of transporting the result back to the parent process over internal queues. The result will be a list, so you will have to rearrange it into the form you want. Your example could then look like this:

import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time   = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time:          {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')

Example output:

[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000),
 range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000),
 range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time:          8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0

If your worker took multiple arguments, you would build a "tasks" list of argument tuples and replace pool.map(...) with pool.starmap(..., iterable=tasks). See the docs for further details.

Process & Queue

If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication (IPC) yourself, by passing a multiprocessing.Queue as an argument to the worker functions in the child processes and letting them enqueue their results to be sent back to the parent.

You will also have to build a Pool-like structure that you can iterate over to start and join the processes, and you have to get() the results back from the queue. I've written up more about Queue.get usage here.

A solution with this approach could look like this:

def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')
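
The snippets in this answer import calc_batch_sizes and build_batch_ranges from mp_utils.py, which is linked but not reproduced on this page. The following is only a minimal sketch of what such helpers might look like, assuming the goal described above (batches that differ in size by at most one, returned as consecutive range objects). The function names and the n_workers keyword are taken from the calls in the answer. The bodies are an assumption and not necessarily the linked implementation.

```python
from itertools import accumulate


def calc_batch_sizes(n_tasks, n_workers):
    """Split n_tasks into n_workers batch sizes that differ by at most one.

    Hypothetical stand-in for the helper imported from mp_utils.py.
    """
    base, remainder = divmod(n_tasks, n_workers)
    # The first `remainder` batches take one extra task each.
    return [base + 1] * remainder + [base] * (n_workers - remainder)


def build_batch_ranges(batch_sizes):
    """Turn batch sizes into consecutive, non-overlapping range objects.

    Hypothetical stand-in for the helper imported from mp_utils.py.
    """
    upper_bounds = list(accumulate(batch_sizes))
    lower_bounds = [0] + upper_bounds[:-1]
    return [range(lo, hi) for lo, hi in zip(lower_bounds, upper_bounds)]


sizes = calc_batch_sizes(21, n_workers=4)
ranges = build_batch_ranges(sizes)
print(sizes)   # [6, 5, 5, 5]
print(ranges)  # [range(0, 6), range(6, 11), range(11, 16), range(16, 21)]
```

Compared with the chunker((0, 21), 4) result shown earlier, the 21 items are spread as 6, 5, 5, 5 instead of 4, 4, 4, 6. Because each range's stop is the next range's start, no value is skipped or visited twice.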