我有一个 observable 产生一系列数字,这些数字之间的延迟范围从 0 到 1 秒(随机):
var random = new Random();
var randomDelaysObservable = Observable.Create<int>(async observer =>
{
var value = 0;
while (true)
{
// delay from 0 to 1 second
var randomDelay = TimeSpan.FromSeconds(random.NextDouble());
await Task.Delay(randomDelay);
observer.OnNext(value++);
}
return Disposable.Empty;
// ReSharper disable once FunctionNeverReturns
});
我希望有一个消费者使用这些数字并将它们写出到控制台,但每 2 秒(恰好每两秒)只取一个数字。
现在,我有这个观察者的代码(虽然我知道使用 是不正确的await):
var delayedConsoleWritingObserver = Observer.Create<int>(async value =>
{
// fixed delay of 2 seconds
var fixedDelay = TimeSpan.FromSeconds(2);
await Task.Delay(fixedDelay);
Console.WriteLine($"[{DateTime.Now:O}] Received value: {value}.");
});
randomDelaysObservable.Subscribe(delayedConsoleWritingObserver);
如果生产者每 0 到 1 秒产生一个数字,而消费者只能每 2 秒消耗一个数字,很明显,生产者产生数字的速度比消费者消耗它们的速度快(背压)。我想要做的是能够提前“预加载”例如来自生产者的 10 或 20 个数字(如果消费者不能足够快地处理它们),以便消费者可以在没有随机延迟的情况下消费它们(但不是所有这些都因为可观察序列是无限的,如果它运行了一段时间,我们就会耗尽内存)。
如果我有一个较慢的消费者,这将在某种程度上稳定来自生产者的可变延迟。不过,我不认为一个可能的解决方案如何在ReactiveX运营商做到这一点,我看的文件Buffer,Sample,Debounce和Window,和他们没有像我期待的东西。
关于这如何可能的任何想法?请注意,即使我的观察者代码使用async/也不是真正正确await,但我想不出更好的方法来说明我想要实现的目标。
慕标5832272
慕神8447489
相关分类