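Waiting for pool.apply_async inside a loop

This is my first attempt at using multiprocessing in my Python code. I'm stuck because I can't get apply_async to wait until all of its processes have finished. I want to process the elements in smaller chunks and save the results as I work through a long list of elements.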


Here is a simple example:


import multiprocessing as mp

def fun(x, y):
    print("Here")
    return(x+y)

buffer = []

for val in range(10):
    buffer.append(val)
    print(f'Added value: {val}')
    if len(buffer) == 5:
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            res = [r.wait() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()


I expected this to produce the following output:


Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Here
Here
Here
Here
Here
Results: [0, 2, 4, 6, 8]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [10, 12, 14, 16, 18]

But it actually produces this (at least on my machine):


Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9


Any suggestions would be greatly appreciated.


qq_笑_17
1 answer

德玛西亚99

Try putting the entire for loop inside the conditional suite.

...
if __name__ == '__main__':
    for val in range(10):
        buffer.append(val)
        print(f'Added value: {val}')
        if len(buffer) == 5:
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            # wait til they are ALL done ?
            for r in res:
                r.wait()
            # get the return values
            res = [r.get() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()

Here is your original with some extra checks added. I still don't know why, but it seems that somehow the lines in the for loop are being run in multiple Python processes.

import multiprocessing as mp
import os
import pickle
import sys

def fun(x, y, pid=None):
    print(f"here pid:{pid}",file=sys.stderr)
    return (x+y,pid)

buffer = []
stuff = []
with open(r'c:\pyProjects\stuff.pkl','wb') as f:
    pickle.dump(stuff,f)

for val in range(10):
    buffer.append(val)
    pid = os.getpid()
    print(f'Added value: {val}.   pid={pid}')
    d = {'val':val,'pid':pid}
    # record which process executed this iteration
    with open(r'c:\pyProjects\stuff.pkl','rb') as f:
        try:
            stuff = pickle.load(f)
            stuff.append(d)
        except EOFError as e:
            s = '\n'.join(f'\t\t\t\t{item}' for item in stuff)
            print(f'\t\t\tEOFError {d}\n\t\t\tstuff:\n{s}\n')
    with open(r'c:\pyProjects\stuff.pkl','wb') as f:
        pickle.dump(stuff,f)
    if len(buffer) == 5:
        print(buffer)
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x,pid)) for x in buffer]
            res = [r.get() for r in res]
            print(f'\t\t\tResults: {res}')
            buffer = []
            pool.close()
            pool.join()

When it finishes, you can load and peruse the pickled file:

>>> import pickle
>>> from pprint import pprint
>>> with open(r'c:\pyProjects\stuff.pkl','rb') as f:
...     a = pickle.load(f)
>>> a.sort(key=lambda x: x['pid'])
>>> pprint(a)
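A likely explanation for the repeated "Added value" lines: on Windows, multiprocessing starts workers with the "spawn" method, and each new worker imports the main module again. Any module-level code outside the `if __name__ == "__main__":` guard, including the for loop, therefore runs once per worker. Separately, `AsyncResult.wait()` blocks but always returns None, which is why the results came back as `[None, ...]`; `get()` blocks and returns the value. Below is a minimal sketch of one way to restructure the original, assuming it is acceptable to reuse a single pool for all chunks. The names `run_chunk`, `process_in_chunks`, `chunk_size` and `processes` are hypothetical and not part of the question.

```python
import multiprocessing as mp


def fun(x, y):
    # Executed in a worker process.
    return x + y


def run_chunk(pool, chunk):
    # Hypothetical helper: submit one chunk and block until every task is done.
    pending = [pool.apply_async(fun, args=(x, x)) for x in chunk]
    # get() waits for the task and returns its value; wait() would return None.
    return [r.get() for r in pending]


def process_in_chunks(values, chunk_size=5, processes=2):
    # Hypothetical helper: returns one list of results per chunk.
    results = []
    buffer = []
    # A single pool is reused for every chunk instead of one pool per chunk.
    with mp.Pool(processes=processes) as pool:
        for val in values:
            buffer.append(val)
            if len(buffer) == chunk_size:
                results.append(run_chunk(pool, buffer))
                buffer = []
        if buffer:
            # Flush a final, shorter chunk if the input does not divide evenly.
            results.append(run_chunk(pool, buffer))
    return results


if __name__ == "__main__":
    # Only the parent process runs this; workers re-import the module
    # under a different __name__ and skip it.
    for chunk_results in process_in_chunks(range(10)):
        print(f"Results: {chunk_results}")
```

Run as a script, this should print `Results: [0, 2, 4, 6, 8]` followed by `Results: [10, 12, 14, 16, 18]`. Keeping all work inside functions and leaving only the call under the guard means nothing is re-executed when a worker imports the module.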