如何将任务列表转换为 Observable 并在完成后处理元素?

给定一个集合Tasks:


var americanAirlines = new FlightPriceChecker("AA");

...

var runningTasks = new List<Task<IList<FlightPrice>>>

{

    americanAirlines.GetPricesAsync(from, to),

    delta.GetPricesAsync(from, to),

    united.GetPricesAsync(from, to)

};

我想GetPricesAsync()以它们到达的任何顺序处理结果。目前,我正在使用 while 循环来实现这一点:


while (runningTasks.Any())

{

    // Wait for any task to finish

    var completed = await Task.WhenAny(runningTasks);

    // Remove from running list   

    runningTasks.Remove(completed);

    // Process the completed task (updates a property we may be binding to)

    UpdateCheapestFlight(completed.Result);

}

这是一个可以使用 Rx 更优雅地解决的问题吗?我尝试使用类似下面的代码但卡住了,因为我必须在某个地方await每个getFlightPriceTask都会阻塞,然后才执行下一个,而不是执行第一个完成的然后等待下一个:


runningTasks

  .ToObservable()

  .Select(getFlightPriceTask => .???.)


德玛西亚99
浏览 240回答 3
3回答

翻过高山走不出你

尝试这个:runningTasks&nbsp; .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())&nbsp; .Merge()&nbsp; .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices))

慕尼黑5688855

(使用Merge()就是诀窍!),我想对此发表评论并提出替代解决方案。评论 Shlomo 的解决方案这个解决方案非常简单,表达了 Rx 的优雅。唯一的问题是不能等待完成。这在生产代码中通常不是问题,我们只关心更新然后绑定到 UI 的属性。我的另一个评论是计算是在Subscribe()- 有些人喜欢保持订阅超轻量级的,但我认为这主要是个人喜好。runningTasks&nbsp; // Get all tasks and turn them into Observables.&nbsp; .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())&nbsp; // Merge all tasks (in my case 3) into one "lane". Think of cars trying&nbsp; // to leave a three lane highway and going for a one lane exit.&nbsp; .Merge()&nbsp; // For every task "leaving the highway" calculate the minimum price.&nbsp; .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices))备选方案 1:使用 Do()这根本没有使用Subscribe(),这有点违背 Rx 的想法,但它可以等待,因此表现得像原始版本。await runningTasks&nbsp; &nbsp; .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())&nbsp; &nbsp; .Merge()&nbsp; &nbsp; // Process result of each task.&nbsp; &nbsp; .Do(flightPrices => UpdateCheapestFlight(flightPrices))&nbsp; &nbsp; // Taking all elements will only complete if all three tasks have completed.&nbsp; &nbsp; .Take(runningTasks.Count);备选方案 2:消除 UpdateCheapestFlight()最后,我认为做这种更具 Rx 风格的方法是根本不使用原始的辅助方法,而是讲述一个易于阅读的“Rx-story”。var minFlightPrice = await runningTasks&nbsp; &nbsp; // Get all the tasks and turn them into Observables&nbsp;&nbsp; &nbsp; .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())&nbsp; &nbsp; // Merge all three into one "lane".&nbsp; &nbsp; .Merge()&nbsp; &nbsp; // Get local minimum value of each airline&nbsp; &nbsp; .Select(x => x.Min())&nbsp; &nbsp; // Take all the local minimums...&nbsp; &nbsp; .Take(runningTasks.Count)&nbsp; &nbsp; // ...and find the minimum of them.&nbsp; &nbsp; .Min();

BIG阳

这是另一个解决方案:await runningTasks&nbsp; &nbsp; .ToObservable()&nbsp; &nbsp; .Merge()&nbsp; &nbsp; .Do(result => UpdateCheapestFlight(result))&nbsp; &nbsp; .DefaultIfEmpty();它看起来类似于 Shlomo 的解决方案,但有一个细微的区别:任务不是投影到嵌套的 observable ( IObservable<IObservable<TResult>>),而是投影到任务的 observable ( IObservable<Task<TResult>>)。Rx 包含Merge对这两种结构都起作用的运算符的重载。后者稍微更有效,因为它避免了创建任务的一些一次性包装器。当我们从异步委托而不是已经物化的任务开始时,前者更强大,因为它允许控制并发级别(通过不一次启动所有任务),还因为它可以处理任何挂起任务的自动取消以防结果 observable 因任何原因(包括任何任务中发生的错误)在任何时间取消订阅。该Do运营商用于处理的任务的结果在他们的完成,一次一个结果的顺序。最后DefaultIfEmpty需要操作员,以防止InvalidOperationException在初始任务列表为空的情况下发生。这是因为等待生成的 observable,并且等待 observable 需要返回一个值(最后发出的值)。以下是上述示例中使用的 Rx 运算符的签名:// Converts an enumerable sequence to an observable sequence.public static IObservable<TSource> ToObservable<TSource>(&nbsp; &nbsp; this IEnumerable<TSource> source);// Merges results from all source tasks into a single observable sequence.public static IObservable<TSource> Merge<TSource>(&nbsp; &nbsp; this IObservable<Task<TSource>> sources);// Invokes an action for each element in the observable sequence, and propagates// all observer messages through the result sequence. This method can be used for// debugging, logging, etc. of query behavior by intercepting the message stream// to run arbitrary actions for messages on the pipeline.public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source,&nbsp; &nbsp; Action<TSource> onNext);// Returns the elements of the specified sequence or the type parameter's default// value in a singleton sequence if the sequence is empty.public static IObservable<TSource> DefaultIfEmpty<TSource>(&nbsp; &nbsp; this IObservable<TSource> source);
打开App,查看更多内容
随时随地看视频慕课网APP