AsyncProducerConsumerQueue 的可观察包装器

AsyncProducerConsumerQueue<T>因此,我使用以下代码为 Stephen Cleary 创建了一个可观察的包装器。

我想知道这里是否有人知道我如何以更简单的方式做到这一点?

  • 是否可以在没有包装类的情况下编写它?

  • 是否可以防止将多个包装器应用于一个队列时出现错误?

  • 我可以让它在第一个订阅上连接,而不是通过直接调用吗Connect?如果是这样,这意味着什么?

  • 最后,你会怎么做呢?

using Nito.AsyncEx;

using System.Reactive;


static async Task ExampleUsage() {

    var queue = new AsyncProducerConsumerQueue<int>();

    var observable = queue.AsConnectableObservable();

    await queue.EnqueueAsync(1);

    observable.Subscribe(Console.WriteLine);

    observable.Connect();

    await queue.EnqueueAsync(2);

}


public static class AsyncExExtensions {

    public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {

        return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);

    }

}


class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {


    readonly AsyncProducerConsumerQueue<T> Queue;


    long _isConnected = 0;

    ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;


    public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {

        Queue = queue;

    }


    public IDisposable Connect() {

        if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");

        var cts = new CancellationTokenSource();

        var token = cts.Token;

        Task.Run(async () => {

            try {

                while (true) {

                    token.ThrowIfCancellationRequested();

                    var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);

                    foreach (var observer in Observers)

                        observer.OnNext(@event);

                }

            } catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {

                foreach (var observer in Observers)

                    observer.OnCompleted();

            }

        });


慕尼黑8549860
浏览 82回答 1
1回答

holdtom

考虑以下两个演示。当您有多个观察者时,这些行为会有所不同。在第一个演示中,观察者将竞争队列中的项目,在第二个演示中,他们每个人都会收到一个副本。演示 #1 - 冷可观察var queue = new AsyncProducerConsumerQueue<int>();// This is a cold observable, so each observer is fed by its own individual dequeue loop// and therefore will be 'competing' with other observers for queued items.var coldObservable = Observable&nbsp; &nbsp; // Create an observable that asynchronously waits for items to become available on the&nbsp; &nbsp; // queue and then emits them to the observer. This will be cancelled when the observer&nbsp; &nbsp; // is unsubscribed.&nbsp;&nbsp; &nbsp; .Create<int>(async (observer, cancellationToken) =>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; while (true)&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var item = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Console.WriteLine($"Dequeued {item}");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; observer.OnNext(item);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; })&nbsp; &nbsp; // If an InvalidOperationException is thrown by the above, continue with&nbsp; &nbsp; // an empty observable instead of the error. This effectively catches an&nbsp; &nbsp; // `OnError(InvalidOperationException)` and turns it into an `OnCompleted()`.&nbsp; &nbsp; .Catch<int, InvalidOperationException>(exn =>&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; Console.WriteLine("Caught InvalidOperation");&nbsp; &nbsp; &nbsp; &nbsp; return Observable.Empty<int>();&nbsp; &nbsp; });Console.WriteLine("TEST COLD");await queue.EnqueueAsync(1);Console.WriteLine("Enqueued 1");Console.WriteLine("Subscribing A");coldObservable.Subscribe(&nbsp; &nbsp; item => Console.WriteLine($"A received {item}"),&nbsp; &nbsp; () => Console.WriteLine("A completed"));Console.WriteLine("Subscribing B");coldObservable.Subscribe(&nbsp; &nbsp; item => Console.WriteLine($"B received {item}"),&nbsp; &nbsp; () => Console.WriteLine("B completed"));await queue.EnqueueAsync(2);Console.WriteLine("Enqueued 2");await queue.EnqueueAsync(3);Console.WriteLine("Enqueued 3");queue.CompleteAdding();Console.WriteLine("Completed adding");Console.WriteLine("Waiting...");await Task.Delay(2000);Console.WriteLine("DONE");// TEST COLD// Enqueued 1// Subscribing A// Dequeued 1// A received 1// Subscribing B// Enqueued 2// Enqueued 3// Completed adding// Waiting...// Dequeued 2// Dequeued 3// A received 2// B received 3// Caught InvalidOperation// Caught InvalidOperation// A completed// B completed// DONE演示 #2 - 热可观察var queue = new AsyncProducerConsumerQueue<int>();var coldObservable = // defined same as above// This is a hot observable, so each observer receives the same items from the queue.var hotObservable = coldObservable&nbsp; &nbsp; // Publish the cold observable to create an `IConnectableObservable` that will subscribe&nbsp; &nbsp; // to the dequeue loop when connected and emit the same items to all observers.&nbsp; &nbsp; .Publish()&nbsp; &nbsp; // Automatically connect to the published observable when the first observer subscribes&nbsp; &nbsp; // and automatically disconnect when the last observer unsubscribes. This means that the&nbsp; &nbsp; // first observer will receive any items queued before it subscribes, but additional&nbsp; &nbsp; // observers will only receive items queued after they subscribed.&nbsp; &nbsp; .RefCount();Console.WriteLine("TEST HOT");await queue.EnqueueAsync(1);Console.WriteLine("Enqueued 1");Console.WriteLine("Subscribing A");hotObservable.Subscribe(&nbsp; &nbsp; item => Console.WriteLine($"A received {item}"),&nbsp; &nbsp; () => Console.WriteLine("A completed"));Console.WriteLine("Subscribing B");hotObservable.Subscribe(&nbsp; &nbsp; item => Console.WriteLine($"B received {item}"),&nbsp; &nbsp; () => Console.WriteLine("B completed"));await queue.EnqueueAsync(2);Console.WriteLine("Enqueued 2");await queue.EnqueueAsync(3);Console.WriteLine("Enqueued 3");queue.CompleteAdding();Console.WriteLine("Completed adding");Console.WriteLine("Waiting...");await Task.Delay(2000);Console.WriteLine("DONE");// TEST HOT// Enqueued 1// Subscribing A// Dequeued 1// A received 1// Subscribing B// Enqueued 2// Enqueued 3// Dequeued 2// Completed adding// Waiting...// A received 2// B received 2// Dequeued 3// A received 3// B received 3// Caught InvalidOperation// A completed// B completed// DONE回答你原来的问题:是否可以在没有包装类的情况下编写它?是的,请参阅上面的演示。是否可以防止将多个包装器应用于一个队列时出现错误?上面演示的方法不会阻止其他方将项目出队(或在队列上执行任何其他操作)。如果您想确保只公开给定队列的单个队列IObservable<T>,请考虑通过创建一个ObservableProducerConsumerQueue<T>在内部创建和管理自己的AsyncProducerConsumerQueue. 您可以公开一个EnqueueAsync仅委托给内部队列的方法,并使用上面演示的可观察量之一将可观察量公开为属性或实现接口IObservable<T>。我可以让它在第一个订阅上连接,而不是通过直接调用 Connect 吗?如果是这样,这意味着什么?演示 #2 显示了此行为并描述了其含义。如果您希望能够在连接之前订阅观察者,请跳过调用RefCount并像以前一样使用IConnectableObservable返回的值Publish。最后,你会怎么做呢?如上所述,我将封装队列并使用上面演示的方法之一公开IObservable或公开。IConnectableObservable
打开App,查看更多内容
随时随地看视频慕课网APP