从 while 循环创建 Flowable

我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。


先是自己写了dao,处理InputStream,提供指定范围内的Item。目前它只是在列表中收集数据,但我想使用 flowable 一个一个地提供项目;目前它提供Maybe<List<Item>>。还有一些错误需要传输到更高级别(数据源)。比如EndOfFile,通知DataSource数据缓存完毕;


Dao.class:


List<Item> loadRange(int start, int number) throws ... {

    ...

    while(...) {

        ...

        //TODO contribute item to flowable

        resultList.add(new Item(...)) 


    }

    return resultList;

}

Maybe<List<Item>>刚刚创建的方法Maybe.fromCallable();


蓝山帝景
浏览 136回答 1
1回答

千巷猫影

这样的事情应该适用于此:Flowable<Item> loadRange(int start, int number) {&nbsp; &nbsp; &nbsp; &nbsp; return Flowable.create(emitter -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (...){&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; emitter.onNext(new Item());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; emitter.onComplete();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (IOException e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; emitter.onError(e);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }, BackpressureStrategy.BUFFER);&nbsp; &nbsp; }我假设一旦循环完成你想要完成,也向下游发送错误,而不是处理方法签名。您也可以更改BackPressureStrategy以适合您的用例,即DROP,LATEST等等。由于您是 RxJava 的新手,匿名类将是:Flowable<Item> loadRange(int start, int number) {&nbsp; &nbsp; &nbsp; &nbsp; return Flowable.create(new FlowableOnSubscribe<Item>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override public void subscribe(FlowableEmitter<Item> emitter) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (...){&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; emitter.onNext(new Item());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; emitter.onComplete();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (IOException e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; emitter.onError(e);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }, BackpressureStrategy.BUFFER);&nbsp; &nbsp; }
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java