猿问

调用 thread.join() 是否会在异步上下文中阻止事件循环?

我正在使用aiohttp实现一个 Web API ,使用启用了 UVloop 的 gunicorn部署--worker-class aiohttp.GunicornUVLoopWebWorker。因此,我的代码总是在异步上下文中运行。我有在处理请求时实现并行作业以获得更好性能的想法。


我没有使用,asyncio因为我想要Parallelism,而不是Concurrency。


我知道python 中的多处理和GIL 问题。但加入流程也适用于我的问题。


下面是一个例子:


from aiohttp.web import middleware


@middleware

async def context_init(request, handler):

    request.context = {}

    request.context['threads'] = []

    ret = await handler(request)

    for thread in request.context['threads']:

        thread.join()

    return ret

考虑到thread.join()或process.join()阻塞当前线程,这将阻塞事件循环(据我所知)。如何异步加入?我想要的可以形象地表示为:await thread.join()或await process.join()。


更新:


感谢@user4815162342,我能够为我的项目编写正确的代码:


中间件:


from aiohttp.web import middleware

from util.process_session import ProcessSession


@middleware

async def context_init(request, handler):

    request.context = {}

    request.context['process_session'] = ProcessSession()

    request.context['processes'] = {}

    ret = await handler(request)

    await request.context['process_session'].wait_for_all()

    return ret

用途:


import asyncio

import concurrent.futures

from functools import partial


class ProcessSession():


    def __init__(self):

        self.loop = asyncio.get_running_loop()

        self.pool = concurrent.futures.ProcessPoolExecutor()

        self.futures = []


    async def wait_for_all(self):

        await asyncio.wait(self.futures)


    def add_process(self, f, *args, **kwargs):

        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))

        self.futures.append(ret)

        return ret


class ProcessBase():


    def __init__(self, process_session, f, *args, **kwargs):

        self.future = process_session.add_process(f, *args, **kwargs)


    async def wait(self):

        await asyncio.wait([self.future])

        return self.future.result()


波斯汪
浏览 156回答 2
2回答

HUWWW

回答您的问题:是的,它确实阻止了事件循环。我发现这ThreadPoolExecutor在这种情况下效果很好。from util.process_session import ProcessSessionfrom concurrent.futures.thread import ThreadPoolExecutorimport asynciofrom aiohttp.web import middleware@middlewareasync def context_init(request, handler):    request.context = {}    request.context['threads'] = []    ret = await handler(request)    with ThreadPoolExecutor(1) as executor:           await asyncio.get_event_loop().run_in_executor(executor,            functools.partial(join_threads, data={             'threads': request.context['threads']           }))    return retdef join_threads(threads):    for t in threads:        t.join()
随时随地看视频慕课网APP

相关分类

Python
我要回答