使用 asyncio 在 Flask 视图中并行化工作

我正在开发一个 Flask 应用程序,其中对客户端的响应取决于我从几个外部 API 获得的回复。对这些 API 的请求在逻辑上是相互独立的,因此可以通过并行发送这些请求来实现速度增益(在下面的示例中,响应时间将几乎减少一半)。

在我看来,实现这一点的最简单、最现代的方法是使用 asyncio 并在一个单独的异步函数中处理所有工作,该函数是使用 asyncio.run() 从 Flask 视图函数调用的。我在下面提供了一个简短的工作示例。

将 celery 或任何其他类型的队列与单独的工作进程一起使用在这里实际上没有意义,因为响应在发送回复之前必须等待 API 结果。据我所知,这是这个想法的一个变体,其中通过 asyncio 访问处理循环。当然有这方面的应用程序,但我认为如果我们真的只想在响应请求之前并行化 IO,那么这就不必要地复杂了。

然而,我知道在 Flask 中使用各种多线程可能存在一些陷阱。因此我的问题是:

  1. 在生产环境中使用时,下面的实现是否被认为是安全的?这与我们运行 Flask 的服务器类型有何关系?特别是,内置开发服务器或典型的多工作程序 Gunicorn 设置。

  2. 关于异步函数中 Flask 的应用程序和请求上下文是否需要考虑任何因素,或者我可以像在任何其他函数中一样使用它们吗?即我可以简单地导入 current_app 来访问我的应用程序配置或使用 g 和 session 对象吗?当写信给他们时,显然必须考虑可能的竞争条件,但是还有其他问题吗?在我的基本测试(不在示例中)中,一切似乎都工作正常。

  3. 还有其他解决方案可以对此进行改进吗?

这是我的示例应用程序。由于 ascynio 接口随着时间的推移发生了一些变化,因此可能值得注意的是,我在 Python 3.7 和 3.8 上对此进行了测试,并且我已尽力避免 asyncio 中已弃用的部分。

import asyncio

import random

import time

from flask import Flask


app = Flask(__name__)


async def contact_api_a():

    print(f'{time.perf_counter()}: Start request 1')

    # This sleep simulates querying and having to wait for an external API

    await asyncio.sleep(2)


    # Here is our simulated API reply

    result = random.random()


    print(f'{time.perf_counter()}: Finish request 1')


    return result



async def contact_api_b():

    print(f'{time.perf_counter()}: Start request 2')

    await asyncio.sleep(1)


    result = random.random()


    print(f'{time.perf_counter()}: Finish request 2')


    return result



async def contact_apis():

    # Create the two tasks

    task_a = asyncio.create_task(contact_api_a())

    task_b = asyncio.create_task(contact_api_b())


    # Wait for both API requests to finish

    result_a, result_b = await asyncio.gather(task_a, task_b)


    print(f'{time.perf_counter()}: Finish both requests')




aluckdog
浏览 269回答 1
1回答

慕丝7291255

这可以安全地在生产环境中运行,但 asyncio 无法与 Gunicorn 异步工作线程(例如 gevent 或 eventlet)有效配合。这是因为result_a, result_b = asyncio.run(contact_apis())将会阻止 gevent/eventlet 事件循环直到其完成,而使用 gevent/eventlet 生成等效项则不会。Flask 服务器不应该在生产中使用。Gunicorn 线程工作线程(或多个 Gunicorn 进程)会很好,因为 asyncio 会阻塞线程/进程。全局变量可以正常工作,因为它们与线程(线程工作线程)或绿色线程(gevent/eventlet)相关联,而不是与 asyncio 任务相关联。我想说Quart是一种改进。Quart 是使用 asyncio 重新实现的 Flask API。对于 Quart,上面的代码片段是:import asyncioimport randomimport timefrom quart import Quart    app = Quart(__name__)    async def contact_api_a():    print(f'{time.perf_counter()}: Start request 1')    # This sleep simulates querying and having to wait for an external API    await asyncio.sleep(2)    # Here is our simulated API reply    result = random.random()    print(f'{time.perf_counter()}: Finish request 1')    return result    async def contact_api_b():    print(f'{time.perf_counter()}: Start request 2')    await asyncio.sleep(1)    result = random.random()    print(f'{time.perf_counter()}: Finish request 2')    return result    async def contact_apis():    # Create the two tasks    task_a = asyncio.create_task(contact_api_a())    task_b = asyncio.create_task(contact_api_b())    # Wait for both API requests to finish    result_a, result_b = await asyncio.gather(task_a, task_b)    print(f'{time.perf_counter()}: Finish both requests')    return result_a, result_b    @app.route('/')async def hello_world():    start_time = time.perf_counter()    # All async processes are organized in a separate function    result_a, result_b = await contact_apis()    # We implement some final business logic before finishing the request    final_result = result_a + result_b    processing_time = time.perf_counter() - start_time    return f'Result: {final_result:.2f}; Processing time: {processing_time:.2f}'我还建议使用基于 asyncio 的请求库,例如httpx
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python