猿问

合并两个可观察量时保留排序

我想合并 2 个可观察值并保持顺序(可能基于选择器)。我还想对可观察的来源施加反压力。


因此,选择器会选择其中一个项目通过可观察对象进行推送,而另一个项目也会等待另一个项目进行比较。


Src1、Src2 和 Result 都是 类型IObservable<T>。


Src1: { 1,3,6,8,9,10 }

Src2: { 2,4,5,7,11,12 }

Result: 1,2,3,4,5,6,7,8,9,10,11,12


Timeline:

Src1:    -1---3----6------8----9-10

Src2:    --2-----4---5-7----11---------12

Result:  --1--2--3-4-5-6--7-8--9-10-11-12

  1. 在上面的示例中,src1 发出“1”并被阻塞,直到 src2 发出它的第一项“2”。

  2. 应用一个选择器来选择最小的项目,该选择器从 src1 中选择项目。

  3. Src2 现在等待下一个项目(来自 src1)与其当前项目(“2”)进行比较。

  4. 当 src1 发出下一个项目“3”时,再次运行选择,这次从 src2 中选择该项目。

  5. 重复此过程,直到其中一个可观察量完成。然后,剩余的 observable 会推送项目直到完成。

使用现有的 .net Rx 方法可以实现这一点吗?

编辑:请注意,保证 2 个源可观察量是有序的。

测试示例:

var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();

var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();


var src1 = source1.ToObservable();

var src2 = source2.ToObservable();


var res = src1.SortedMerge(src2, (a, b) =>

    {

       if (a <= b)

           return a;

       else

           return b;

    });


res.Subscribe((x) => Console.Write($"{x}, "));

期望结果:1,2,3,4,5,6,7,8,9,10,11,12,13,14,15


森林海
浏览 108回答 1
1回答

跃然一笑

这很有趣。必须稍微调整一下算法。还可以进一步改进。假设:有两个通用类型 的streamA流。streamBT两个流分别排序,使得streamA[i] < streamA[i+1]和streamB[i] < stream[i+1]。您不能假设streamA[i]和之间有任何关系streamB[i]。流 A 和 B 是谨慎的:相同的元素不会从两者中发出。如果发生这种情况,我会扔掉NotImplementedException。这个案子很容易处理,但我想避免歧义。有一个min类型的函数T。没有对两条流的相对速度做出任何假设,但如果其中一条始终比另一条快,则背压将成为问题。这是我使用的算法:设两个队列,qA并且qB。当您从 获取一个项目时streamA,将其排队到qA。当您从 获取一个项目时streamB,将其排队到qB。当和&nbsp;qA中都有一个项目时qB,比较两个队列的顶部项目。删除并发出这两者的最小值。如果两个队列仍然非空,则重复。如果 或streamA完成streamB,则转储队列的内容并终止。注意:这无疑是懒惰的,可能应该更改为转储,然后继续返回未完成的 observable。这是代码:public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other){&nbsp; &nbsp; return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b}));}public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min){&nbsp; &nbsp; return source&nbsp; &nbsp; &nbsp; &nbsp; .Select(i => (key: 1, value: i)).Materialize()&nbsp; &nbsp; &nbsp; &nbsp; .Merge(other.Select(i => (key: 2, value: i)).Materialize())&nbsp; &nbsp; &nbsp; &nbsp; .Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()),&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (state, message) =>&nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (message.Kind == NotificationKind.OnNext)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var key = message.Value.key;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var value = message.Value.value;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var qA = state.qA;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var qB = state.qB;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (key == 1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; qA = qA.Enqueue(value);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; qB = qB.Enqueue(value);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var output = new List<T>();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while(!qA.IsEmpty && !qB.IsEmpty)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var aVal = qA.Peek();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var bVal = qB.Peek();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var minVal = min(aVal, bVal);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(aVal.Equals(minVal) && bVal.Equals(minVal))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new NotImplementedException();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(aVal.Equals(minVal))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; output.Add(aVal);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; qA = qA.Dequeue();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; output.Add(bVal);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; qB = qB.Dequeue();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return (qA, qB, null, output);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else if (message.Kind == NotificationKind.OnError)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return (state.qA, state.qB, message.Exception, new List<T>());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else //message.Kind == NotificationKind.OnCompleted&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var output = state.qA.Concat(state.qB).ToList();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; .Publish(tuples => Observable.Merge(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tuples&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .SelectMany(t => t.outputMessages&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Select(v => Notification.CreateOnNext<T>(v))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ToObservable()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tuples&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .SelectMany(t => t.outputMessages&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Select(v => Notification.CreateOnNext<T>(v))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ToObservable()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Concat(Observable.Return(Notification.CreateOnCompleted<T>()))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tuples&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Where(t => t.exception != null)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .Select(t => Notification.CreateOnError<T>(t.exception))&nbsp; &nbsp; &nbsp; &nbsp; ))&nbsp; &nbsp; &nbsp; &nbsp; .Dematerialize();ImmutableQueue来自System.Collections.Immutable. Scan需要跟踪状态。由于OnCompleted处理需要具体化。诚然,这是一个复杂的解决方案,但我不确定是否有更干净的以 Rx 为中心的方法。如果您需要进一步说明,请告诉我。
随时随地看视频慕课网APP
我要回答