

我有一个使用 collections.Counter 和 gzip.open 的单线程脚本,需要几个小时才能运行完。


但是,我找不到用 ProcessPoolExecutor 把队列传给 Executor 的示例,现有示例都只是对列表中的单个元素做 map。而 asyncio.Queue 只有单线程的示例。

  • 这是一个巨大的文件,我无法在计数之前把整个文件读入一个 list,因此无法使用 concurrent.futures.Executor.map。但我读到的所有示例都以一个固定的列表作为起点。

  • 拆分并统计一个句子所需的时间与 fork 一个进程相当,所以我必须让每个消费者进程存活得更久。我认为 map 无法合并 Counter,所以不能使用 chunksize > 1。因此我必须给消费者一个队列,让它们持续计数,直到整个文件处理完。但大多数示例只向消费者发送一个元素,并使用 chunksize=1000 来减少 fork 的开销。


我希望代码向后兼容 Python 3.5.3,因为 PyPy 更快。


chr1    10011   141     0       157     4       41      50

chr1    10012   146     1       158     4       42      51

chr1    10013   150     0       163     4       43      53

chr1    10014   164     3       167     4       44      54

我需要分别计算第 3 到第 8 列中每一列的直方图,所以我用词频统计作为一个更简单的例子。

csv.DictReader 花费大部分时间。


我的问题是,虽然 gzip 读取器和 csv 读取器都很快,但我需要统计数十亿行,而且 csv 读取器肯定比 gzip 读取器慢。

因此,我需要将行传播到 csv 读取器的不同工作进程并分别进行下游计数。在一个生产者和许多消费者之间使用队列很方便。

由于我使用的是 Python 而不是 C,是否有对多进程和队列的抽象封装?ProcessPoolExecutor 能否与 Queue 类一起使用?

一个 30 GB 的文本文件足以将您的问题放入大数据领域。所以为了解决这个问题,我建议使用大数据工具,比如 Hadoop 和 Spark。您所解释的“生产者-消费者流”基本上就是MapReduce算法的设计目的。字数频率是典型的 MapReduce 问题。查一查,你会发现很多例子。


# I studied the multiprocessing library over the weekend. Stopping with
# Ctrl+C and then writing out the current results still did not work; the main
# functionality is fine now.
# NOTE: this revision keeps Ctrl+C away from the helper processes so that the
# partial counts can still be collected and written.
#!/usr/bin/env pypy3
import csv
import gzip
import math
import queue
import signal
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')
ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16
# Upper bound on chunks waiting in the queue.  Without a bound, a reader that
# outruns the parsers would buffer the whole decompressed file in memory.
# Must be >= Nworkers so that every end-of-input marker always fits.
MaxQueuedChunks = Nworkers * 4
# Marker telling a worker that no more input will arrive.
EndOfInput = b'\n\n'


