使用 FastAPI 我试图检测StreamingResponse是否已完全被客户端使用或是否已被取消。
我有以下示例应用程序:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def ainfinite_generator():
while True:
yield b"some fake data "
await asyncio.sleep(.001)
async def astreamer(generator):
try:
async for data in generator:
yield data
except Exception as e:
# this isn't triggered by a cancelled request
print(e)
finally:
# this always throws a StopAsyncIteration exception
# no matter whether the generator was consumed or not
leftover = await generator.__anext__()
if leftover:
print("we didn't finish")
else:
print("we finished")
@app.get("/")
async def infinite_stream():
return StreamingResponse(astreamer(ainfinite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
它似乎是第一个async for in generator“astreamer消耗”异步生成器的。在该循环之后,进一步尝试获得下一次迭代失败并出现异常StopAsyncIteration,即使生成器如上定义的那样是“无限的”。
我查看了PEP-525,我唯一看到的是,如果将异常抛入生成器,它将导致进一步尝试从生成器读取以抛出 StopAsyncIteration 异常,但我看不到它在哪里会发生。至少,我没有在 Starlette 的 StreamingResponse类中看到这一点(它似乎与“内容”无关)。执行后生成器不会“释放”吗async for in gen?
斯蒂芬大帝
牧羊人nacy
随时随地看视频慕课网APP
相关分类