“长”间隔后的完整可观察序列

以下可观察序列将每个元素添加到 ReplaySubject 中,以便我以后可以访问任何元素,甚至等待 ReplaySubject 的完成。它在达到一个时间跨度后完成收割主体。


ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();

characteristic.WhenNotificationReceived()

    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))

    .Subscribe(

        onNext: result =>

        {

            string nextTag = BitConverter.ToString(result.Data);

            nextTag = nextTag.Replace("-", "");


            rfidPlayer.OnNext(nextTag);

        },

        onCompleted: () =>

        {

            rfidPlayer.OnCompleted();

        });

我希望序列一直运行到自上次“OnNext”调用以来的给定时间,然后完成。这在各种蓝牙通信场景中非常有用,其中蓝牙设备将向我提供一系列数据,然后停止而没有任何类型的完成消息或事件。在这些情况下,我需要启发式确定序列何时完成,然后自己完成。因此,如果自上次蓝牙通知以来“太长”,我想完成ReplaySubject。


我可以通过创建一个计时器来做到这一点,在收到每个元素时重置它,然后在计时器达到“太长”时完成ReplaySubject,但是我听说创建一个对象并从可观察的订阅中操作它不是线程安全的。


关于如何在“太长”的时间间隔后完成序列的任何建议?


这是一个与我所听到的不是线程安全的版本,但应该按预期工作:


bool reading = true;

System.Timers.Timer timer = new System.Timers.Timer(1000);

timer.Elapsed += (sender, e) =>

{

    reading = false;

};


ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();

characteristic.WhenNotificationReceived()

    .TakeWhile(x => reading)

    .Subscribe(

        onNext: result =>

        {

            string nextTag = BitConverter.ToString(result.Data);

            nextTag = nextTag.Replace("-", "");


            timer.Stop();

            timer.Start();


            rfidPlayer.OnNext(nextTag);

        },

        onCompleted: () =>

        {

            rfidPlayer.OnCompleted();

        });

根据西蒙娜尔的第一个答案,这似乎是令人满意的:


                characteristic.WhenNotificationReceived()

                .Timeout(TimeSpan.FromSeconds(1))

                .Subscribe(

                    onNext: result =>

                    {

                        string nextTag = BitConverter.ToString(result.Data);

                        nextTag = nextTag.Replace("-", "");


                        rfidPlayer.OnNext(nextTag);

                    },

                    onError: error =>

                    {

                        rfidPlayer.OnCompleted();

                    });


摇曳的蔷薇
浏览 116回答 3
3回答

慕村9548890

您可以考虑使用超时运算符 。唯一的缺点是它以错误信号终止。您可能需要处理错误超时运算符允许您中止具有 onError 终止的可观察对象,如果该可观察对象在指定的时间跨度内未能发出任何项。如果您使用以下方法,则可以超越错误&nbsp;.Timeout(200, Promise.resolve(42));另一个变体允许您指示超时切换到您指定的备份 Observable,而不是在触发超时条件时以错误终止。characteristic.WhenNotificationReceived()&nbsp; &nbsp; .Timeout(TimeSpan.FromSeconds(1))&nbsp; &nbsp; .Subscribe(&nbsp; &nbsp; &nbsp; &nbsp; onNext: result =>&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ....&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; rfidPlayer.OnNext(....);&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; onError: error =>&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; rfidPlayer.OnCompleted();&nbsp; &nbsp; &nbsp; &nbsp; });

侃侃尔雅

由于例外情况,我发现使用起来很讨厌。Timeout我更喜欢将值注入到可用于终止序列的序列中。例如,如果我的序列产生非负数,那么如果我注入一个我知道结束序列。-1下面是一个示例:从这个可观察量开始,它生成从 1 开始的幂 2,并且它还延迟生成每个值的挂起值的毫秒数。Observable&nbsp; &nbsp; .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))所以1,2,4,8等,越来越慢。现在我想停止这个序列,如果秒没有值,那么我可以这样做:3.0&nbsp; &nbsp; .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))&nbsp; &nbsp; .Switch()&nbsp; &nbsp; .TakeWhile(x => x >= 0)如果我运行此序列,我将得到以下输出:1&nbsp;2&nbsp;4&nbsp;8&nbsp;16&nbsp;32&nbsp;64&nbsp;128&nbsp;256&nbsp;512&nbsp;1024&nbsp;2048&nbsp;该序列即将生成,但它首先等待毫秒以产生该值 - 与此同时,触发并输出一个从而停止序列。40964096Observable.Timer(TimeSpan.FromSeconds(3.0))-1此查询的关键部分是使用 。它通过仅订阅最新的外部可观察和取消订阅前一个外部可观察来获取并生成 a。SwitchIObservable<IObservable<T>>IObservable<T>因此,在我的查询中,序列生成的每个新值都会停止并重新启动 .Timer在你的例子中,你的可观察量看起来像这样:characteristic&nbsp; &nbsp; .WhenNotificationReceived()&nbsp; &nbsp; .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))&nbsp; &nbsp; .Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))&nbsp; &nbsp; .Switch()&nbsp; &nbsp; .TakeWhile(x => x != null)&nbsp; &nbsp; .Subscribe(rfidPlayer);

UYOU

下面是一个可以使用的自定义运算符,它是内置超时运算符顶部的薄层。TakeUntilTimeout/// <summary>/// Applies a timeout policy for each element in the observable sequence./// If the next element isn't received within the specified timeout duration/// starting from its predecessor, the sequence terminates./// </summary>public static IObservable<T> TakeUntilTimeout<T>(&nbsp; &nbsp; this IObservable<T> source,&nbsp; &nbsp; TimeSpan timeout){&nbsp; &nbsp; return source.Timeout(timeout, Observable.Empty<T>());}
打开App,查看更多内容
随时随地看视频慕课网APP