如何知道何时停止并行 foreach,其中消费者也是 C# 中的生产者

我正在尝试使用 Parallel.ForEach() 并行处理 BlockingCollection 中的一些项目。当处理一个项目时,它可以生成 0-2 个以上的项目来处理。要处理的项目数最终总是会达到 0。


我的问题是,由于消费者也是生产者(处理项目可以生成更多要处理的项目),当 BlockingCollection 为空时,我无法调用 BlockingCollection 的 CompleteAdding(),因为当前可能有其他线程正在处理将生成更多项目的项目项目。因此我不知道如何让 BlockingCollection/Parallel.ForEach 知道它可以退出。


这是情况的示例(为简单起见进行了修改)


using System;

using System.Collections.Concurrent;

using System.Threading.Tasks;


namespace Example

{

    class Example

    {

        static void Main(string[] args)

        {

            var process = new BlockingCollection<int>() { 30 };


            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };


            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>

            {

                if (item > 20)

                {

                    // Some add 2 items

                    process.Add(item - 1);

                    process.Add(item - 1);

                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");

                }

                else if (item > 10)

                {

                    // Some add 1 item

                    process.Add(item-1);

                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");

                }

                else

                {

                    // Some add 0 items

                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");

                }

            });


            // Parallel.ForEach never exits

            Console.WriteLine("Completed Processing");


            Console.ReadKey();

        }

    }

}

我尝试将 Parallel.ForEach 期间的 MaxDegreeOfParallelism 修改为要处理的项目数和 Environment.ProcessorCount 中的最小值,但这在 Parallel.ForEach 期间没有任何作用。


我还尝试过存储未处理项目的数量,并在每个线程上更新此数字时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。这也不管用。


梦里花落0921
浏览 203回答 1
1回答

一只甜甜圈

你走在正确的轨道上:我还尝试过存储未处理项目的数量,并在每个线程上更新此数字时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。问题是你实际上是在计算活跃工人的数量,而不是未处理项目的数量。也就是说,当您开始处理某事时,您只会增加计数器,因此队列中可能有许多其他项目未由该计数器表示。要执行后者,您需要做的是每次向队列中添加内容时递增一个计数器,然后每次完成处理队列中的内容时递减计数器。现在,如果您尝试过,您可能会遇到一个不同的问题:默认情况下,该Parallel.ForEach()方法会尝试从源中批量处理项目。这不适用于像BlockingCollection<T>在枚举期间可能阻塞的源,等待额外的数据。在您的示例中,这会导致死锁,在Parallel.ForEach()等待更多项目之前它将对最近的批次进行排队,同时BlockingCollection<T>等待更多项目被处理,从而导致更多项目排队。方法等待集合,ForEach()集合等待ForEach()方法,就会出现死锁。不过有一个解决方法:您可以提供ForEach()一个分区程序,该分区程序专门配置为不缓冲数据,而是在检索工作项时立即将其排队。将这两种策略放在一起,你会得到一个看起来像这样的代码版本(我为诊断目的添加了一些小的输出更改):static void Main(string[] args){&nbsp; &nbsp; const int firstValue = 30;&nbsp; &nbsp; const int secondValues = 20;&nbsp; &nbsp; const int thirdValues = 10;&nbsp; &nbsp; var process = new BlockingCollection<int>() { firstValue };&nbsp; &nbsp; var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };&nbsp; &nbsp; int totalItemCount = process.Count;&nbsp; &nbsp; OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);&nbsp; &nbsp; Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; string message;&nbsp; &nbsp; &nbsp; &nbsp; if (item > secondValues)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Some add 2 items&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Interlocked.Add(ref totalItemCount, 2);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; process.Add(item - 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; process.Add(item - 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; else if (item > thirdValues)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Some add 1 item&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Interlocked.Increment(ref totalItemCount);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; process.Add(item - 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; else&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Some add 0 items&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; int newCount = Interlocked.Decrement(ref totalItemCount);&nbsp; &nbsp; &nbsp; &nbsp; if (newCount == 0)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; process.CompleteAdding();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");&nbsp; &nbsp; });&nbsp; &nbsp; // Parallel.ForEach will exit&nbsp; &nbsp; Console.WriteLine("Completed Processing");&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; Console.ReadKey();}
打开App,查看更多内容
随时随地看视频慕课网APP