缓冲 observable 以稳定慢观察者的可变延迟

我有一个 observable 产生一系列数字,这些数字之间的延迟范围从 0 到 1 秒(随机):


var random = new Random();


var randomDelaysObservable = Observable.Create<int>(async observer =>

{

    var value = 0;


    while (true)

    {

        // delay from 0 to 1 second

        var randomDelay = TimeSpan.FromSeconds(random.NextDouble());

        await Task.Delay(randomDelay);


        observer.OnNext(value++);

    }


    return Disposable.Empty;

    // ReSharper disable once FunctionNeverReturns

});

我希望有一个消费者使用这些数字并将它们写出到控制台,但每 2 秒(恰好每两秒)只取一个数字。


现在,我有这个观察者的代码(虽然我知道使用 是不正确的await):


var delayedConsoleWritingObserver = Observer.Create<int>(async value =>

{

    // fixed delay of 2 seconds

    var fixedDelay = TimeSpan.FromSeconds(2);

    await Task.Delay(fixedDelay);


    Console.WriteLine($"[{DateTime.Now:O}] Received value: {value}.");

});



randomDelaysObservable.Subscribe(delayedConsoleWritingObserver);

如果生产者每 0 到 1 秒产生一个数字,而消费者只能每 2 秒消耗一个数字,很明显,生产者产生数字的速度比消费者消耗它们的速度快(背压)。我想要做的是能够提前“预加载”例如来自生产者的 10 或 20 个数字(如果消费者不能足够快地处理它们),以便消费者可以在没有随机延迟的情况下消费它们(但不是所有这些都因为可观察序列是无限的,如果它运行了一段时间,我们就会耗尽内存)。


如果我有一个较慢的消费者,这将在某种程度上稳定来自生产者的可变延迟。不过,我不认为一个可能的解决方案如何在ReactiveX运营商做到这一点,我看的文件Buffer,Sample,Debounce和Window,和他们没有像我期待的东西。


关于这如何可能的任何想法?请注意,即使我的观察者代码使用async/也不是真正正确await,但我想不出更好的方法来说明我想要实现的目标。


眼眸繁星
浏览 291回答 2
2回答

慕标5832272

