手动控制执行任务的顺序

我有一个 RabbitMQ 队列,我可以批量异步读取它,但我必须保留这些消息的顺序。我有一个名为的字段ServiceNumber,它定义了消息的唯一编号,我必须保持这个顺序。


例如


   SN1 SN2 SN1 SN1 SN1 SN2

   1   2   3   4   5   6 

在这种情况下,我们可以同时处理消息 1 和 2(它们具有不同的 SN),然后我们可以处理 3 和 6,然后是 4,然后是 5。


我尝试通过ContinueWith以下方式通过链来实现它:


private readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);


private async Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)

{

    Task taskToAwait;

    await _semaphore.WaitAsync();

    try

    {

        _currentTasks.TryGetValue(serviceNumber, out var task);

        if (task == null)

            task = Task.CompletedTask;


        taskToAwait = _currentTasks[serviceNumber] = task.ContinueWith(_ => taskFunc());

    }

    finally

    {

        _semaphore.Release();

    }


    await taskToAwait.ConfigureAwait(false);

}


void Main()

{

    Task.Run(async () => {

        var task1 = Task.Run(() =>

        {

            return WrapMessageInQueue("10", async () =>

            {

                await Task.Delay(5000);

                Console.WriteLine("first task finished");

            });

        });


        while (task1.Status == TaskStatus.WaitingForActivation) 

        {

            Console.WriteLine("waiting task to be picked by a scheduler. Status = {0}", task1.Status);

            await Task.Delay(100);

        }


        var task2 = Task.Run(() =>

        {

            return WrapMessageInQueue("10", async () =>

            {

                Console.WriteLine("second task finished");

            });

        });

        

        await Task.WhenAll(new[] {task1, task2});

    }).Wait();

}

这里的主要思想是第一个 RUNNED 任务应该在所有其余任务开始之前完成。所以我实现了一个字典,我在其中存储一个任务,然后每个后续任务都被添加到ContinueWith链中。因此,它会在前一个执行后严格执行。当第三个任务到达时,它会在队列中占据一席之地,依此类推。


但由于某种原因它不起作用,输出是


第二个任务完成


第一个任务完成


这段代码有什么问题?有没有更好的办法?


呼唤远方
浏览 187回答 3
3回答

陪伴而非守候

有趣的方法。但是由于消息的处理(由 SN)无论如何都必须是顺序的,为什么每个消息都有一个任务?它只会让事情变得更复杂,因为您需要控制任务的执行顺序。为什么没有收集器任务将传入的消息分类到队列中(按 SN)并为每个 SN 启动一个任务来处理队列?
打开App,查看更多内容
随时随地看视频慕课网APP