猿问

如何使用 TPL Dataflow 处理完整的输入列表?

我是 TPL Dataflow 的新手,它可以工作,但我不确定我是否正确使用它。我有一个输入(字符串)列表,我想以最大并行度处理它们(全部),并知道何时全部完成。现在我只是foreach通过输入并调用Post,ActionBlock忽略返回值。这似乎不正确,因为它可能会丢失输入。


我的问题是:如何避免丢失物品?是否有一个内置块,我可以只向其中提供我的输入,并且它将确保它们都被尝试?(无论每次输入成功/失败。)


我看到的建议基本上是:


await block.Completion;

这是否说明了失败的输入(其中Post或SendAsync将返回 false)?对我来说奇怪的是,这个决定似乎是在我打电话时而Post不是之后做出的,所以这Completion甚至不包括这些项目。


我觉得我基本上需要一个重试循环来处理上次无法处理的输入,类似于:


while (items.Count > 0) {

  foreach (var item in items) {

    if (await block.SendAsync(item)) {

      items.Remove(item);

    }

  }


  await block.Completion;

}


block.Complete();

(除非有更好的循环处理/错误检查。)


这个额外的级别是不必要的吗?或者我在概念上哪里错了?


慕容708150
浏览 139回答 2
2回答

慕标琳琳

这似乎不正确,因为它可能会丢失输入。假设您使用默认值,这是正确的。Post仅false当块拒绝输入时才返回。如果模块已接收到Complete信号,或者模块的输入缓冲区已满,则可能会发生这种情况。默认情况下,每个块的输入缓冲区可以无限增长,因此ActionBlock具有默认输入缓冲区大小的块只会false在调用Post后返回。Complete最常见的用例ActionBlock是具有无限的有限容量,并且代码仅Complete在添加所有项目后调用。在这种情况下,Post永远不会返回false,您可以安全地忽略返回值。

翻过高山走不出你

Post如果块完成,或者块的输入缓冲区已满,该方法将返回 false。由于该设置并不是什么奇特的东西,并且很可能在项目的后期阶段需要解决高 RAM 使用率的新问题,因此我认为使用该方法并简单地忽略结果BoundedCapacity并不是一个安全的选择。Post为了避免涉及丢失消息(可能是订单或发票)的无趣错误,您可以执行以下操作:foreach (var item in items){    var accepted = block.Post(item);    if (!accepted) throw new InvalidOperationException("Item was not accepted");}这样,您至少会收到有关某些内容损坏的通知,并且不会让错误行为蔓延。另一方面,等待SendAsync并忽略结果要安全得多。SendAsync通常会在发生异常或发生取消的情况下返回,在这种情况下,您会在块发生时false收到通知。所以在这种情况下不需要抛出异常。awaitCompletionforeach (var item in items){    await block.SendAsync(item).ConfigureAwait(false);}出于性能原因,您可以同时使用Post和SendAsync。仅当您有数千万个项目需要处理时,这才会产生影响。foreach (var item in items){    if (!block.Post(item))    {        await block.SendAsync(item).ConfigureAwait(false);    }}
随时随地看视频慕课网APP
我要回答