限制异步任务

限制异步任务

我想运行一堆异步任务,并限制在任何给定时间可以完成的任务数量。

假设您有1000个网址,并且您只希望一次打开50个请求; 但只要一个请求完成,您就会打开与列表中下一个URL的连接。这样,一次只打开50个连接,直到URL列表用完为止。

如果可能的话,我也想利用给定数量的线程。

我提出了一种扩展方法,ThrottleTasksAsync可以实现我想要的功能。那里有更简单的解决方案吗?我认为这是一种常见的情况。

用法:

class Program{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }}

这是代码:

static class IEnumerableExtensions{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>


但是,线程池快速耗尽,你不能做asyncawait

额外: 为了解决调用时BlockingCollection抛出异常的问题,我正在使用带超时的重载。如果我没有使用超时,它会破坏使用的目的,因为不会阻止。有没有更好的办法?理想情况下,会有一种方法。Take()CompleteAdding()TryTakeTryTakeBlockingCollectionTryTakeTakeAsync



梵蒂冈之花
浏览 403回答 3
3回答

至尊宝的传说

根据要求,这是我最终使用的代码。工作在主 - 详细配置中设置,每个主服务器作为批处理进行处理。每个工作单元都以这种方式排队:var&nbsp;success&nbsp;=&nbsp;true;//&nbsp;Start&nbsp;processing&nbsp;all&nbsp;the&nbsp;master&nbsp;records.Master&nbsp;master;while&nbsp;(null&nbsp;!=&nbsp;(master&nbsp;=&nbsp;await&nbsp;StoredProcedures.ClaimRecordsAsync(...))){ &nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;masterBuffer.SendAsync(master);}//&nbsp;Finished&nbsp;sending&nbsp;master&nbsp;recordsmasterBuffer.Complete();//&nbsp;Now,&nbsp;wait&nbsp;for&nbsp;all&nbsp;the&nbsp;batches&nbsp;to&nbsp;complete.await&nbsp;batchAction.Completion;return&nbsp;success;Masters一次缓冲一个,以节省其他外部进程的工作。每个主人的详细信息都通过以下方式发送给工作人员masterTransform&nbsp;TransformManyBlock。BatchedJoinBlock还创建了A&nbsp;以在一批中收集详细信息。实际工作是以detailTransform&nbsp;TransformBlock异步方式完成的,每次150个。BoundedCapacity设置为300以确保太多的Masters不会在链的开头进行缓冲,同时还留出足够的空间来排列足够的详细记录以允许一次处理150条记录。该块输出object到它的目标,因为它是整个取决于它是否是一个链接过滤Detail或Exception。所述batchAction&nbsp;ActionBlock收集来自所有批次的输出,并且执行散装数据库更新,错误日志等。对于每个批次。将有几个BatchedJoinBlocks,每个主人一个。由于每个ISourceBlock都是按顺序输出的,并且每个批次只接受与一个主数据相关联的详细记录的数量,因此将按顺序处理批次。每个块仅输出一个组,并在完成时取消链接。只有最后一个批处理块将其完成传播到最终ActionBlock。数据流网络://&nbsp;The&nbsp;dataflow&nbsp;networkBufferBlock<Master>&nbsp;masterBuffer&nbsp;=&nbsp;null;TransformManyBlock<Master,&nbsp;Detail>&nbsp;masterTransform&nbsp;=&nbsp;null;TransformBlock<Detail,&nbsp;object>&nbsp;detailTransform&nbsp;=&nbsp;null;ActionBlock<Tuple<IList<object>,&nbsp;IList<object>>>&nbsp;batchAction&nbsp;=&nbsp;null;//&nbsp;Buffer&nbsp;master&nbsp;records&nbsp;to&nbsp;enable&nbsp;efficient&nbsp;throttling.masterBuffer&nbsp;=&nbsp;new&nbsp;BufferBlock<Master>(new&nbsp;DataflowBlockOptions&nbsp;{&nbsp;BoundedCapacity&nbsp;=&nbsp;1&nbsp;});//&nbsp;Sequentially&nbsp;transform&nbsp;master&nbsp;records&nbsp;into&nbsp;a&nbsp;stream&nbsp;of&nbsp;detail&nbsp;records.masterTransform&nbsp;=&nbsp;new&nbsp;TransformManyBlock<Master,&nbsp;Detail>(async&nbsp;masterRecord&nbsp;=>{ &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;records&nbsp;=&nbsp;await&nbsp;StoredProcedures.GetObjectsAsync(masterRecord); &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Filter&nbsp;the&nbsp;master&nbsp;records&nbsp;based&nbsp;on&nbsp;some&nbsp;criteria&nbsp;here &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;filteredRecords&nbsp;=&nbsp;records; &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Only&nbsp;propagate&nbsp;completion&nbsp;to&nbsp;the&nbsp;last&nbsp;batch &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;propagateCompletion&nbsp;=&nbsp;masterBuffer.Completion.IsCompleted&nbsp;&&&nbsp;masterTransform.InputCount&nbsp;==&nbsp;0; &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Create&nbsp;a&nbsp;batch&nbsp;join&nbsp;block&nbsp;to&nbsp;encapsulate&nbsp;the&nbsp;results&nbsp;of&nbsp;the&nbsp;master&nbsp;record. &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;batchjoinblock&nbsp;=&nbsp;new&nbsp;BatchedJoinBlock<object,&nbsp;object>(records.Count(),&nbsp;new&nbsp;GroupingDataflowBlockOptions&nbsp;{&nbsp;MaxNumberOfGroups&nbsp;=&nbsp;1&nbsp;}); &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Add&nbsp;the&nbsp;batch&nbsp;block&nbsp;to&nbsp;the&nbsp;detail&nbsp;transform&nbsp;pipeline's&nbsp;link&nbsp;queue,&nbsp;and&nbsp;link&nbsp;the&nbsp;batch&nbsp;block&nbsp;to&nbsp;the&nbsp;the&nbsp;batch&nbsp;action&nbsp;block. &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;detailLink1&nbsp;=&nbsp;detailTransform.LinkTo(batchjoinblock.Target1,&nbsp;detailResult&nbsp;=>&nbsp;detailResult&nbsp;is&nbsp;Detail); &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;detailLink2&nbsp;=&nbsp;detailTransform.LinkTo(batchjoinblock.Target2,&nbsp;detailResult&nbsp;=>&nbsp;detailResult&nbsp;is&nbsp;Exception); &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;batchLink&nbsp;=&nbsp;batchjoinblock.LinkTo(batchAction,&nbsp;new&nbsp;DataflowLinkOptions&nbsp;{&nbsp;PropagateCompletion&nbsp;=&nbsp;propagateCompletion&nbsp;}); &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Unlink&nbsp;batchjoinblock&nbsp;upon&nbsp;completion. &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;(the&nbsp;returned&nbsp;task&nbsp;does&nbsp;not&nbsp;need&nbsp;to&nbsp;be&nbsp;awaited,&nbsp;despite&nbsp;the&nbsp;warning.) &nbsp;&nbsp;&nbsp;&nbsp;batchjoinblock.Completion.ContinueWith(task&nbsp;=> &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;detailLink1.Dispose(); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;detailLink2.Dispose(); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;batchLink.Dispose(); &nbsp;&nbsp;&nbsp;&nbsp;}); &nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;filteredRecords;},&nbsp;new&nbsp;ExecutionDataflowBlockOptions&nbsp;{&nbsp;BoundedCapacity&nbsp;=&nbsp;1&nbsp;});//&nbsp;Process&nbsp;each&nbsp;detail&nbsp;record&nbsp;asynchronously,&nbsp;150&nbsp;at&nbsp;a&nbsp;time.detailTransform&nbsp;=&nbsp;new&nbsp;TransformBlock<Detail,&nbsp;object>(async&nbsp;detail&nbsp;=>&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;try &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Perform&nbsp;the&nbsp;action&nbsp;for&nbsp;each&nbsp;detail&nbsp;here&nbsp;asynchronously &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;await&nbsp;DoSomethingAsync(); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;detail; &nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;catch&nbsp;(Exception&nbsp;e) &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;success&nbsp;=&nbsp;false; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;e; &nbsp;&nbsp;&nbsp;&nbsp;}},&nbsp;new&nbsp;ExecutionDataflowBlockOptions&nbsp;{&nbsp;MaxDegreeOfParallelism&nbsp;=&nbsp;150,&nbsp;BoundedCapacity&nbsp;=&nbsp;300&nbsp;});//&nbsp;Perform&nbsp;the&nbsp;proper&nbsp;action&nbsp;for&nbsp;each&nbsp;batchbatchAction&nbsp;=&nbsp;new&nbsp;ActionBlock<Tuple<IList<object>,&nbsp;IList<object>>>(async&nbsp;batch&nbsp;=>{ &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;details&nbsp;=&nbsp;batch.Item1.Cast<Detail>(); &nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;errors&nbsp;=&nbsp;batch.Item2.Cast<Exception>(); &nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Do&nbsp;something&nbsp;with&nbsp;the&nbsp;batch&nbsp;here},&nbsp;new&nbsp;ExecutionDataflowBlockOptions&nbsp;{&nbsp;MaxDegreeOfParallelism&nbsp;=&nbsp;4&nbsp;});masterBuffer.LinkTo(masterTransform,&nbsp;new&nbsp;DataflowLinkOptions&nbsp;{&nbsp;PropagateCompletion&nbsp;=&nbsp;true&nbsp;});masterTransform.LinkTo(detailTransform,&nbsp;new&nbsp;DataflowLinkOptions&nbsp;{&nbsp;PropagateCompletion&nbsp;=&nbsp;true&nbsp;});
打开App,查看更多内容
随时随地看视频慕课网APP