我有一个 RabbitMQ 队列,我可以批量异步读取它,但我必须保留这些消息的顺序。我有一个名为的字段ServiceNumber,它定义了消息的唯一编号,我必须保持这个顺序。
例如
SN1 SN2 SN1 SN1 SN1 SN2
1 2 3 4 5 6
在这种情况下,我们可以同时处理消息 1 和 2(它们具有不同的 SN),然后我们可以处理 3 和 6,然后是 4,然后是 5。
我尝试通过ContinueWith以下方式通过链来实现它:
private readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
private async Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
{
Task taskToAwait;
await _semaphore.WaitAsync();
try
{
_currentTasks.TryGetValue(serviceNumber, out var task);
if (task == null)
task = Task.CompletedTask;
taskToAwait = _currentTasks[serviceNumber] = task.ContinueWith(_ => taskFunc());
}
finally
{
_semaphore.Release();
}
await taskToAwait.ConfigureAwait(false);
}
void Main()
{
Task.Run(async () => {
var task1 = Task.Run(() =>
{
return WrapMessageInQueue("10", async () =>
{
await Task.Delay(5000);
Console.WriteLine("first task finished");
});
});
while (task1.Status == TaskStatus.WaitingForActivation)
{
Console.WriteLine("waiting task to be picked by a scheduler. Status = {0}", task1.Status);
await Task.Delay(100);
}
var task2 = Task.Run(() =>
{
return WrapMessageInQueue("10", async () =>
{
Console.WriteLine("second task finished");
});
});
await Task.WhenAll(new[] {task1, task2});
}).Wait();
}
这里的主要思想是第一个 RUNNED 任务应该在所有其余任务开始之前完成。所以我实现了一个字典,我在其中存储一个任务,然后每个后续任务都被添加到ContinueWith链中。因此,它会在前一个执行后严格执行。当第三个任务到达时,它会在队列中占据一席之地,依此类推。
但由于某种原因它不起作用,输出是
第二个任务完成
第一个任务完成
这段代码有什么问题?有没有更好的办法?
陪伴而非守候
相关分类