线程安全发送异步/可等待 FIFO

所以我正在为.System.Net.WebSockets.ClientWebSocket


您如何看待我想出的以下方法,它是线程安全的吗?重要的是,一次只执行 1 个线程,等待线程将按 FIFO 顺序执行。_ws.SendAsync


另一种解决方案是使用BlocklingCollection和生产者 - 消费者模式。但是 BlockingCollection 没有异步等待功能,所以如果它是线程安全的,我更喜欢当前的解决方案。我也非常感谢其他建议。


private ClientWebSocket _ws;

private int _sendTicketCount = 0;

private int _sendTicketCurrent = 1;


public async Task SendAsync(string message)

{

    if (_ws.State != WebSocketState.Open)

        throw new Exception("Connection is not open");


    try

    {

        int mySendTicket = Interlocked.Increment(ref _sendTicketCount);


        if (mySendTicket != _sendTicketCurrent)

        {

            await Task.Run(() => SpinWait.SpinUntil(() => mySendTicket == _sendTicketCurrent)).ConfigureAwait(false);

        }


        var cancel = new CancellationTokenSource(5000);

        var bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));

        await _ws.SendAsync(bytes, WebSocketMessageType.Text, true, cancel.Token).ConfigureAwait(false);

    }

    catch(Exception ex)

    {

        throw ex;

    }

    finally

    {

        Interlocked.Increment(ref _sendTicketCurrent);

    }

}


汪汪一只猫
浏览 127回答 2
2回答

叮当猫咪

您有一个有多个生产者和一个消费者的情况。您可以使用使用正确选项设置的操作块来执行此操作。

炎炎设计

_sendTicketCurrent需要是易失性的,否则允许JIT将值缓存在寄存器中,这可能导致无限循环。除此之外,它看起来是线程安全的,除了这是对CPU的巨大浪费(SpinWait完全使用内核)。另外,做真的没有意义。无论如何,您都要持有一个线程,因此您可以在当前线程中执行等待。await Task.Run(() => SpinWait.SpinUntil(() => mySendTicket == _sendTicketCurrent))为了以更优化的方式执行所需的操作,已经提供了一个开箱即用的异步兼容同步基元:SemaphoreSlim。private SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);public async Task SendAsync(string message){&nbsp; &nbsp; if (_ws.State != WebSocketState.Open)&nbsp; &nbsp; &nbsp; &nbsp; throw new Exception("Connection is not open");&nbsp; &nbsp; try&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; await _semaphore.WaitAsync().ConfigureAwait(false);&nbsp; &nbsp; &nbsp; &nbsp; var cancel = new CancellationTokenSource(5000);&nbsp; &nbsp; &nbsp; &nbsp; var bytes = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));&nbsp; &nbsp; &nbsp; &nbsp; await _ws.SendAsync(bytes, WebSocketMessageType.Text, true, cancel.Token).ConfigureAwait(false);&nbsp; &nbsp; }&nbsp; &nbsp; finally&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; _semaphore.Release();&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP