我正在使用消息队列中的消息并使用Task.Run(). 但我想将消费速度限制在某个最大线程数,并且在线程数低于该数量之前不从消息队列中消费。
假设我想要最多 100 个线程。在这种情况下,当达到 100 个线程时,它应该停止从消息队列中消费。当消息处理任务完成并且线程数下降到 99 时,它应该从队列中再消费一条消息。
我尝试用于TransformBlock此目的,这里是用于演示目的的示例代码:
public partial class MainWindow : Window
{
object syncObj = new object();
int i = 0;
public MainWindow()
{
InitializeComponent();
}
private async Task<bool> ProcessMessage(string message)
{
await Task.Delay(5000);
lock (syncObj)
{
i++;
System.Diagnostics.Debug.WriteLine(i);
}
return true;
}
private async void Button_Click(object sender, RoutedEventArgs e)
{
var processor = new TransformBlock<string, bool>(
(str) => ProcessMessage(str),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }
);
for(int i = 0; i < 1000; i++)
{
await processor.SendAsync("a");
}
}
}
限制并行任务的数量按预期工作,但所有消息都会立即发送到 TransformBlock,因此SendAsync循环在任务处理之前结束。
我希望它继续接受消息,只要线程数低于最大值。允许并行度,但在达到 100 时等待。
有没有办法使用 TransformBlock 来做到这一点,或者我应该诉诸其他方法?
偶然的你
相关分类