我想合并 2 个可观察值并保持顺序(可能基于选择器)。我还想对可观察的来源施加反压力。
因此,选择器会选择其中一个项目通过可观察对象进行推送,而另一个项目也会等待另一个项目进行比较。
Src1、Src2 和 Result 都是 类型IObservable<T>。
Src1: { 1,3,6,8,9,10 }
Src2: { 2,4,5,7,11,12 }
Result: 1,2,3,4,5,6,7,8,9,10,11,12
Timeline:
Src1: -1---3----6------8----9-10
Src2: --2-----4---5-7----11---------12
Result: --1--2--3-4-5-6--7-8--9-10-11-12
在上面的示例中,src1 发出“1”并被阻塞,直到 src2 发出它的第一项“2”。
应用一个选择器来选择最小的项目,该选择器从 src1 中选择项目。
Src2 现在等待下一个项目(来自 src1)与其当前项目(“2”)进行比较。
当 src1 发出下一个项目“3”时,再次运行选择,这次从 src2 中选择该项目。
重复此过程,直到其中一个可观察量完成。然后,剩余的 observable 会推送项目直到完成。
使用现有的 .net Rx 方法可以实现这一点吗?
编辑:请注意,保证 2 个源可观察量是有序的。
测试示例:
var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();
var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();
var src1 = source1.ToObservable();
var src2 = source2.ToObservable();
var res = src1.SortedMerge(src2, (a, b) =>
{
if (a <= b)
return a;
else
return b;
});
res.Subscribe((x) => Console.Write($"{x}, "));
期望结果:1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
跃然一笑
相关分类