I'm implementing a Web API using aiohttp, deployed with gunicorn with uvloop enabled (`--worker-class aiohttp.GunicornUVLoopWebWorker`), so my code always runs in an async context. I had the idea of running jobs in parallel while handling a request, for better performance.

I'm not using asyncio for this, because I want parallelism, not concurrency.

I'm aware of multiprocessing and the GIL problem in Python, but joining processes also applies to my problem.

Here is an example:
```python
from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    for thread in request.context['threads']:
        thread.join()
    return ret
```
Considering that `thread.join()` or `process.join()` blocks the current thread, this will block the event loop (as far as I understand). How can I join asynchronously? What I want could be figuratively expressed as `await thread.join()` or `await process.join()`.
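For reference, a blocking `join()` can be awaited without blocking the event loop by handing it to an executor via `loop.run_in_executor`. The helper name `async_join` below is my own, not a standard API; a minimal sketch:

```python
import asyncio
import threading
import time

async def async_join(thread: threading.Thread) -> None:
    # Run the blocking join() in the default thread-pool executor,
    # so the event loop stays free while we wait for the thread.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, thread.join)

async def main():
    t = threading.Thread(target=time.sleep, args=(0.1,))
    t.start()
    await async_join(t)  # behaves like "await thread.join()"
    print(t.is_alive())  # the thread has finished by now

asyncio.run(main())
```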
Update:

Thanks to @user4815162342, I was able to write the proper code for my project:

Middleware:
```python
from aiohttp.web import middleware
from util.process_session import ProcessSession

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['process_session'] = ProcessSession()
    request.context['processes'] = {}
    ret = await handler(request)
    await request.context['process_session'].wait_for_all()
    return ret
```
Util:
```python
import asyncio
import concurrent.futures
from functools import partial

class ProcessSession():
    def __init__(self):
        self.loop = asyncio.get_running_loop()
        self.pool = concurrent.futures.ProcessPoolExecutor()
        self.futures = []

    async def wait_for_all(self):
        # Awaiting the futures "joins" the processes without blocking the loop.
        await asyncio.wait(self.futures)

    def add_process(self, f, *args, **kwargs):
        # Schedule f(*args, **kwargs) on the process pool; returns an asyncio
        # future that completes when the worker process finishes.
        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
        self.futures.append(ret)
        return ret

class ProcessBase():
    def __init__(self, process_session, f, *args, **kwargs):
        self.future = process_session.add_process(f, *args, **kwargs)

    async def wait(self):
        await asyncio.wait([self.future])
        return self.future.result()
```