我有 2 个数据源:数据库(缓存)和 api,我需要将它们组合成一个流。我知道我可以简单地使用 concatArray 或类似的东西,但我想实现更复杂的行为:
最多可发射 2 个元素的可观察流。
它将在开始时订阅这两个来源。
如果 api 调用足够快(<~300 毫秒),它将仅从中发出数据并完成流。
如果 api 调用会很慢(>~300ms),从数据库发出数据并仍然等待来自 api 的数据
如果 api 调用不会成功,则从数据库发出数据并发出错误。
如果数据库以某种方式比 api 慢,它就不能发出它的数据(流完成解决了这个问题)
我用以下代码完成了它:
public Observable<Entity> getEntity() {
final CompositeDisposable disposables = new CompositeDisposable();
return Observable.<Entity>create(emitter -> {
final Entity[] localEntity = new Entity[1];
//database call:
disposables.add(database.getEntity()
.subscribeOn(schedulers.io())
.doOnSuccess(entity -> localEntity[0] = entity) //saving our entity because
//apiService can emit error before 300 ms
.delay(300, MILLISECONDS)
.subscribe((entity, throwable) -> {
if (entity != null && !emitter.isDisposed()) {
emitter.onNext(entity);
}
}));
//network call:
disposables.add(apiService.getEntity()
.subscribeOn(schedulers.io())
.onErrorResumeNext(throwable -> {
return Single.<Entity>error(throwable) //we will delay error here
.doOnError(throwable1 -> {
if (localEntity[0] != null) emitter.onNext(localEntity[0]); //api error, emit localEntity
})
.delay(200, MILLISECONDS, true); //to let it emit localEntity before emitting error
})
.subscribe(entity -> {
emitter.onNext(entity);
emitter.onComplete(); //we got entity from api, so we can complete the stream
}, emitter::onError));
})
代码有点笨重,我在 observable 中创建了 observables,我认为这是不好的。但是这样我就可以全局访问发射器,这使我能够以我想要的方式控制主流(发射数据、成功、错误)。
有没有更好的方法来实现这一目标?我很想看一些代码示例。谢谢!
慕妹3242003
MMMHUHU
呼啦一阵风
相关分类