我正在尝试使用 Parallel.ForEach() 并行处理 BlockingCollection 中的一些项目。当处理一个项目时,它可以生成 0-2 个以上的项目来处理。要处理的项目数最终总是会达到 0。
我的问题是,由于消费者也是生产者(处理项目可以生成更多要处理的项目),当 BlockingCollection 为空时,我无法调用 BlockingCollection 的 CompleteAdding(),因为当前可能有其他线程正在处理将生成更多项目的项目项目。因此我不知道如何让 BlockingCollection/Parallel.ForEach 知道它可以退出。
这是情况的示例(为简单起见进行了修改)
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item-1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
我尝试将 Parallel.ForEach 期间的 MaxDegreeOfParallelism 修改为要处理的项目数和 Environment.ProcessorCount 中的最小值,但这在 Parallel.ForEach 期间没有任何作用。
我还尝试过存储未处理项目的数量,并在每个线程上更新此数字时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。这也不管用。
梦里花落0921
一只甜甜圈
随时随地看视频慕课网APP
相关分类