使用“spawn”启动redis进程但面临TypeError:

我必须使用“spawn”来启动进程,因为我需要在进程之间传输 cuda 张量。但是使用“spawn”创建redis进程总是面临TypeError:无法pickle _thread.lock对象


由于某种原因,这段代码删除了某些部分


看来只有使用“fork”才能正常工作


import redis

from torch.multiprocessing import Process


class Buffer(Process):


    def __init__(self, name=0, num_peers=2, actor_queue=0, communicate_queue=0):

        Process.__init__(self)

      

        #some arguments

        self.actor_queue = actor_queue

        self.communicate_queue = communicate_queue

       

        pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)

        self.r = redis.Redis(connection_pool=pool)

        self.r.flushall()


    async def write(self, r):

    #do sth


    async def aggregate(self, r):

    #do sth


    def run(self):

        name_process = mp.current_process().name + str(mp.current_process().pid)

        print('starting...', name_process)

        loop = asyncio.get_event_loop()

        asyncio.set_event_loop(loop)

        tasks = asyncio.gather(

            loop.create_task(self.write(self.r)),

            loop.create_task(self.aggregate(self.r)),

        )

        try:

            loop.run_until_complete(tasks)

        finally:

            loop.close()


if __name__ == '__main__':

    mp.set_start_method('spawn')


    queue = mp.Queue(maxsize=5)

    queue.put('sth')

    name = 'yjsp'

    num_peers = 2

    p =Buffer(name, num_peers, queue, c_queue)

    p.start()


杨__羊羊
浏览 127回答 1
1回答

墨色风雨

问题解决了!我们应该在 run() 中定义池和其他东西原因如下:线程存在于进程内部,并且进程旋转子进程以启用并行。线程需要锁来防止资源问题,例如多个进程获取相同的资源并导致死锁。如果我们在 run() 中定义池,那么当我们进入 run() 方法时,我们就已经处于子进程中。像这样    def run(self):         pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)         r = redis.Redis(connection_pool=pool)         r.flushall()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python