猿问

C# 使用 Action 同步 HashSet

我有一个问题,我有一个简单的订阅者和简单的发布者,它们看起来像:


public async Task SendRequest()

{

    var topic = "SomeTopic";

    var requestHash = Helpers.ReturnUniqueKey(DateTime.Now, topic);

    requestKeys.Add(requestHash);


    Console.WriteLine($"Key count {requestKeys.Count}");

    var responseHandler = new Action<ResponseMessage>(response =>

    {

        Console.WriteLine($"Key count {requestKeys.Count}");

        foreach (var key in requestKeys)

        {

            Console.WriteLine($"Response { BitConverter.ToString(response.IdentyficationHash) } - Key { BitConverter.ToString(key) }");

            if (!key.SequenceEqual(response.IdentyficationHash)) return;

            requestKeys.Remove(key);

        }

    });

    bus.Subscribe(BusController.ManualRequest, responseHandler, configuration => configuration.WithTopic(BusController.ManualRequest));


    bus.Publish(someRequest, topic);


    async Task WaitForItToWorkAsync()

    {

        var retry = 0;

        var complete = false;

        while (!complete)

        {

            if (retry >= 20) return ; // Ill ass some msg leater

            complete = !requestKeys.Contains(requestHash);

            retry += 1;

            await Task.Delay(1000);

         }

         return // Ill ass some msg leater

      }  

      await WaitForItToWorkAsync()

}

主要想法是我通过一些请求向某些服务发送消息并等待到达(我知道我可以使用 rpc,但可以有任何服务并且 rpc 不支持主题),这条路径有效,问题是 requestKeys HashSet 它类中的一个字段


private readonly HashSet<byte[]> requestKeys;

正如您在每个方法调用中看到的那样,我将 Key 添加到该字段,如果我发出第一个请求,它可以正常工作,但其他请求不会更新此密钥集合,我的意思是在 Action 之外它会更新,但在它之外是一个问题。我能做什么来解决这个问题?


胡子哥哥
浏览 94回答 1
1回答

Smart猫小萌

如果您想SendRequest()在收到响应之前阻止完成,您可以使用 aSemaphoreSlim而不是在 a 中添加和删除键HashSet,例如:public async Task SendRequest(){&nbsp; &nbsp; var topic = "SomeTopic";&nbsp; &nbsp; SemaphoreSlim semaphoreSlim = new SemaphoreSlim(0, 1);&nbsp; &nbsp; var responseHandler = new Action<ResponseMessage>(response =>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; //signal that the response has arrived&nbsp; &nbsp; &nbsp; &nbsp; semaphoreSlim.Release();&nbsp; &nbsp; });&nbsp; &nbsp; bus.Subscribe(BusController.ManualRequest, responseHandler, configuration => configuration.WithTopic(BusController.ManualRequest));&nbsp; &nbsp; bus.Publish(someRequest, topic);&nbsp; &nbsp; //wait for the response to arrive&nbsp; &nbsp; await semaphoreSlim.WaitAsync();&nbsp; &nbsp; semaphoreSlim.Dispose();}
随时随地看视频慕课网APP
我要回答