C#:使用 Parallel.ForEach 和异步操作限制最大并发操作

我正在尝试使用 asp.net core 2.1 实现自托管 Web 服务,但遇到了实现后台长时间执行任务的问题。


由于每种ProcessSingle方法(在下面的代码片段中)的高 CPU 负载和时间消耗,我想限制同时执行的任务的数量。但是我可以看到所有任务Parallel.ForEach几乎立即开始,尽管我设置了MaxDegreeOfParallelism = 3


我的代码是(这是一个简化版本):


public static async Task<int> Work()

{

    var id = await CreateIdInDB() // async create record in DB


    // run background task, don't wait when it finishes

    Task.Factory.StartNew(async () => {

        Parallel.ForEach(

            listOfData,

            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },

            async x => await ProcessSingle(x));

    });


    // return created id immediately

    return id;

}


public static async Task ProcessSingle(MyInputData inputData)

{

    var dbData = await GetDataFromDb(); // get data from DB async using Dapper

    // some lasting processing (sync)

    await SaveDataToDb(); // async save processed data to DB using Dapper

}

如果我理解正确,问题出async x => await ProcessSingle(x)在 Parallel.ForEach 内部,不是吗?


有人可以描述一下,它应该如何以正确的方式实施?

更新

由于我的问题存在某种歧义,因此有必要关注主要方面:

  1. 方法分为三部分ProcessSingle

    • 从数据库异步获取数据

    • 进行长时间高 CPU 负载的数学计算

    • 将结果保存到数据库异步

  2. 问题包括两个独立的:

    • 如何降低 CPU 使用率(例如同时运行不超过三个数学计算)?

    • 如何保持ProcessSingle方法的结构 - 由于异步 DB 调用而使它们保持异步。

希望现在会更清楚。

PS 已经给出了合适的答案,它可以工作(特别感谢@MatrixTai)。编写此更新是为了进行一般说明。


慕码人8056858
浏览 1067回答 2
2回答

慕森卡

更新正如我刚刚注意到您在评论中提到的那样,问题是由数学计算引起的。将计算和更新数据库的部分分开会更好。对于计算部分,使用Parallel.ForEach()以便优化您的工作,您可以控制线程数。只有在所有这些任务完成之后。用于async-await将您的数据更新到数据库,无需SemaphoreSlim我提及。public static async Task<int> Work(){&nbsp; &nbsp; var id = await CreateIdInDB() // async create record in DB&nbsp; &nbsp; // run background task, don't wait when it finishes&nbsp; &nbsp; Task.Run(async () => {&nbsp; &nbsp; &nbsp; &nbsp; //Calculation Part&nbsp; &nbsp; &nbsp; &nbsp; ConcurrentBag<int> data = new ConcurrentBag<int>();&nbsp; &nbsp; &nbsp; &nbsp; Parallel.ForEach(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; listOfData,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; x => {ConcurrentBag.Add(calculationPart(x))});&nbsp; &nbsp; &nbsp; &nbsp; //Update DB part&nbsp; &nbsp; &nbsp; &nbsp; int[] data_arr = data.ToArray();&nbsp; &nbsp; &nbsp; &nbsp; List<Task> worker = new List<Task>();&nbsp; &nbsp; &nbsp; &nbsp; foreach (var i in data_arr)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; worker.Add(DBPart(x));&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; await Task.WhenAll(worker);&nbsp; &nbsp; });&nbsp; &nbsp; // return created id immediately&nbsp; &nbsp; return id;}当你async-await在Parallel.forEach.首先,阅读第一个和第二个答案的这个问题。将这两者结合起来毫无意义。实际上async-await会最大限度地利用可用线程,所以简单地使用它。public static async Task<int> Work(){&nbsp; &nbsp; var id = await CreateIdInDB() // async create record in DB&nbsp; &nbsp; // run background task, don't wait when it finishes&nbsp; &nbsp; Task.Run(async () => {&nbsp; &nbsp; &nbsp; &nbsp; List<Task> worker = new List<Task>();&nbsp; &nbsp; &nbsp; &nbsp; foreach (var i in listOfData)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; worker.Add(ProcessSingle(x));&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; await Task.WhenAll(worker);&nbsp; &nbsp; });&nbsp; &nbsp; // return created id immediately&nbsp; &nbsp; return id;}但是还有另一个问题,在这种情况下,这些任务仍然一起开始,消耗你的 CPU 使用率。因此,为避免这种情况,请使用 SemaphoreSlimpublic static async Task<int> Work(){&nbsp; &nbsp; var id = await CreateIdInDB() // async create record in DB&nbsp; &nbsp; // run background task, don't wait when it finishes&nbsp; &nbsp; Task.Run(async () => {&nbsp; &nbsp; &nbsp; &nbsp; List<Task> worker = new List<Task>();&nbsp; &nbsp; &nbsp; &nbsp; //To limit the number of Task started.&nbsp; &nbsp; &nbsp; &nbsp; var throttler = new SemaphoreSlim(initialCount: 20);&nbsp; &nbsp; &nbsp; &nbsp; foreach (var i in listOfData)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await throttler.WaitAsync();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; worker.Add(Task.Run(async () =>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; await ProcessSingle(x);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throttler.Release();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ));&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; await Task.WhenAll(worker);&nbsp; &nbsp; });&nbsp; &nbsp; // return created id immediately&nbsp; &nbsp; return id;}此外,不要Task.Factory.StartNew()在简单Task.Run()就足以完成您想要的工作时使用,请阅读 Stephen Cleary 撰写的这篇出色的文章。

牛魔王的故事

如果您更熟悉“传统”并行处理概念,请像这样重写您的 ProcessSingle() 方法:public static void ProcessSingle(MyInputData inputData){&nbsp; &nbsp; var dbData = GetDataFromDb(); // get data from DB async using Dapper&nbsp; &nbsp; // some lasting processing (sync)&nbsp; &nbsp; SaveDataToDb(); // async save processed data to DB using Dapper}当然,您最好也以类似的方式更改 Work() 方法。
打开App,查看更多内容
随时随地看视频慕课网APP