asyncio.CancelledError 和“从未检索到_GatheringFuture 异常”

我正在观看import asyncio:学习 Python 的 AsyncIO #3 - 使用协程。讲师举了下面的例子:

import asyncio

import datetime


async def keep_printing(name):

    while True:

        print(name, end=" ")

        print(datetime.datetime.now())

        await asyncio.sleep(0.5)


async def main():

    group_task = asyncio.gather(

                     keep_printing("First"),

                     keep_printing("Second"),

                     keep_printing("Third")

                 )

    try:

        await asyncio.wait_for(group_task, 3)

    except asyncio.TimeoutError:

        print("Time's up!")



if __name__ == "__main__":

    asyncio.run(main())

输出有异常:


First 2020-08-11 14:53:12.079830

Second 2020-08-11 14:53:12.079830

Third 2020-08-11 14:53:12.080828 

First 2020-08-11 14:53:12.580865

Second 2020-08-11 14:53:12.580865

Third 2020-08-11 14:53:12.581901 

First 2020-08-11 14:53:13.081979

Second 2020-08-11 14:53:13.082408

Third 2020-08-11 14:53:13.082408 

First 2020-08-11 14:53:13.583497

Second 2020-08-11 14:53:13.583935

Third 2020-08-11 14:53:13.584946

First 2020-08-11 14:53:14.079666

Second 2020-08-11 14:53:14.081169

Third 2020-08-11 14:53:14.115689

First 2020-08-11 14:53:14.570694

Second 2020-08-11 14:53:14.571668

Third 2020-08-11 14:53:14.635769

First 2020-08-11 14:53:15.074124

Second 2020-08-11 14:53:15.074900

Time's up!

_GatheringFuture exception was never retrieved

future: <_GatheringFuture finished exception=CancelledError()>

concurrent.futures._base.CancelledError

然后讲师就继续讨论其他主题,再也没有回到这个例子来说明如何修复它。幸运的是,通过实验,我发现我们可以通过在async 函数try/except中添加另一个来修复它:

事实上,有了这个版本main,我们甚至不需要try...except asyncio.CancelledErrorin keep_printing。它仍然可以正常工作。


那是为什么?为什么 catching CancelledErrorin mainwork 而不是 in keep_printing?视频讲师处理此异常的方式只会让我更加困惑。keep_printing他根本不需要更改任何代码!


忽然笑
浏览 270回答 3
3回答

守着星空守着你

让我们看看发生了什么:此代码安排三个协程执行并返回Future对象group_task(内部类的实例_GatheringFuture)聚合结果。group_task = asyncio.gather(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("First"),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("Second"),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("Third")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)此代码等待未来完成超时。如果发生超时,它会取消未来并引发asyncio.TimeoutError.&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.wait_for(group_task, 3)&nbsp; &nbsp; except asyncio.TimeoutError:&nbsp; &nbsp; &nbsp; &nbsp; print("Time's up!")发生超时。让我们看看 asyncio 库task.py。wait_for执行以下操作:timeout_handle = loop.call_later(timeout, _release_waiter, waiter)...await waiter...await _cancel_and_wait(fut, loop=loop)&nbsp; # _GatheringFuture.cancel() insideraise exceptions.TimeoutError()当我们这样做时_GatheringFuture.cancel(),如果实际上取消了任何子任务,CancelledError则传播class _GatheringFuture(futures.Future):&nbsp; &nbsp; ...&nbsp; &nbsp; def cancel(self):&nbsp; &nbsp; &nbsp; &nbsp; ...&nbsp; &nbsp; &nbsp; &nbsp; for child in self._children:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if child.cancel():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ret = True&nbsp; &nbsp; &nbsp; &nbsp; if ret:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # If any child tasks were actually cancelled, we should&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # propagate the cancellation request regardless of&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # *return_exceptions* argument.&nbsp; See issue 32684.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._cancel_requested = True&nbsp; &nbsp; &nbsp; &nbsp; return ret然后...if outer._cancel_requested:&nbsp; &nbsp; # If gather is being cancelled we must propagate the&nbsp; &nbsp; # cancellation regardless of *return_exceptions* argument.&nbsp; &nbsp; # See issue 32684.&nbsp; &nbsp; outer.set_exception(exceptions.CancelledError())else:&nbsp; &nbsp; outer.set_result(results)因此,从收集中提取结果或异常更为正确futureasync def main():&nbsp; &nbsp; group_task = asyncio.gather(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("First"),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("Second"),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("Third")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.wait_for(group_task, 3)&nbsp; &nbsp; except asyncio.TimeoutError:&nbsp; &nbsp; &nbsp; &nbsp; print("Time's up!")&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; result = await group_task&nbsp; &nbsp; except asyncio.CancelledError:&nbsp; &nbsp; &nbsp; &nbsp; print("Gather was cancelled")

烙印99

我认为你需要await放在asyncio.gather.&nbsp;因此,此调用取自您的代码:&nbsp;&nbsp;&nbsp;&nbsp;group_task&nbsp;=&nbsp;asyncio.gather( &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keep_printing("First"), &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keep_printing("Second"), &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keep_printing("Third") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;)需要改成:&nbsp;&nbsp;&nbsp;&nbsp;group_task&nbsp;=&nbsp;await&nbsp;asyncio.gather( &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keep_printing("First"), &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keep_printing("Second"), &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keep_printing("Third") &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;)不知道为什么,我还在学习这些东西。

繁星点点滴滴

当aw因为超时被取消时,wait_for等待aw被取消。如果将 CancelledError 处理到协程中,则会出现超时错误。这在 3.7 版中发生了变化。例子import asyncioimport datetimeasync def keep_printing(name):&nbsp; &nbsp; print(datetime.datetime.now())&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.sleep(3600)&nbsp; &nbsp; except asyncio.exceptions.CancelledError:&nbsp; &nbsp; &nbsp; &nbsp; print("done")async def main():&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.wait_for(keep_printing("First"), timeout=3)&nbsp; &nbsp; except asyncio.exceptions.TimeoutError:&nbsp; &nbsp; &nbsp; &nbsp; print("timeouted")if __name__ == "__main__":&nbsp; &nbsp; asyncio.run(main())用于从 Task 或 Future 检索结果的 gather 方法,你有一个无限循环并且从不返回任何结果。如果 aws 序列中的任何 Task 或 Future 被取消(wait_for 发生了什么),它会被视为引发了 CancelledError——在这种情况下 gather() 调用不会被取消。这是为了防止取消一个已提交的任务/未来导致其他任务/未来被取消。对于保护聚集法,你可以将它覆盖到盾牌上。import asyncioimport datetimeasync def keep_printing(name):&nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; print(name, datetime.datetime.now())&nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await asyncio.sleep(0.5)&nbsp; &nbsp; &nbsp; &nbsp; except asyncio.exceptions.CancelledError:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(f"canceled {name}")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return Noneasync def main():&nbsp; &nbsp; group_task = asyncio.shield(asyncio.gather(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("First"),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("Second"),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;keep_printing("Third"))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; await asyncio.wait_for(group_task, 3)&nbsp; &nbsp; except asyncio.exceptions.TimeoutError:&nbsp; &nbsp; &nbsp; &nbsp; print("Done")if __name__ == "__main__":&nbsp; &nbsp; asyncio.run(main())
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python