猿问

TPL DataFlow 仅在处理完所有数据后才传播完成

我有一个生产者向 a 发送数据BufferBlock,当从源读取所有数据时,它调用Complete().


当完成发生时,我得到一个异常Receive:InvalidOperationException: 'The source completed without providing data to receive.'


我目前正在使用:


var bufferBlock = new BufferBlock<string>();


var transformBlock = new TransformBlock<string, string>(s =>

{

    Thread.Sleep(50);


    return s;

});


bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });


foreach (var i in Enumerable.Range(0, 10))

    bufferBlock.Post(i.ToString());


bufferBlock.Complete();


while (!transformBlock.Completion.IsCompleted)

    Console.WriteLine(transformBlock.Receive());

为了避免它,我目前正在使用:


while (bufferBlock.Count > 0)

    await Task.Delay(100);


bufferBlock.Complete();

这听起来不像是一个真正干净的解决方案。


是竞态条件吗?IE 块标记为未完成并且它们在我调用接收时完成?


我想我可以替换!transformBlock.Completion.IsCompleted为block.OutputAvailableAsync是吗?


万千封印
浏览 215回答 2
2回答

明月笑刀无情

是的,手动从块中检索消息的正确方法是使用该OutputAvailableAsync方法,并结合使用TryReceive:while (await transformBlock.OutputAvailableAsync()){&nbsp; &nbsp; while (transformBlock.TryReceive(out var item))&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; Console.WriteLine(item);&nbsp; &nbsp; }}await transformBlock.Completion; // Required to propagate exceptions性质BufferBlock.Count,TransformBlock.OutputCount等只适用于监控和统计。在大多数情况下,使用它们来控制数据流是可能的竞争条件和潜在错误的指示。
随时随地看视频慕课网APP
我要回答