def main():
    """Command-line entry point.

    Reads ``sys.argv`` as ``<samtools.depth.gz> <out.tsv> [verbose=0]``,
    gathers the per-sample depth histograms with `CallStat` and writes them,
    together with the standard deviation of every sample, to the output file
    as tab-separated text.
    """
    if len(sys.argv) < 3:
        print('Usage:', sys.argv[0], '<samtools.depth.gz> <out.tsv> [verbose=0]', file=sys.stderr, flush=True)
        sys.exit(0)
    try:
        verbosity = int(sys.argv[3])
    except (IndexError, ValueError):
        # Argument missing or not an integer: fall back to quiet mode.
        verbosity = 0
    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile, outFile, verbosity), file=sys.stderr, flush=True)
    RecordCnt, MaxDepth, cDepthCnt, cDepthStat = CallStat(inDepthFile)
    if RecordCnt:  # an empty input keeps the zero-initialised statistics
        for k in SamplesList:
            stat = cDepthStat[k]
            stat[2] = stat[0] / RecordCnt  # E(X)
            stat[3] = stat[1] / RecordCnt  # E(X^2)
            # Rounding can push E(X^2)-E(X)^2 slightly below zero; clamp it
            # so that sqrt() cannot raise.
            stat[4] = math.sqrt(max(stat[3] - stat[2] * stat[2], 0.0))
    with open(outFile, 'wt') as tsvout:
        print('#{}\t{}'.format('Depth', '\t'.join(SamplesList)), file=tsvout)
        print('#N={},SD:\t{}'.format(RecordCnt, '\t'.join(str(round(cDepthStat[col][4], 1)) for col in SamplesList)), file=tsvout)
        for depth in range(0, MaxDepth + 1):
            print('{}\t{}'.format(depth, '\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)), file=tsvout)


def _start_helpers():
    """Start the manager process and the worker pool; return ``(manager, pool)``.

    SIGINT is ignored in this process while the helpers are created, so that
    the children started here inherit that setting (POSIX behaviour; other
    platforms should be verified).  Ctrl+C is then handled by the producer
    only, instead of killing the workers and the queue they read from.
    """
    try:
        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
        changed = True
    except ValueError:
        # Handlers can only be changed from the main thread; carry on as-is.
        previous = None
        changed = False
    try:
        manager = Manager()
        try:
            pool = Pool(Nworkers)
        except Exception:
            manager.shutdown()
            raise
    finally:
        if changed:
            # ``None`` means the old handler was not installed from Python;
            # fall back to the regular KeyboardInterrupt handler then.
            signal.signal(signal.SIGINT, previous if previous is not None else signal.default_int_handler)
    return manager, pool


def _offer(lines_queue, item, pending):
    """Queue *item*, waiting for room; return False once a worker has exited.

    Workers only return after the end-of-input marker, so a finished result
    at this point means a worker failed and feeding more data is pointless.
    """
    while not any(result.ready() for result in pending):
        try:
            lines_queue.put(item, True, 1.0)
        except queue.Full:
            continue
        return True
    return False


def _finish_input(lines_queue, pending):
    """Send one end-of-input marker per worker, never blocking on dead ones."""
    for _ in range(len(pending)):
        while not all(result.ready() for result in pending):
            try:
                lines_queue.put(EndOfInput, True, 1.0)
            except queue.Full:
                continue
            break


def CallStat(inDepthFile):
    """Count the depth values of a gzip-compressed depth table in parallel.

    The file is read in chunks of roughly ``ChunkSize`` characters, which are
    handed through a bounded queue to ``Nworkers`` `iStator` processes.
    Pressing Ctrl+C stops the reading; whatever has been counted so far is
    still merged and returned.

    Returns ``(RecordCnt, MaxDepth, cDepthCnt, cDepthStat)``: the number of
    rows, the largest value seen, a ``Counter`` per sample, and per sample a
    five-item list whose first two items are the sum of the values and the sum
    of their squares (the remaining items are left at 0 for the caller).
    An exception raised inside a worker is re-raised here.
    """
    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key: Counter() for key in SamplesList}
    cDepthStat = {key: [0, 0, 0, 0, 0] for key in SamplesList}  # x and x^2

    manager, stater_pool = _start_helpers()
    try:
        lines_queue = manager.Queue(MaxQueuedChunks)
        pending = [stater_pool.apply_async(iStator, ((lines_queue, SamplesList),))
                   for _ in range(Nworkers)]
        try:
            with gzip.open(inDepthFile, 'rt') as tsvfin:
                while True:
                    lines = tsvfin.readlines(ChunkSize)
                    if not lines or not _offer(lines_queue, lines, pending):
                        break
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.', file=sys.stderr, flush=True)
        finally:
            # Always release the workers, even when reading failed.
            _finish_input(lines_queue, pending)
        for result in pending:
            iRecordCnt, iMaxDepth, icDepthCnt, icDepthStat = result.get()
            RecordCnt += iRecordCnt
            if iMaxDepth > MaxDepth:
                MaxDepth = iMaxDepth
            for k in SamplesList:
                cDepthCnt[k].update(icDepthCnt[k])
                cDepthStat[k][0] += icDepthStat[k][0]
                cDepthStat[k][1] += icDepthStat[k][1]
    finally:
        stater_pool.terminate()
        stater_pool.join()
        manager.shutdown()
    return RecordCnt, MaxDepth, cDepthCnt, cDepthStat


def iStator(args):
    """Worker: count depth values from chunks of lines taken off a queue.

    *args* is ``(inQueue, inSamplesList)``.  ``inQueue.get()`` must return
    lists of tab-separated text lines (``ChrID``, ``Pos``, then one integer
    column per sample name) until it returns ``b'\\n\\n'``, which ends the
    loop.  A KeyboardInterrupt also ends the loop and keeps the counts
    gathered so far.

    Returns ``(RecordCnt, MaxDepth, cDepthCnt, cDepthStat)`` where
    ``cDepthStat[name]`` is ``[sum of values, sum of squared values]``.
    """
    (inQueue, inSamplesList) = args
    cDepthCnt = {key: Counter() for key in inSamplesList}
    cDepthStat = {key: [0, 0] for key in inSamplesList}  # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    fieldnames = ('ChrID', 'Pos') + tuple(inSamplesList)
    try:
        for lines in iter(inQueue.get, EndOfInput):
            for row in csv.DictReader(lines, delimiter='\t', fieldnames=fieldnames):
                RecordCnt += 1
                for k in inSamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    cDepthCnt[k][theValue] += 1
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
    except KeyboardInterrupt:
        print('\n[!]Ctrl+C pressed.', file=sys.stderr, flush=True)
    return RecordCnt, MaxDepth, cDepthCnt, cDepthStat


if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1


# Just some pseudo-code:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback

WORKER_POOL_SIZE = 10  # you should set this as the number of your processes
QUEUE_SIZE = 100       # 10 times to your pool size is good enough


def main():
    """Start the worker pool, feed it from the producer and wait for it.

    Exceptions raised inside a worker are printed, not propagated.
    """
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)
        # The ``with`` block shuts the executor down once all workers are done.
        with ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE) as executor:
            # init worker pool
            workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]
            # start producer
            run_producer(q)
            # wait to done
            for f in workers_pool:
                try:
                    f.result()
                except Exception:
                    traceback.print_exc()


def run_producer(q, path="your file path"):
    """Put every line of the text file *path* on *q*, then a final ``None``.

    Errors while opening or reading the file are printed; the ``None`` marker
    is sent in every case so that the workers can stop.
    NOTE: ``q.put`` blocks while the queue is full, so this waits forever if
    no worker is left to take items off the queue.
    """
    try:
        with open(path) as fp:
            for line in fp:
                q.put(line)
    except Exception:
        traceback.print_exc()
    finally:
        q.put(None)


def worker(i, q):
    """Take lines off *q* until ``None`` arrives.

    The ``None`` marker is put back before returning, so that the next worker
    reading from the same queue sees it as well.
    """
    while 1:
        line = q.get()
        if line is None:
            # str.format instead of an f-string: keeps Python 3.5 working.
            print('worker {} is done'.format(i))
            q.put(None)
            return
        # do something with this line
        # ...

