
Python: speeding up merging Counters with multiprocessing

I'm trying to build a very simple item recommendation system based on how many times items are bought together.


So first I created an item2item dictionary that works like a Counter:


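from collections import Counter

# people purchased A with B 4 times, A with C 3 times.
# The values must be Counters (not plain dicts) so that "+" can merge them.
item2item = {'A': Counter({'B': 4, 'C': 3}), 'B': Counter({'A': 4, 'C': 2}), 'C': Counter({'A': 3, 'B': 2})}

# recommend items to a user who purchased A and C
samples_list = [['A', 'C'], ...]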

So for samples = ['A', 'C'], I recommend the item with the highest count in item2item['A'] + item2item['C'].
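A minimal single-process sketch of this recommendation step, assuming the values of item2item are collections.Counter objects. The function name recommend and the option to skip items the user already bought are illustrative assumptions, not part of the original question.

```python
from collections import Counter
from functools import reduce
from operator import add

# Co-purchase counts; the values are Counters so that "+" merges them.
item2item = {
    'A': Counter({'B': 4, 'C': 3}),
    'B': Counter({'A': 4, 'C': 2}),
    'C': Counter({'A': 3, 'B': 2}),
}

def recommend(item2item, samples, exclude_owned=True):
    """Return the item with the largest summed co-purchase count (hypothetical helper)."""
    # Counter + Counter adds the counts key by key.
    combined = reduce(add, (item2item[s] for s in samples), Counter())
    if exclude_owned:
        # Assumption: never recommend something the user already bought.
        # Counter's del does not raise KeyError for missing keys.
        for s in samples:
            del combined[s]
    if not combined:
        return None
    # most_common(1) returns [(item, count)] for the highest count.
    return combined.most_common(1)[0][0]

print(recommend(item2item, ['A', 'C']))  # B, since 4 + 2 = 6
```

For ['A', 'C'] the merged Counter is B: 6, C: 3, A: 3, so B wins whether or not the already-owned items are removed.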


However, merging is expensive for a large matrix, so I tried to use multiprocessing as follows:


from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

with ProcessPoolExecutor(max_workers=10) as pool:
    for samples in samples_list:
        # without the pool:
        # combined = reduce(add, [item2item[s] for s in samples], Counter())
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        combined = future.result()

However, this did not speed up the process at all.


Based on "Python multiprocessing and shared counter" and https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes, I suspect the problem is that the Counter in the reduce function is not shared.
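For this workload, nothing actually needs to be shared between processes. A minimal standard-library sketch: reduce(add, ..., Counter()) builds a brand-new Counter without modifying its inputs, and ProcessPoolExecutor moves arguments to the worker and the return value back to the parent by pickling them. The pickle round-trip below only stands in for that transfer; it does not start any processes, and the variable names are illustrative.

```python
import pickle
from collections import Counter
from functools import reduce
from operator import add

row_a = Counter({'B': 4, 'C': 3})
row_c = Counter({'A': 3, 'B': 2})

# Counter.__add__ returns a new Counter; row_a and row_c are left untouched.
combined = reduce(add, [row_a, row_c], Counter())

# Simulate what the executor does with a task's return value:
# serialize it in the worker, deserialize it in the parent.
received = pickle.loads(pickle.dumps(combined))

print(received)  # Counter({'B': 6, 'C': 3, 'A': 3})
print(row_a)     # Counter({'B': 4, 'C': 3})
```

Because each task returns its own merged Counter, a shared Counter (and the locking it would need) is unnecessary; the parent simply receives a copy of each result.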


Any help is appreciated.


喵喔喔
1 Answer

收到一只叮咚

Calling combined = future.result() blocks until that result is ready, so you don't submit the next request to the pool until the previous one has finished. In other words, you never have more than one child process running at a time. At the very least, you should change your code to:

from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter
from tqdm import tqdm  # third-party progress bar

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    results = [f.result() for f in the_futures]  # all the results, in submission order

Or alternatively:

from concurrent.futures import as_completed

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    for future in as_completed(the_futures):  # not necessarily in submission order
        result = future.result()  # do something with this

Also, if you don't pass max_workers to the ProcessPoolExecutor constructor, it defaults to the number of processors on your machine. For CPU-bound work like this, specifying a value larger than the number of processors you actually have gains nothing.

UPDATE

If you want to process each result as soon as it completes and need a way to tie it back to the original request, you can store the futures as keys of a dictionary whose values are the request's arguments. In that case:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples  # map future to request
    for future in as_completed(the_futures):  # not necessarily in submission order
        samples = the_futures[future]  # the request
        result = future.result()  # the result
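Even with every task submitted up front, each task here only adds a handful of small Counters, so pickling the rows to a worker and the result back may well cost more than the merge itself. One common mitigation, sketched here as an assumption rather than as part of the answer above, is to give the pool many samples per task through the chunksize parameter of Executor.map. Only standard-library callables (sum and Counter) are sent to the workers, so they pickle by reference; the helper names build_tasks and merge_serial, the chunk size, and the toy data are hypothetical.

```python
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from itertools import repeat

# Toy co-purchase data (hypothetical); the values must be Counters.
item2item = {
    'A': Counter({'B': 4, 'C': 3}),
    'B': Counter({'A': 4, 'C': 2}),
    'C': Counter({'A': 3, 'B': 2}),
}
samples_list = [['A', 'C'], ['B'], ['A', 'B', 'C']] * 1000

def build_tasks(item2item, samples_list):
    """One list of Counters (the rows to merge) per sample."""
    return [[item2item[s] for s in samples] for samples in samples_list]

def merge_serial(tasks):
    """Single-process reference: sum(rows, Counter()) merges one sample's rows."""
    return [sum(rows, Counter()) for rows in tasks]

if __name__ == '__main__':
    tasks = build_tasks(item2item, samples_list)
    with ProcessPoolExecutor() as pool:  # max_workers defaults to the CPU count
        # Each call in a worker is sum(rows, Counter()); chunksize=500 sends
        # 500 such calls per message to reduce inter-process overhead.
        combined = list(pool.map(sum, tasks, repeat(Counter()), chunksize=500))
    # map() yields results in input order, so they line up with the serial ones.
    assert combined == merge_serial(tasks)
    print(len(combined), combined[0])
```

Whether this beats the plain single-process loop depends on how large the rows are, so it is worth timing both on real data before committing to multiprocessing.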