在异步生成器函数中从托儿所内部产生 yield 不好吗?

有人告诉我下面的代码是不安全的,因为它不允许有一个从 nursery 内部产生的异步生成器,除非它是一个异步上下文管理器。


T = TypeVar('T')


async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:

    """Delays each item in source by an interval.


    Received items are temporarily stored in an unbounded queue, along with a timestamp, using

    a background task. The foreground task takes items from the queue, and waits until the

    item is older than the given interval and then yields it."""


    send_channel, receive_channel = trio.open_memory_channel(math.inf)


    async def pull_task():

        async with aclosing(source) as agen:

            async for item in agen:

                send_channel.send_nowait((item, trio.current_time() + interval))


    async with trio.open_nursery() as nursery:

        nursery.start_soon(pull_task)

        async with receive_channel:

            async for item, timestamp in receive_channel:

                now = trio.current_time()

                if timestamp > now:

                    await trio.sleep(timestamp - now)

                yield item

我很难理解这怎么可能会破裂。如果有人可以提供使用这个确切的生成器函数的示例代码,它证明了不安全性,我们将不胜感激和奖励。


上述代码的目标是在不施加任何背压的情况下延迟异步序列的处理。如果您能证明此代码无法像我预期的那样工作,那我们也将不胜感激。


呼如林
浏览 111回答 1
1回答

慕姐8265434

不幸的是,这是正确的——yield在 nursery 或 cancel 范围内不受支持,除非在用于@contextlib.asynccontextmanager创建异步上下文管理器或编写异步 pytest fixture 的狭窄情况下。有几个原因。其中一些是技术性的:Trio 必须跟踪哪些 nurseries/cancel 作用域当前在堆栈中处于“活动”状态,当你离开yield一个时,它会破坏嵌套,Trio 无法知道你已经完成这。(库无法检测出yield上下文管理器。)但还有一个根本的、无法解决的原因,那就是Trio和结构化并发的整个思想是,每个任务都“属于”一个父任务,如果子任务崩溃,父任务可以收到通知。但是当你yield在一个生成器中时,生成器框架会被冻结并与当前任务分离——它可能会在另一个任务中恢复,或者根本不会恢复。所以当你yield,这打破了托儿所中所有子任务和他们父母之间的联系。只是没有办法将其与结构化并发的原则相协调。如果我运行以下async def arange(*args):    for val in range(*args):        yield valasync def break_it():    async with aclosing(delay(0, arange(3))) as aiter:        with trio.move_on_after(1):            async for value in aiter:                await trio.sleep(0.4)                print(value)trio.run(break_it)然后我得到RuntimeError: Cancel scope stack corrupted: attempted to exit<trio.CancelScope at 0x7f364621c280, active, cancelled> in <Task'__main__.break_it' at 0x7f36462152b0> that's still within its child<trio.CancelScope at 0x7f364621c400, active>This is probably a bug in your code, that has caused Trio's internalstate to become corrupted. We'll do our best to recover, but from nowon there are no guarantees.Typically this is caused by one of the following:  - yielding within a generator or async generator that's opened a cancel    scope or nursery (unless the generator is a @contextmanager or    @asynccontextmanager); see https://github.com/python-trio/trio/issues/638 [...]通过更改超时和延迟,使超时在生成器内部而不是在生成器外部过期,我还能够得到一个不同的错误:trio.MultiError: Cancelled(), GeneratorExit() raised out of aclosing()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python