await SendAsync 不在 TPL 数据流 BatchBlock 上等待

示例程序具有以下 BatchBlock: new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });,其中有 60 个 int 数据项正在发送并在一个单独的任务上使用。


问题是 ,await sourceBlock.SendAsync(i);似乎并没有在等待,即使达到 BatchBlock 有界容量,数据仍在不断发送,而没有消耗任务首先取出任何项目。最终 BatchBlock 只接收 2 批 10 个 int 数据项。我希望await sourceBlock.SendAsync(i);在发送 20 个项目时暂停执行,因为块的有界容量设置为 10,最多 2 个组。然后在某个时候,消费任务将接收数据并且该过程将重复。


我附上了下面的代码,创建一个简单的控制台应用程序,将以下内容添加到主程序中:


BatchBlockIssueReplication().GetAwaiter().GetResult();


调用方法:


    public static async Task BatchBlockIssueReplication()

    {

        var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });


        // Reading data from the source block

        Task fireAndForget = Task.Run(async () =>

        {

            while (!sourceBlock.Completion.IsCanceled)

            {

                await Task.Delay(1500);

                if (await sourceBlock.OutputAvailableAsync() && sourceBlock.TryReceiveAll(out var results))

                {

                    Console.WriteLine("Received: ");

                    foreach (var result in results)

                    {

                        Console.Write($"{result.Length} ");

                    }

                    Console.WriteLine();

                }

            }

        });


        for (int i = 0; i < 60; i++)

        {

            Console.WriteLine($"Sending {i} to the source block");

            await sourceBlock.SendAsync(i);

        }

        Console.WriteLine("Finished sending data to the source block");


        await Task.Delay(10000);

    }


蝴蝶不菲
浏览 228回答 2
2回答

慕运维8079593

您尚未设置BoundedCapacity,它控制输入缓冲区中可以等待的项目数。超过这将使SendAsync等待。您设置MaxNumberOfGroups属性,这是该块在拒绝接收任何其他输入之前将生成的组数。从文档:获取或设置应该由块生成的最大组数。如果您希望您的块在输入缓冲区中保留例如 20 个块并等待,您应该设置 BoundedCapacity :var&nbsp;sourceBlock&nbsp;=&nbsp;new&nbsp;BatchBlock<int>(10,&nbsp;new&nbsp;GroupingDataflowBlockOptions&nbsp; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{&nbsp; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;BoundedCapacity&nbsp;=&nbsp;20&nbsp; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;});

拉莫斯之舞

一旦达到最大值就await sourceBlock.SendAsync(i);不会暂停,因为该块会主动拒绝更多项目。发生这种情况时,SendAsync返回false指示该块将不接受新消息。如果你写出SendAsync调用的结果,你可以看到块停止接收新消息的位置:Sending 0 to the source blockTrueSending 1 to the source blockTrueSending 2 to the source blockTrueSending 3 to the source blockTrueSending 4 to the source blockTrueSending 5 to the source blockTrueSending 6 to the source blockTrueSending 7 to the source blockTrueSending 8 to the source blockTrueSending 9 to the source blockTrueSending 10 to the source blockTrueSending 11 to the source blockTrueSending 12 to the source blockTrueSending 13 to the source blockTrueSending 14 to the source blockTrueSending 15 to the source blockTrueSending 16 to the source blockTrueSending 17 to the source blockTrueSending 18 to the source blockTrueSending 19 to the source blockTrueSending 20 to the source blockFalseSending 21 to the source blockFalseSending 22 to the source blockFalseSending 23 to the source blockFalseSending 24 to the source blockFalseSending 25 to the source blockFalseSending 26 to the source blockFalseSending 27 to the source blockFalseSending 28 to the source blockFalseSending 29 to the source blockFalseSending 30 to the source blockFalseSending 31 to the source blockFalseSending 32 to the source blockFalseSending 33 to the source blockFalseSending 34 to the source blockFalseSending 35 to the source blockFalseSending 36 to the source blockFalseSending 37 to the source blockFalseSending 38 to the source blockFalseSending 39 to the source blockFalseSending 40 to the source blockFalseSending 41 to the source blockFalseSending 42 to the source blockFalseSending 43 to the source blockFalseSending 44 to the source blockFalseSending 45 to the source blockFalseSending 46 to the source blockFalseSending 47 to the source blockFalseSending 48 to the source blockFalseSending 49 to the source blockFalseSending 50 to the source blockFalseSending 51 to the source blockFalseSending 52 to the source blockFalseSending 53 to the source blockFalseSending 54 to the source blockFalseSending 55 to the source blockFalseSending 56 to the source blockFalseSending 57 to the source blockFalseSending 58 to the source blockFalseSending 59 to the source blockFalseFinished sending data to the source blockReceived:&nbsp;10 10
打开App,查看更多内容
随时随地看视频慕课网APP