使用 Rx java Observable 一次进行多个异步调用(触发和忘记调用)

我有一个需要异步调用的下游 api 调用列表(大约 10 个)。到目前为止,我一直在使用 callables


List<RequestContextPreservingCallable <FutureResponse>> callables

我会将 api 调用添加到此列表中,并在最后使用 executeAsyncNoReturnRequestContextPreservingCallables 提交它。


使用 Rx java Observables 我该怎么做?


List<RequestContextPreservingCallable<FutureResponse>> callables = new 

ArrayList<RequestContextPreservingCallable<FutureResponse>>();


callables.add(apiOneConnector.CallToApiOne(name));

callables.add(apiTwoConnector.CallToApiTWO(sessionId));

....


//execute all the calls

executeAsyncNoReturnRequestContextPreservingCallables(callables);


蓝山帝景
浏览 332回答 1
1回答

慕神8447489

您可以使用zip运算符。该zip运营商可以采取多种观测,并同时执行它们,所有的结果已经抵达后,将继续进行。然后,您可以将这些结果转换为您需要的形式并传递到下一个级别。按照你的例子。假设您有多个 API 调用来获取名称和会话等,如下所示Observable.zip(getNameRequest(), getSessionIdRequest(), new BiFunction<String, String, Object>() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public Object apply(String name, String sessionId) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // here you will get all the results once everything is completed. you can then take these&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // results and transform into another object and returnm from here. I decided to transform the results into an Object[]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the retuen type of this apply funtion is generic, so you can choose what to return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new Object[]{name, sessionId};&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; })&nbsp; &nbsp; .subscribeOn(Schedulers.io())&nbsp; // will start this entire chain in an IO thread&nbsp; &nbsp; .observeOn(AndroidSchedulers.mainThread()) // observeOn will filp the thread to the given one , so that the downstream will be executed in the specified thread. here I'm switching to main at this point onwards&nbsp; &nbsp; .subscribeWith(new DisposableObserver<Object>() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onNext(Object finalResult) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// here you will get the final result with all the api results&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onError(Throwable e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // any error during the entire process will be triggered here&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onComplete() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;//will be called once the whole chain is completed and terminated&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });您甚至可以将 observables 列表传递给zip如下&nbsp; &nbsp; List<Observable<String>> requests = new ArrayList<>();&nbsp; &nbsp; requests.add(getNameRequest());&nbsp; &nbsp; requests.add(getSessionIdRequest());&nbsp; &nbsp; Observable.zip(requests, new Function<Object[], Object[]>() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public Object[] apply(Object[] objects) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new Object[]{objects[0], objects[1]};&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }).subscribeWith(new DisposableObserver<Object[]>() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onNext(Object[] objects) {&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onError(Throwable e) {&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public void onComplete() {&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java