使用 multiprocessing.Pool 时如何理解 multiprocessing.

为什么我不能把process在Pool成Queue?

这里我的代码在使用时有效Pool并且可以获得Test实例属性。


from multiprocessing import Pool

from multiprocessing import Queue



class Test(object):

    def __init__(self, num):

        self.num = num



if __name__ == '__main__':

    p = Pool()

    procs = []

    for i in range(5):

        proc = p.apply_async(Test, args=(i,))

        procs.append(proc)

    p.close()

    for each in procs:

        test = each.get(10)

        print(test.num)

    p.join()

当我尝试使用Queuenot pythonlist来存储进程时,这是行不通的。


我的代码:


from multiprocessing import Pool

from multiprocessing import Queue



class Test(object):

    def __init__(self, num):

        self.num = num



if __name__ == '__main__':

    p = Pool()

    q = Queue()

    for i in range(5):

        proc = p.apply_async(Test, args=(i,))

        q.put(proc)

    p.close()

    while not q.empty():

        q.get()

    p.join()

错误消息:


Traceback (most recent call last):

  File "C:\Users\laich\AppData\Local\Programs\Python\Python36- 

32\lib\multiprocessing\queues.py", line 234, in _feed

    obj = _ForkingPickler.dumps(obj)

  File "C:\Users\laich\AppData\Local\Programs\Python\Python36- 

32\lib\multiprocessing\reduction.py", line 51, in dumps

    cls(buf, protocol).dump(obj)

TypeError: can't pickle _thread.lock objects

我去看多处理文档:


class multiprocessing.Queue([maxsize]) 返回使用管道和一些锁/信号量实现的进程共享队列。当一个进程第一次将一个项目放入队列时,一个馈线线程将启动,它将对象从缓冲区传输到管道中。


标准库队列模块中的通常queue.Empty和queue.Full异常被引发以发出超时信号。


Queue 实现了queue.Queue除了task_done()and之外的所有方法join()。


这里说“放置一个项目”,这个项目不能是任何东西(python对象)?在我来说,我试图把process在Pool()成Queue。


Smart猫小萌
浏览 312回答 2
2回答

牛魔王的故事

您的Queue基于代码的代码至少存在两个问题。Pool.apply_async方法返回一个AsyncResult对象,而不是一个进程。您可以调用get该对象来获取相应过程的结果。考虑到这种差异,让我们看看您的代码:proc = p.apply_async(Test, args=(i,)) # Returns an AsyncResult objectq.put(proc) # won't work在您的情况下,第二行将始终失败。您放入队列的任何内容都必须是可腌制的,因为multiprocess.Queue使用序列化。这没有很好的文档记录,并且Python 的问题跟踪器中有一个未解决的问题来更新文档。问题是AsyncResult不能腌制。你可以自己试试:import pickleimport multiprocessing as mpwith mp.Pool() as p:    result = p.apply_async(lambda x: x, (1,))pickle.dumps(result) # ErrorAsyncResult内部包含一些锁定对象,它们不可序列化。让我们转到下一个问题:while not q.empty():    q.get()如果我没记错的话,在上面的代码中,您要调用AsyncResult.get而不是Queue.get. 在这种情况下,您必须首先从队列中获取您的对象,然后在您的对象上调用相应的方法。但是,在您的代码中情况并非如此,因为AsyncResult它不可序列化。

HUX布斯

AsyncResult对象不能被腌制,这multiprocessing.Queue是必需的。但是,这里不需要一个,因为队列没有在进程之间共享。这意味着您可以只使用常规的Queue.from multiprocessing import Pool#from multiprocessing import Queuefrom queue import Queueclass Test(object):    def __init__(self, num):        self.num = num        print('Test({!r}) created'.format(num))if __name__ == '__main__':    p = Pool()    q = Queue()    for i in range(5):        proc = p.apply_async(Test, args=(i,))        q.put(proc)    p.close()    while not q.empty():        q.get()    p.join()    print('done')输出:Test(0)Test(1)Test(2)Test(3)Test(4)done
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python