我正在尝试实现一个异步方法,该方法采用ChannelReader<T>s 数组,并从任何具有可用项目的通道中获取值。它是一种与具有以下签名的方法具有相似功能的BlockingCollection<T>.TakeFromAny方法:
public static int TakeFromAny(BlockingCollection<T>[] collections, out T item,
CancellationToken cancellationToken);
此方法返回collections从中删除项目的数组中的索引。async方法不能有参数out,所以我要实现的 API 是这样的:
public static Task<(T Item, int Index)> TakeFromAnyAsync<T>(
ChannelReader<T>[] channelReaders,
CancellationToken cancellationToken = default);
该方法应该异步读取一个项目,并返回消耗的项目以及数组TakeFromAnyAsync<T>中关联通道的索引。channelReaders如果所有通道都已完成(成功或出错),或者在 期间全部完成await,则该方法应异步抛出一个ChannelClosedException.
我的问题是:如何实施该TakeFromAnyAsync<T>方法?实现看起来很棘手。很明显,在任何情况下,该方法都不应从通道中消耗多个项目。此外,它不应遗留即发即弃的任务,或让一次性资源未被处置。该方法通常会在循环中调用,因此它也应该相当高效。它的复杂度应该不比 O(n) 差,其中n通道的数量。
要了解此方法的用处,您可以查看Go语言的select语句。从旅游:
该select语句让 goroutine 等待多个通信操作。
Aselect阻塞直到它的一个 case 可以运行,然后它执行那个 case。如果多个准备就绪,它会随机选择一个。
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
在上面的示例中,要么从通道中获取一个值c1并分配给变量msg1,要么从通道中获取一个值c2并分配给变量msg2。Goselect语句不限于从通道读取。它可以包括多种异构情况,如写入有界通道、等待计时器等。复制 Goselect语句的全部功能超出了这个问题的范围。
慕森王
郎朗坤
相关分类