猿问

python过滤器+多处理+迭代器延迟加载

我有一个二维数组,它产生一个巨大的(> 300GB)组合列表,所以我想对 itertools.combinations 生成的迭代器进行惰性迭代并并行化这个操作。问题是我需要过滤输出,而 Multiprocessing 不支持。我现有的解决方法需要将组合列表加载到内存中,由于列表的大小,这也不起作用。



n_nodes = np.random.randn(10, 100)

cutoff=0.3


def node_combinations(nodes):

    return itertools.combinations(list(range(len(nodes))), 2)    


def pfilter(func, candidates):

    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])


def pearsonr(xy: tuple):

    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]

    if correlation_coefficient >= cutoff:

            return True

        else:

            return False



edgelist = pfilter(pearsonr, node_combinations(n_nodes))


我正在寻找一种使用带过滤器而不是映射的多处理对大型迭代器进行惰性评估的方法。


慕少森
浏览 129回答 2
2回答

犯罪嫌疑人X

下面使用信号量来减慢过度急切的池线程。不是正确的解决方案,因为它不能解决其他问题,例如使用相同池的嵌套循环和循环 imap 的结果在任何内部循环作业开始之前完成其外部循环的作业。但它确实限制了内存使用:def slowdown(n=16):    s = threading.Semaphore(n)    def inner(it):        for item in it:            s.acquire()            yield item    def outer(it):        for item in it:            s.release()            yield item    return outer, inner这用于包装pool.imap:outer, inner = slowdown()outer(pool.imap(func, inner(candidates)))

Cats萌萌

Hoxha 的建议效果很好——谢谢!@Dan 的问题是,即使是空列表也会占用内存,420 亿个配对在内存中接近 3TB。这是我的实现:import more_itertoolsimport itertoolsimport multiprocessing as mpimport numpy as npimport scipyfrom tqdm import tqdmn_nodes = np.random.randn(10, 100)num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)cpu_count = 8cutoff=0.3def node_combinations(nodes):    return itertools.combinations(list(range(len(nodes))), 2)    def edge_gen(xy_iterator: type(itertools.islice)):    edges = []    for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)        if pearsonr(cand):            edges.append(cand)def pearsonr(xy: tuple):    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]    if correlation_coefficient >= cutoff:            return True        else:            return Falseslices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))pool = mp.Pool(cpu_count)results = pool.imap(edge_gen, slices)pool.close()pool.join()
随时随地看视频慕课网APP

相关分类

Python
我要回答