以下是使用纯 Rx 时您的 observables 的样子:var producer = Observable.Generate(&nbsp; &nbsp; (r: new Random(), i: 0),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // initial state&nbsp; &nbsp; _ => true,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // condition&nbsp; &nbsp; t => (t.r, t.i + 1),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // iterator&nbsp; &nbsp; t => t.i,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// result selector&nbsp; &nbsp; t => TimeSpan.FromSeconds(t.r.NextDouble()) // timespan generator);var consumer = producer.Zip(&nbsp; &nbsp; Observable.Interval(TimeSpan.FromSeconds(2)),&nbsp; &nbsp; (_, i) => i);然而,“毫不拖延地抓住第一个 n”并不是一件容易的事情。所以我们可以创建一个非时间间隔的生产者:var rawProducer = Observable.Range(0, int.MaxValue);然后分别创建时间间隔:var timeGaps = Observable.Repeat(TimeSpan.Zero).Take(10) //or 20&nbsp; &nbsp; .Concat(Observable.Generate(new Random(), r => true, r => r, r => TimeSpan.FromSeconds(r.NextDouble())));然后结合这两个:var timeGappedProducer = rawProducer.Zip(timeGaps, (i, ts) => Observable.Return(i).Delay(ts))&nbsp; &nbsp; .Concat();消费者看起来基本相同:var lessPressureConsumer = timeGappedProducer .Zip(&nbsp; &nbsp; Observable.Interval(TimeSpan.FromSeconds(2)),&nbsp; &nbsp; (_, i) => i);鉴于所有这些,我真的不明白你为什么要这样做。这不是处理背压的好方法,而且这个问题听起来有点像XY 问题。您提到的运算符 ( Sample、Throttle等) 是处理背压的更好方法。

慕神8447489

您所描述的问题非常适合在生产者和消费者之间共享的简单有界缓冲区。生产者必须有一个与写入缓冲区相关的条件,说明缓冲区不能满。消费者必须有一个条件,说明缓冲区不能为空。请参阅以下使用 Ada 语言的示例。with Ada.Text_IO; use Ada.Text_IO;procedure Main is&nbsp; &nbsp;type Order_Nums is range 1..10_000;&nbsp; &nbsp;Type Index is mod 10;&nbsp; &nbsp;type Buf_T is array(Index) of Order_Nums;&nbsp; &nbsp;protected Orders is&nbsp; &nbsp; &nbsp; entry Prepare(Order : in Order_Nums);&nbsp; &nbsp; &nbsp; entry Sell(Order : out Order_Nums);&nbsp; &nbsp;private&nbsp; &nbsp; &nbsp; Buffer&nbsp; : Buf_T;&nbsp; &nbsp; &nbsp; P_Index : Index := Index'First;&nbsp; &nbsp; &nbsp; S_Index : Index := Index'First;&nbsp; &nbsp; &nbsp; Count&nbsp; &nbsp;: Natural := 0;&nbsp; &nbsp;end Orders;&nbsp; &nbsp;protected body Orders is&nbsp; &nbsp; &nbsp; entry Prepare(Order : in Order_Nums) when Count < Index'Modulus is&nbsp; &nbsp; &nbsp; begin&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Buffer(P_Index) := Order;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;P_Index := P_Index + 1;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Count := Count + 1;&nbsp; &nbsp; &nbsp; end Prepare;&nbsp; &nbsp; &nbsp; entry Sell(Order : out Order_Nums) when Count > 0 is&nbsp; &nbsp; &nbsp; begin&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Order := Buffer(S_Index);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;S_Index := S_Index + 1;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Count := Count - 1;&nbsp; &nbsp; &nbsp; end Sell;&nbsp; &nbsp;end Orders;&nbsp; &nbsp;task Chef is&nbsp; &nbsp; &nbsp; Entry Stop;&nbsp; &nbsp;end Chef;&nbsp; &nbsp;task Seller is&nbsp; &nbsp; &nbsp; Entry Stop;&nbsp; &nbsp;end Seller;&nbsp; &nbsp;task body Chef is&nbsp; &nbsp; &nbsp; The_Order : Order_Nums := Order_Nums'First;&nbsp; &nbsp;begin&nbsp; &nbsp; &nbsp; loop&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;select&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; accept Stop;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exit;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;else&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delay 1.0; -- one second&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Orders.Prepare(The_Order);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Put_Line("Chef made order number " & The_Order'Image);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; The_Order := The_Order + 1;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exit when The_Order = Order_Nums'Last;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;end select;&nbsp; &nbsp; &nbsp; end loop;&nbsp; &nbsp;end Chef;&nbsp; &nbsp;task body Seller is&nbsp; &nbsp; &nbsp; The_Order : Order_Nums;&nbsp; &nbsp;begin&nbsp; &nbsp; &nbsp; loop&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;select&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; accept Stop;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exit;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;else&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delay 2.0; -- two seconds&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Orders.Sell(The_Order);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Put_Line("Sold order number " & The_Order'Image);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;end select;&nbsp; &nbsp; &nbsp; end loop;&nbsp; &nbsp;end Seller;begin&nbsp; &nbsp;delay 60.0; -- 60 seconds&nbsp; &nbsp;Chef.Stop;&nbsp; &nbsp;Seller.Stop;end Main;共享缓冲区名为 Orders。Orders 包含一个 10 Order_Nums 的循环缓冲区。包含订单的数组的索引被声明为mod 10包含 0 到 9 的值。Ada 模块化类型表现出环绕算术,因此递增超过 9 环绕为 0。Prepare 条目具有边界条件,要求Count < Index'Modulus其计算结果为 Count < 10在这种情况下。卖出条目有一个边界条件Count < 0。Chef 任务等待 1 秒来制作比萨饼,但一直等到缓冲区中有空间。只要缓冲区中有空间,Chef 就会生成一个订单。卖家等待 2 秒以消费订单。每个任务在其停止条目被调用时终止。Main 等待 60 秒,然后为每个任务调用停止条目。
打开App,查看更多内容
随时随地看视频慕课网APP