如何实现 Channels 的 BlockingCollection.TakeFromAny

我正在尝试实现一个异步方法,该方法采用ChannelReader<T>s 数组,并从任何具有可用项目的通道中获取值。它是一种与具有以下签名的方法具有相似功能的BlockingCollection<T>.TakeFromAny方法:


public static int TakeFromAny(BlockingCollection<T>[] collections, out T item,

    CancellationToken cancellationToken);

此方法返回collections从中删除项目的数组中的索引。async方法不能有参数out,所以我要实现的 API 是这样的:


public static Task<(T Item, int Index)> TakeFromAnyAsync<T>(

    ChannelReader<T>[] channelReaders,

    CancellationToken cancellationToken = default);

该方法应该异步读取一个项目,并返回消耗的项目以及数组TakeFromAnyAsync<T>中关联通道的索引。channelReaders如果所有通道都已完成(成功或出错),或者在 期间全部完成await,则该方法应异步抛出一个ChannelClosedException.


我的问题是:如何实施该TakeFromAnyAsync<T>方法?实现看起来很棘手。很明显,在任何情况下,该方法都不应从通道中消耗多个项目。此外,它不应遗留即发即弃的任务,或让一次性资源未被处置。该方法通常会在循环中调用,因此它也应该相当高效。它的复杂度应该不比 O(n) 差,其中n通道的数量。


要了解此方法的用处,您可以查看Go语言的select语句。从旅游:


该select语句让 goroutine 等待多个通信操作。


Aselect阻塞直到它的一个 case 可以运行,然后它执行那个 case。如果多个准备就绪,它会随机选择一个。


select {

case msg1 := <-c1:

    fmt.Println("received", msg1)

case msg2 := <-c2:

    fmt.Println("received", msg2)

}

在上面的示例中,要么从通道中获取一个值c1并分配给变量msg1,要么从通道中获取一个值c2并分配给变量msg2。Goselect语句不限于从通道读取。它可以包括多种异构情况,如写入有界通道、等待计时器等。复制 Goselect语句的全部功能超出了这个问题的范围。


不负相思意
浏览 113回答 2
2回答

慕森王

这是另一种方法。此实现在概念上与 alexm 的实现相同,直到没有频道立即可用的项目为止。然后它的不同之处在于避免了Task.WhenAny循环模式,而是为每个通道启动一个异步循环。所有循环都在竞相更新共享变量,该共享变量在临界区更新,以防止从多个通道消耗元素。ValueTuple<T, int, bool>&nbsp;consumed/// <summary>/// Takes an item asynchronously from any one of the specified channel readers./// </summary>public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(&nbsp; &nbsp; ChannelReader<T>[] channelReaders,&nbsp; &nbsp; CancellationToken cancellationToken = default){&nbsp; &nbsp; ArgumentNullException.ThrowIfNull(channelReaders);&nbsp; &nbsp; if (channelReaders.Length == 0) throw new ArgumentException(&nbsp; &nbsp; &nbsp; &nbsp; $"The {nameof(channelReaders)} argument is a zero-length array.");&nbsp; &nbsp; foreach (var cr in channelReaders) if (cr is null) throw new ArgumentException(&nbsp; &nbsp; &nbsp; &nbsp; $"The {nameof(channelReaders)} argument contains at least one null element.");&nbsp; &nbsp; cancellationToken.ThrowIfCancellationRequested();&nbsp; &nbsp; // Fast path (at least one channel has an item available immediately)&nbsp; &nbsp; for (int i = 0; i < channelReaders.Length; i++)&nbsp; &nbsp; &nbsp; &nbsp; if (channelReaders[i].TryRead(out var item))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return (item, i);&nbsp; &nbsp; // Slow path (all channels are currently empty)&nbsp; &nbsp; using var linkedCts = CancellationTokenSource&nbsp; &nbsp; &nbsp; &nbsp; .CreateLinkedTokenSource(cancellationToken);&nbsp; &nbsp; (T Item, int Index, bool HasValue) consumed = default;&nbsp; &nbsp; Task[] tasks = channelReaders.Select(async (channelReader, index) =>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; while (true)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!await channelReader.WaitToReadAsync(linkedCts.Token)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ConfigureAwait(false)) break;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Only the exceptional cases below are normal.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; catch (OperationCanceledException)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; when (linkedCts.IsCancellationRequested) { break; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; catch when (channelReader.Completion.IsCompleted&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; && !channelReader.Completion.IsCompletedSuccessfully) { break; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // This channel has an item available now.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; lock (linkedCts)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (consumed.HasValue)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return; // An item has already been consumed from another channel.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (!channelReader.TryRead(out var item))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continue; // We lost the race to consume the available item.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumed = (item, index, true); // We consumed an item successfully.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; linkedCts.Cancel(); // Cancel the other tasks.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return;&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }).ToArray();&nbsp; &nbsp; // The tasks should never fail. If a task ever fails, we have a bug.&nbsp; &nbsp; try { foreach (var task in tasks) await task.ConfigureAwait(false); }&nbsp; &nbsp; catch (Exception ex) { Debug.Fail("Unexpected error", ex.ToString()); throw; }&nbsp; &nbsp; if (consumed.HasValue)&nbsp; &nbsp; &nbsp; &nbsp; return (consumed.Item, consumed.Index);&nbsp; &nbsp; cancellationToken.ThrowIfCancellationRequested();&nbsp; &nbsp; Debug.Assert(channelReaders.All(cr => cr.Completion.IsCompleted));&nbsp; &nbsp; throw new ChannelClosedException();}应该注意的是,这个解决方案,以及 alexm 的解决方案,都依赖于WaitToReadAsync在一个元素被消耗时取消所有挂起的操作。不幸的是,这会触发臭名昭著的内存泄漏问题,该问题会影响具有空闲生产者的 .NET 通道。当取消通道上的任何异步操作时,取消的操作将保留在内存中,附加到通道的内部结构,直到将元素写入通道。此行为已被Microsoft 分类为设计使然,但不排除改进它的可能性。有趣的是,这种歧义使得这种效果不符合记录条件.&nbsp;因此,了解这一点的唯一途径是偶然,要么从非官方渠道阅读,要么陷入其中。

