
Combining database and network calls with RxJava2

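I have 2 data sources: a database (cache) and an API, and I need to combine them into a single stream. I know I could simply use concatArray or something similar, but I want more complex behavior: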

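  • An observable stream that emits at most 2 elements.
  • It subscribes to both sources at the start.
  • If the API call is fast enough (< ~300 ms), it emits only the API data and completes the stream.
  • If the API call is slow (> ~300 ms), it emits the data from the database and still waits for the data from the API.
  • If the API call fails, it emits the data from the database and then emits the error.
  • If the database is somehow slower than the API, it must not emit its data (completing the stream takes care of this).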
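
To make the rules above concrete, here is a small plain-Java model (no RxJava involved) that lists which events the combined stream should emit for given response times. It is only a sketch: the class `CachePolicy` and the method `expectedEvents` are hypothetical names, and it assumes that the cached value is released `thresholdMs` after it was loaded (as a `delay(300, MILLISECONDS)` on the database source would do) and that the database call itself always succeeds.

```java
import java.util.ArrayList;
import java.util.List;

class CachePolicy {

    /**
     * Hypothetical helper: returns the events the combined stream should emit.
     * dbMs and apiMs are the times (ms after subscription) at which each source answers.
     */
    static List<String> expectedEvents(long dbMs, long apiMs, boolean apiOk, long thresholdMs) {
        List<String> events = new ArrayList<>();
        // Assumption: the cached value is held back for thresholdMs after it was loaded.
        boolean dbReleasedBeforeApi = dbMs + thresholdMs < apiMs;
        if (apiOk) {
            if (dbReleasedBeforeApi) {
                events.add("db");   // slow API: show the cached value first
            }
            events.add("api");
            events.add("complete");
        } else {
            events.add("db");       // failed API: cached value, then the error
            events.add("error");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(expectedEvents(10, 100, true, 300));   // [api, complete]
        System.out.println(expectedEvents(10, 500, true, 300));   // [db, api, complete]
        System.out.println(expectedEvents(400, 450, true, 300));  // [api, complete]
        System.out.println(expectedEvents(10, 100, false, 300));  // [db, error]
    }
}
```

A table like this can serve as a checklist when comparing different operator chains against the requirements.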

I got it working with the following code:

    public Observable<Entity> getEntity() {
        final CompositeDisposable disposables = new CompositeDisposable();

        return Observable.<Entity>create(emitter -> {
            final Entity[] localEntity = new Entity[1];

            // database call:
            disposables.add(database.getEntity()
                    .subscribeOn(schedulers.io())
                    // save the entity, because apiService can emit an error before 300 ms
                    .doOnSuccess(entity -> localEntity[0] = entity)
                    .delay(300, MILLISECONDS)
                    .subscribe((entity, throwable) -> {
                        if (entity != null && !emitter.isDisposed()) {
                            emitter.onNext(entity);
                        }
                    }));

            // network call:
            disposables.add(apiService.getEntity()
                    .subscribeOn(schedulers.io())
                    .onErrorResumeNext(throwable -> {
                        return Single.<Entity>error(throwable) // we will delay the error here
                                .doOnError(throwable1 -> {
                                    // API error: emit localEntity
                                    if (localEntity[0] != null) emitter.onNext(localEntity[0]);
                                })
                                .delay(200, MILLISECONDS, true); // let localEntity be emitted before the error
                    })
                    .subscribe(entity -> {
                        emitter.onNext(entity);
                        emitter.onComplete(); // we got the entity from the API, so we can complete the stream
                    }, emitter::onError));
        })
        // the original snippet is cut off here; presumably the inner subscriptions are cleaned up like this
        .doOnDispose(disposables::clear);
    }


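The code is a bit clunky, and I create observables inside an observable, which I think is bad. But this way I have access to the emitter from everywhere, which lets me control the main stream (emitting data, completion, error) the way I want.

Is there a better way to achieve this? I'd love to see some code examples. Thanks!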


慕哥9229398
3 answers

慕妹3242003

The code below might do the job. Based on your requirements, I assume that both the API and the database expose a Single<Entity>.

    private static final Object STOP = new Object();

    public static void main(String[] args) {
        Database database = new Database(Single.just(new Entity("D1")));
        ApiService apiService = new ApiService(Single.just(new Entity("A1")));
        // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));
        // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

        // STOP on this subject tells the delayed database branch that the API has already answered
        BehaviorSubject<Object> subject = BehaviorSubject.create();

        Observable.merge(
            apiService.getEntity()
                      .toObservable()
                      .doOnNext(t -> subject.onNext(STOP))
                      .doOnError(e -> subject.onNext(STOP))
                      // on API error: emit the database entity first, then the error
                      .onErrorResumeNext(t ->
                          Observable.concatDelayError(database.getEntity().toObservable(),
                                                      Observable.error(t))),
            database.getEntity()
                    .delay(300, MILLISECONDS)
                    .toObservable()
                    .takeUntil(subject)
        )
        .subscribe(System.out::println,
                   System.err::println);

        Observable.timer(1, MINUTES) // just for blocking the main thread
                  .toBlocking()
                  .subscribe();
    }

I could not get rid of the Subject because of the conditions "if the database is somehow slower than the API, it must not emit its data" and "if the API call is slow (> ~300 ms), emit the data from the database and still wait for the data from the API". Otherwise, the amb() operator would be a good fit.

MMMHUHU

Another solution could be this one (without a Subject):

    public static void main(String[] args) throws InterruptedException {
        Database database = new Database(Single.just(new Entity("D1")));
        ApiService apiService = new ApiService(Single.just(new Entity("A1")));
        // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
        // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

        database.getEntity()
                .toObservable()
                .groupJoin(apiService.getEntity()
                                     .toObservable()
                                     // on API error: emit the database entity first, then the error
                                     .onErrorResumeNext(
                                         err -> Observable.concatDelayError(database.getEntity().toObservable(),
                                                                            Observable.error(err))),
                           dbDuration -> Observable.timer(300, MILLISECONDS),
                           apiDuration -> Observable.never(),
                           (db, api) -> api.switchIfEmpty(Observable.just(db)))
                .flatMap(o -> o)
                .subscribe(System.out::println,
                           Throwable::printStackTrace,
                           () -> System.out.println("It's the end!"));

        Observable.timer(1, MINUTES) // just for blocking the main thread
                  .toBlocking()
                  .subscribe();
    }

If the API service emits nothing within 300 ms (dbDuration -> timer(300, MILLISECONDS)), the entity from the database is emitted (api.switchIfEmpty(db)). If the API emits something within 300 ms, only its Entity is emitted (api.switchIfEmpty(.)). This also seems to work as you want...

呼啦一阵风

Another, nicer solution:

    public static void main(String[] args) throws InterruptedException {
        Database database = new Database(Single.just(new Entity("D1")));
        ApiService apiService = new ApiService(Single.just(new Entity("A1")));
        // ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
        // ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));

        // API result; on error, the database entity is emitted first, then the error
        Observable<Entity> apiServiceWithDbAsBackup =
                apiService.getEntity()
                          .toObservable()
                          .onErrorResumeNext(err ->
                              Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));

        Observable.amb(database.getEntity()
                               .toObservable()
                               .delay(300, MILLISECONDS)
                               .concatWith(apiServiceWithDbAsBackup),
                       apiServiceWithDbAsBackup)
                  .subscribe(System.out::println,
                             Throwable::printStackTrace,
                             () -> System.out.println("It's the end!"));
    }

We use amb() with a delay on the database observable to take whichever source emits first. If the API service fails, we emit the item from the database. This also seems to work as you want...
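
For readers who have not used amb() before, the race it performs can be pictured with plain JDK classes. The sketch below is only an analogy and not RxJava: it uses CompletableFuture.anyOf to pick whichever of two delayed sources answers first. The names `FirstToAnswer`, `delayed` and `race` are made up for this illustration, and unlike the amb() chain above it does not go on to wait for the API once the cached value has won.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FirstToAnswer {

    /** Completes with the given value after delayMs, emulating a delayed source. */
    static CompletableFuture<String> delayed(ScheduledExecutorService timer, String value, long delayMs) {
        CompletableFuture<String> future = new CompletableFuture<>();
        timer.schedule(() -> { future.complete(value); }, delayMs, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Returns the value of whichever source answers first. */
    static String race(long dbDelayMs, long apiDelayMs) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        try {
            CompletableFuture<String> db = delayed(timer, "D1", dbDelayMs);
            CompletableFuture<String> api = delayed(timer, "A1", apiDelayMs);
            // anyOf completes as soon as the first of the two futures completes
            return (String) CompletableFuture.anyOf(db, api).join();
        } finally {
            timer.shutdownNow(); // cancel the loser's pending timer
        }
    }

    public static void main(String[] args) {
        System.out.println(race(300, 50));  // A1: the API beats the delayed cache
        System.out.println(race(300, 900)); // D1: the cache wins while the API is still pending
    }
}
```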