Darling says:
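The problem in your example is that modifications to standard mutable structures stored inside a `Manager.dict` are not propagated. I'll first show you how to fix it with a manager, and then show you better options.

`multiprocessing.Manager` is a bit heavy: it runs a separate process just for the manager, and working on a shared object requires locks to keep the data consistent. If you run this on a single machine, there are better options. Use `multiprocessing.Pool` if you don't need custom `Process` classes. If you do need them, `multiprocessing.Process` together with `multiprocessing.Queue` is the common way of doing it.

The quoted parts are from the multiprocessing docs.

Manager

> If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a `__setitem__` on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...

In your case this would look like:

```python
def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                # Reading from the dict proxy returns copies of the plain lists.
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                # Reassigning triggers __setitem__ on the proxy, which propagates.
                return_dict['x'] = list_x
                return_dict['y'] = list_y
```

The `lock` here would be a `manager.Lock` instance that you have to pass along as an argument, because the whole (now locked) read-modify-write operation is not atomic by itself. (Here is a simpler example of a `Manager` using a `Lock`.)

For most use cases this approach is probably less convenient than using nested proxy objects, but it does demonstrate a level of control over synchronization.

Since Python 3.6, proxy objects are nestable:

> Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

So since Python 3.6 you can fill your `manager.dict` with `manager.list` values before starting multiprocessing, and then append to them directly in the worker without having to reassign:

```python
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
```

EDIT: Here is the full example with `Manager`:

```python
import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py: see the link in the "Pool" section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see the code snippet in the "Pool" section below


def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                # The values are nested manager.list proxies (Python 3.6+),
                # so appending propagates without reassignment. The lock
                # keeps each x/y pair aligned across the two lists.
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create a standard container with the data from the manager
        # before exiting the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)
```

Pool

Most of the time, a `multiprocessing.Pool` will simply do the job. Your example has an additional challenge because you want to distribute the iteration over a range. Your `chunker` function doesn't divide the range evenly enough for every process to get roughly the same amount of work:

```python
chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!
```

For the code below, please grab the code snippet for `mp_utils.py` from my answer here; it provides two functions that chunk ranges as evenly as possible.

With `multiprocessing.Pool`, your `worker` function just has to return its result, and `Pool` takes care of shipping the result back to the parent process over internal queues. `results` will be a list with one entry per batch, so you will have to rearrange it into whatever shape you want. Your example could then look like:

```python
import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges


@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time: {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    # Flatten the per-batch lists and unzip the (x, y) pairs.
    x, y = zip(*chain.from_iterable(results))
    print(f'results sorted: x: {x}, y: {y}')
```

Example output:

```
[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time: 8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0
```

If your `worker` took multiple arguments, you would build a "tasks" list of argument tuples and replace `pool.map(...)` with `pool.starmap(..., iterable=tasks)`. See the docs for further details.

Process & Queue

If you can't use `multiprocessing.Pool` for some reason, you have to take care of inter-process communication (IPC) yourself: pass a `multiprocessing.Queue` as an argument to the worker functions in the child processes and let them enqueue their results to be sent back to the parent.

You will also have to build your own Pool-like structure so you can iterate over it to start and join the processes, and you have to `get()` the results back from the queue. I've written more about `Queue.get` usage here.

A solution with this approach could look like this:

```python
import multiprocessing as mp
from multiprocessing import Process
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

# context_timer(): as defined in the "Pool" example above


def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        # Drain the queue before joining: joining a process that still has
        # unconsumed items in a queue can deadlock.
        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    # Flatten the per-batch lists and unzip the (x, y) pairs.
    x, y = zip(*chain.from_iterable(results))
    print(f'results sorted: x: {x}, y: {y}')
```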
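The Manager, Pool, and Process & Queue examples all import `calc_batch_sizes` and `build_batch_ranges` from `mp_utils.py`, which is only linked, not shown. If you don't have that file at hand, the following is a minimal stand-in that matches the way the examples call these functions. It is a hypothetical sketch written for this purpose, not the code from the linked answer: it assumes batch sizes should differ by at most one and that the ranges are contiguous and start at 0.

```python
from itertools import accumulate


def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
    """Split n_tasks into n_workers batch sizes that differ by at most 1.

    Hypothetical stand-in for the helper of the same name in mp_utils.py.
    """
    base, remainder = divmod(n_tasks, n_workers)
    # The first `remainder` batches each get one extra task.
    return [base + 1] * remainder + [base] * (n_workers - remainder)


def build_batch_ranges(batch_sizes: list) -> list:
    """Turn consecutive batch sizes into contiguous range objects."""
    upper_bounds = list(accumulate(batch_sizes))
    lower_bounds = [0] + upper_bounds[:-1]
    return [range(lo, hi) for lo, hi in zip(lower_bounds, upper_bounds)]


if __name__ == '__main__':
    sizes = calc_batch_sizes(21, n_workers=4)
    print(sizes)  # [6, 5, 5, 5]
    print(build_batch_ranges(sizes))
    # [range(0, 6), range(6, 11), range(11, 16), range(16, 21)]
```

Compared with the `chunker((0, 21), 4)` case (4, 4, 4, 6), the 21 values now come out as 6, 5, 5, 5. For `X_MAX = 100000000` and 8 workers, this sketch gives the same eight ranges of 12500000 shown in the example output.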
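The Pool and Process & Queue examples unpack their results with `x, y = zip(*chain.from_iterable(results))`. Two caveats apply to that line. First, if no `x` meets the condition, `zip(*[])` yields nothing, and unpacking it into `x, y` raises `ValueError`. Second, in the Queue version the batches arrive in whatever order the workers finish, so the pairs are not necessarily sorted by `x`. (`pool.map` does preserve batch order.) Here is a small helper that handles both caveats. The name `unzip_results` is hypothetical.

```python
from itertools import chain


def unzip_results(results):
    """Flatten per-batch lists of (x, y) pairs and split them into x and y.

    Returns two tuples sorted by x (ties broken by y); both tuples are
    empty when no batch produced any pairs.
    """
    pairs = sorted(chain.from_iterable(results))
    if not pairs:
        # zip(*[]) yields nothing, so unpacking it would raise ValueError.
        return (), ()
    x, y = zip(*pairs)
    return x, y


if __name__ == '__main__':
    print(unzip_results([[(0, -15), (1, -3), (2, 11)], [], []]))
    # ((0, 1, 2), (-15, -3, 11))
    print(unzip_results([[], []]))  # ((), ())
```

In either example you could then write `x, y = unzip_results(results)` in place of the `zip(...)` line.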
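The Pool section mentions switching from `pool.map` to `pool.starmap` when the worker takes several arguments. As a rough sketch of that idea, the worker below receives `start`, `stop`, and `threshold` separately instead of one `range`. This argument split, `build_tasks`, and the small input ranges are illustrative assumptions, not part of the original code. `starmap` unpacks each task tuple into the call `worker(*task)` and returns the results in task order.

```python
import multiprocessing as mp


def worker(start, stop, threshold):
    """Return the (x, y) pairs in [start, stop) where y <= threshold."""
    result = []
    for x in range(start, stop):
        y = (x + 5) ** 2 + x - 40
        if y <= threshold:
            result.append((x, y))
    return result


def build_tasks(batch_ranges, threshold):
    """Pack each batch's arguments into one tuple for Pool.starmap."""
    return [(r.start, r.stop, threshold) for r in batch_ranges]


if __name__ == '__main__':
    tasks = build_tasks([range(0, 50), range(50, 100)], threshold=0xf + 1)
    with mp.Pool(2) as pool:
        # Each tuple in tasks is unpacked into worker(start, stop, threshold).
        results = pool.starmap(worker, iterable=tasks)
    print(results)  # [[(0, -15), (1, -3), (2, 11)], []]
```

Keeping `build_tasks` separate from the pool call makes it easy to check the argument tuples before any process is started.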