郎朗坤

如果按照 Go 中使用通道的方式使用通道,问题就会容易得多:Channel(Readers) 作为输入,Channel(Readers) 作为输出。IEnumerable<ChannelReader<T>> sources=....;await foreach(var msg in sources.TakeFromAny(token)){....}要么var merged=sources.TakeFromAny(token);...var msg=await merged.ReadAsync(token);在这种情况下,来自所有通道阅读器的输入被复制到一个输出通道。该方法的返回值是该频道的ChannelReader。CopyToAsync 助手可以使用CopyToAsync函数将消息从输入源复制到输出通道:static async Task CopyToAsync<T>(&nbsp; &nbsp; &nbsp; &nbsp; this ChannelReader<T> input,&nbsp; &nbsp; &nbsp; &nbsp; ChannelWriter<T> output,&nbsp; &nbsp; &nbsp; &nbsp; CancellationToken token=default){&nbsp; &nbsp;while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))&nbsp; &nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;//Early exit if cancellation is requested&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (!token.IsCancellationRequested &&&nbsp; input.TryRead(out T? msg))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;await output.WriteAsync(msg,token);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;}&nbsp; &nbsp;}}此代码类似于ReadAllAsync,但如果请求取消则立即退出。ReadAllAsync即使要求取消,也将退还所有可用物品。使用的方法包括WriteAsync如果通道关闭则不会抛出异常,这使得错误处理变得更加容易。错误处理和面向铁路的编程WaitToReadAsync如果源出错但该异常确实会抛出,并且该异常将传播到调用方法并传播到Task.WhenAll输出通道。这可能有点混乱,因为它会中断整个管道。为避免这种情况,可以将错误吞没或记录在内部CopyToAsync。一个更好的选择是使用面向铁路的编程并将所有消息包装在一个Result<TMsg,TError>类中,例如:static async Task CopyToAsync<Result<T,Exception>>(&nbsp; &nbsp; &nbsp; &nbsp; this ChannelReader<Result<T,Exception>> input,&nbsp; &nbsp; &nbsp; &nbsp; ChannelWriter<Result<T,Exception>> output,&nbsp; &nbsp; &nbsp; &nbsp; CancellationToken token=default){&nbsp; &nbsp;try&nbsp; &nbsp;{&nbsp; &nbsp; &nbsp;while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))&nbsp; &nbsp; &nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;//Early exit if cancellation is requested&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (!token.IsCancellationRequested &&&nbsp; input.TryRead(out T? msg))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;var newMsg=Result.FromValue(msg);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;await output.WriteAsync(newMsg,token);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;}&nbsp; &nbsp; &nbsp;}&nbsp; }&nbsp; catch(Exception exc)&nbsp; {&nbsp; &nbsp; output.TryWrite(Result<T>.FromError(exc));&nbsp; }}TakeFromAsyncTakeFromAny(MergeAsync可能是更好的名字)可以是:static ChannelReader<T> TakeFromAny(&nbsp; &nbsp; &nbsp; &nbsp; this IEnumerable<ChannelReader<T> inputs,&nbsp; &nbsp; &nbsp; &nbsp; CancellationToken token=default){&nbsp; &nbsp; var outChannel=Channel.CreateBounded<T>(1);&nbsp; &nbsp; var readers=inputs.Select(rd=>CopyToAsync(rd,outChannel,token));&nbsp; &nbsp; _ = Task.WhenAll(readers)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ContinueWith(t=>outChannel.TryComplete(t.Exception));&nbsp; &nbsp; return outChannel;}使用 1 的有界容量可确保下游代码的背压行为不会改变。添加源索引这也可以调整为发出源的索引:static async Task CopyToAsync<T>(&nbsp; &nbsp; &nbsp; &nbsp; this ChannelReader<T> input,int index,&nbsp; &nbsp; &nbsp; &nbsp; ChannelWriter<(T,int)> output,&nbsp; &nbsp; &nbsp; &nbsp; CancellationToken token=default){&nbsp; while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))&nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; while (!token.IsCancellationRequested &&&nbsp; input.TryRead(out T? msg))&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await output.WriteAsync((msg,index),token);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; }}static ChannelReader<(T,int)> TakeFromAny(&nbsp; &nbsp; &nbsp; &nbsp; this IEnumerable<ChannelReader<T> inputs,&nbsp; &nbsp; &nbsp; &nbsp; CancellationToken token=default){&nbsp; &nbsp; var outChannel=Channel.CreateBounded<(int,T)>(1);&nbsp; &nbsp; var readers=inputs.Select((rd,idx)=>CopyToAsync(rd,idx,outChannel,token));&nbsp; &nbsp; _ = Task.WhenAll(readers)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ContinueWith(t=>outChannel.TryComplete(t.Exception));&nbsp; &nbsp; return outChannel;}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go