有人告诉我下面的代码是不安全的,因为它不允许有一个从 nursery 内部产生的异步生成器,除非它是一个异步上下文管理器。
T = TypeVar('T')
async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:
"""Delays each item in source by an interval.
Received items are temporarily stored in an unbounded queue, along with a timestamp, using
a background task. The foreground task takes items from the queue, and waits until the
item is older than the given interval and then yields it."""
send_channel, receive_channel = trio.open_memory_channel(math.inf)
async def pull_task():
async with aclosing(source) as agen:
async for item in agen:
send_channel.send_nowait((item, trio.current_time() + interval))
async with trio.open_nursery() as nursery:
nursery.start_soon(pull_task)
async with receive_channel:
async for item, timestamp in receive_channel:
now = trio.current_time()
if timestamp > now:
await trio.sleep(timestamp - now)
yield item
我很难理解这怎么可能会破裂。如果有人可以提供使用这个确切的生成器函数的示例代码,它证明了不安全性,我们将不胜感激和奖励。
上述代码的目标是在不施加任何背压的情况下延迟异步序列的处理。如果您能证明此代码无法像我预期的那样工作,那我们也将不胜感激。
慕姐8265434
相关分类