猿问

使用并行性时限制消息队列消耗

我正在使用消息队列中的消息并使用Task.Run(). 但我想将消费速度限制在某个最大线程数,并且在线程数低于该数量之前不从消息队列中消费。


假设我想要最多 100 个线程。在这种情况下,当达到 100 个线程时,它应该停止从消息队列中消费。当消息处理任务完成并且线程数下降到 99 时,它应该从队列中再消费一条消息。


我尝试用于TransformBlock此目的,这里是用于演示目的的示例代码:


public partial class MainWindow : Window

    {

        object syncObj = new object();

        int i = 0;

        public MainWindow()

        {

            InitializeComponent();

        }



        private async Task<bool> ProcessMessage(string message)

        {

            await Task.Delay(5000);


            lock (syncObj)

            {

                i++;

                System.Diagnostics.Debug.WriteLine(i);

            }

            return true;

        }


        private async void Button_Click(object sender, RoutedEventArgs e)

        {

            var processor = new TransformBlock<string, bool>(

                    (str) => ProcessMessage(str),

                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }

                    );


            for(int i = 0; i < 1000; i++)

            {

                await processor.SendAsync("a");

            }



    }

}

限制并行任务的数量按预期工作,但所有消息都会立即发送到 TransformBlock,因此SendAsync循环在任务处理之前结束。


我希望它继续接受消息,只要线程数低于最大值。允许并行度,但在达到 100 时等待。


有没有办法使用 TransformBlock 来做到这一点,或者我应该诉诸其他方法?


慕田峪9158850
浏览 74回答 1
1回答

偶然的你

数据流块具有输入缓冲区。该输入缓冲区充当队列。如果您想将消息保留在自己的队列中,您可以通过限制数据流块愿意接收的项目数量来完成您想要的事情:var processor = new TransformBlock<string, bool>(&nbsp; &nbsp; (str) => ProcessMessage(str),&nbsp; &nbsp; new ExecutionDataflowBlockOptions&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; BoundedCapacity = 100,&nbsp; &nbsp; &nbsp; MaxDegreeOfParallelism = 100,&nbsp; &nbsp; });请注意,这BoundedCapacity包括块正在处理的项目。由于BoundedCapacity == MaxDegreeOfParallelism,这实际上会关闭数据流块的队列。因此 SendAsync 循环在任务处理之前结束。当有(最多)100 个任务需要处理时,它仍然会结束。如果您想等到所有项目完成处理,请调用Complete()和await Completed。
随时随地看视频慕课网APP
我要回答