我使用以下代码为 Stephen Cleary 的 AsyncProducerConsumerQueue<T> 创建了一个可观察(Observable)的包装器。
我想知道这里是否有人知道我如何以更简单的方式做到这一点?
是否可以在没有包装类的情况下编写它?
是否可以防止将多个包装器应用于一个队列时出现错误?
我可以让它在第一次订阅时自动连接,而不是通过直接调用 Connect 吗?如果可以,这样做会带来什么影响?
最后,你会怎么做呢?
using Nito.AsyncEx;
using System.Reactive;
/// <summary>
/// Walks through the intended call sequence for the queue wrapper:
/// wrap the queue, enqueue an item, subscribe, connect, then enqueue again.
/// </summary>
static async Task ExampleUsage() {
    var source = new AsyncProducerConsumerQueue<int>();
    var stream = source.AsConnectableObservable();

    // This item is enqueued before any observer is attached or Connect is called.
    await source.EnqueueAsync(1);

    // Attach a console printer first, then start the connection.
    stream.Subscribe(item => Console.WriteLine(item));
    stream.Connect();

    // This item is enqueued after the connection has been requested.
    await source.EnqueueAsync(2);
}
/// <summary>
/// Extension methods that adapt Nito.AsyncEx types to Rx interfaces.
/// </summary>
public static class AsyncExExtensions {
    /// <summary>
    /// Wraps <paramref name="queue"/> in a connectable observable.
    /// </summary>
    /// <typeparam name="T">Type of the items held by the queue.</typeparam>
    /// <param name="queue">The queue to wrap; must not be null.</param>
    /// <returns>A new wrapper instance around <paramref name="queue"/>.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {
        // Fail fast at the call site: without this guard a null queue is stored
        // by the wrapper and only surfaces later, when the wrapper is used.
        if (queue == null) {
            throw new System.ArgumentNullException(nameof(queue));
        }

        return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);
    }
}
class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {
readonly AsyncProducerConsumerQueue<T> Queue;
long _isConnected = 0;
ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;
public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {
Queue = queue;
}
public IDisposable Connect() {
if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");
var cts = new CancellationTokenSource();
var token = cts.Token;
Task.Run(async () => {
try {
while (true) {
token.ThrowIfCancellationRequested();
var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);
foreach (var observer in Observers)
observer.OnNext(@event);
}
} catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {
foreach (var observer in Observers)
observer.OnCompleted();
}
});
holdtom
相关分类