异步收集的顺序版本

我试图创建一个类似于asyncio.gather的方法,但它将按顺序而不是异步执行任务列表:


async def in_sequence(*tasks):

    """Executes tasks in sequence"""

    for task in tasks:

        await task

接下来,此方法应该按如下方式使用:


async def some_work(work_name):

    """Do some work"""

    print(f"Start {work_name}")

    await asyncio.sleep(1)

    if raise_exception:

        raise RuntimeError(f"{work_name} raise an exception")

    print(f"Finish {work_name}")


async def main():

    try:

        await asyncio.gather(

            some_work("work1"),         # work1, work2, in_sequence and work5 executed in concurrently

            some_work("work2"),

            in_sequence(

                some_work("work3"),     # work3 and work4 executed in sequence

                some_work("work4")

            ),

            some_work("work5"),



    except RuntimeError as error:

        print(error)                    # raise an exception at any point to terminate

一切都很好,直到我试图在some_work中抛出一个例外:


async def main():

    try:

        await asyncio.gather(

            some_work("work1"),

            some_work("work2"),

            in_sequence(

                some_work("work3", raise_exception=True),       # raise an exception here

                some_work("work4")

            ),

            some_work("work5"),



    except RuntimeError as error:

        print(error)

之后,我立即收到以下错误消息:


RuntimeWarning: coroutine 'some_work' was never awaited

我阅读了文档并继续实验:


async def in_sequence(*tasks):

    """Executes tasks in sequence"""

    _tasks = []

    for task in tasks:

        _tasks.append(asyncio.create_task(task))


    for _task in _tasks:

        await _task

这个版本按预期工作!

在这方面,我有下一个问题:

  1. 为什么第二个版本有效,而第一个版本不起作用?

  2. asyncio是否已经拥有按顺序执行任务列表的工具?

  3. 我是否选择了正确的实现方法,或者是否有更好的选择?


DIEA
浏览 92回答 4
4回答

慕桂英4014372

第一个版本不起作用,因为没有捕获可在 上引发的异常。第二个方法之所以有效,是因为创建了一个运行协程的类似未来的 Task 对象。该对象不会返回/传播包装协程的结果。当您使用该对象时,它将挂起,直到有结果或异常集,或者直到它被取消。in_sequenceawait taskcreate_taskawait似乎没有。第二个版本将同时执行传递的协程,因此它是不正确的实现。如果你真的想使用一些功能,你可以:in_sequence以某种方式延迟协程的创建。对函数中的顺序执行进行分组async例如:async def in_sequence(*fn_and_args):    for fn, args, kwargs in fn_and_args:        await fn(*args, **kwargs)  # create a coro and await it in placein_sequence(    (some_work, ("work3",), {'raise_exception': True}),    (some_work, ("work4",), {}),)async def in_sequence():    await some_work("work3", raise_exception=True)    await some_work("work4")

当年话下

这个版本按预期工作!第二个版本的问题在于,它实际上并没有按顺序运行协程,而是并行运行它们。这是因为调度协程与当前协程并行运行。因此,当您在循环中等待任务时,您实际上允许所有任务在等待第一个任务时运行。尽管外观如此,但整个循环的运行时间与最长的任务一样长。(有关更多详细信息,请参阅此处。asyncio.create_task()第一个版本显示的警告旨在防止您意外创建从未等待过的协程,例如,仅编写 而不是 。就 asyncio 而言,是实例化协程对象并将它们传递给“忘记”等待其中一些对象。asyncio.sleep(1)await asyncio.sleep(1)mainin_sequence禁止显示警告消息的一种方法是允许协程旋转,但立即取消它。例如:async def in_sequence(*coros):    remaining = iter(coros)    for coro in remaining:        try:            await coro        except Exception:            for c in remaining:                asyncio.create_task(c).cancel()            raise请注意,以下划线开头的变量名称标记未使用的变量,因此,如果您实际使用它们,则不应命名变量。

精慕HU

从 user4815162342 和安东·波米申科的解决方案中汲取灵感,我想出了它的以下变体:async def in_sequence(*storm):    twister = iter(storm)    for task in twister:        task = task() # if it's a regular function, it's done here.        if inspect.isawaitable(task):            try:                await task # if it's also awaitable, await it            except BaseException as e:                task.throw(e) # if an error occurs, throw it into the coroutine            finally:                task.close() # to ensure coroutine closer    assert not any(twister) # optionally verify that the iterator is now empty通过这种方式,您可以将常规函数与协程与此组合在一起。但一定要这样称呼它:in_sequenceawait in_sequence(*[b.despawn, b.release])请注意缺少 (),因为否则将立即调用常规函数,并且协程将引发 for 从未等待过。(是一个协程,不是我的例子)()__call__()RuntimeWarningb.despawnb.release您也可以在调用 之前进行额外的检查,但这取决于您。callable(task)task()

白板的微信

你说in_sequence的版本有效(asyncio.create_task),但我认为它没有。来自文档将 coro 协程包装到任务中并计划其执行。返回任务对象。它似乎并行运行协程,但您需要按顺序运行它们。所以实验并找到了两种方法来解决这个问题使用原始in_sequence函数并添加以下代码,以隐藏该错误:import warningswarnings.filterwarnings(    'ignore',    message=r'^coroutine .* was never awaited$',    category=RuntimeWarning)修复in_sequence函数,如下所示:async def in_sequence(*tasks):    for index, task in enumerate(tasks):        try:            await task        except Exception as e:            for task in tasks[index + 1:]:                task.close()            raise e其他问题的答案:当您在协程上没有链接时,C++代码会触发该警告。只是简单的代码可以告诉你这个想法(在终端):async def test():    return 1f = test()f = None # after that you will get that error我不知道见上文
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python