Flowable 来自 Cache 和其他 Flowable for DataSource

我是 RxJava 的新手,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。


我有 Dao,它Flowable<Item>在一定范围内提供给 DataSource 类。该数据源有本地缓存,可以随时失效。当存储库向 DataSource 询问某个范围时(可能超出 DataSourse 范围,边界在完全缓存之前是未知的)它必须产生错误(或以其他方式通知 Repository)。


我想Flowable<Item>为 DataSource 创建方法,它将从缓存中发出项目,如果需要,将它们与连接起来Flowable<Item> dao.getRange(...),同时缓存来自 dao 的新项目。我还需要处理来自 dao 的错误,它们必须被处理或转换为更高级别的错误。


DataSource.class:


List<Item> cache;


Flowable<Item> getRange(int start, int amount) {


    final int cacheSize = cache.size();

    final int canLoadFromCache = cacheSize - start;

    final int loadFromDao = amount - canLoadFromCache;


    if (isCorrupted) return Flowable.fromCallable(() -> {

        throw new Exception("CorruptedDatasource");

    });


    Flowable<Item> cacheFlow = null;

    Flowable<Item> daoFlow = null;


    if (canLoadFromCache > 0) {

        cacheFlow = Flowable.fromIterable(

                cache.subList(start, canLoadFromCache)

        );


        daoFlow = dao.getRange(

                uri, 

                cacheSize, //start

                loadFromDao //amount

        );

    } else {

        if (isFullyCached) return Flowable.fromCallable(() -> {

            throw new Exception("OutOfBounds");

        });


        //To not deal with gaps load and cache data between;

        //Or replace it with data structure which can handle for us;

        daoFlow = dao.getRange(

                uri,

                cacheSize,

                start - cacheSize + amount);

        //all these items should be cached;

        //other cached and put downstream;

        //Dao errs should be converted to higher lever exceptions,

        //Or set flags in DataSource;

    }

    // return concatenated flowable;

}

在更高级别的存储库连接来自多个数据源的数据,因此必须有一种方法来连接来自多个来源的范围,如果一个来源还不够,则应该添加下一个来源的范围。


红颜莎娜
浏览 70回答 1
1回答

吃鸡游戏

尝试concatorconcatEager连接两个可观察值。也doOnNext()可以doOnError()帮助你缓存和错误处理List<Item> cache;Flowable<Item> getRange(int start, int amount) {&nbsp; &nbsp; ...&nbsp; &nbsp; &nbsp; &nbsp; if (isFullyCached) return Flowable.fromCallable(() -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new Exception("OutOfBounds");&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; //To not deal with gaps load and cache data between;&nbsp; &nbsp; &nbsp; &nbsp; //Or replace it with data structure which can handle for us;&nbsp; &nbsp; &nbsp; &nbsp; daoFlow = dao.getRange(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; uri,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cacheSize,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; start - cacheSize + amount);&nbsp; &nbsp; &nbsp; &nbsp; //all these items should be cached;&nbsp; &nbsp; &nbsp; &nbsp; //other cached and put downstream;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .doOnNext(result -> /* insert caching logic here */)&nbsp; &nbsp; &nbsp; &nbsp; //Dao errs should be converted to higher lever exceptions,&nbsp; &nbsp; &nbsp; &nbsp; //Or set flags in DataSource;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .doOnError(error -> /* handle error here */)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .onErrorReturn(/* and/or return some empty item */)&nbsp; &nbsp; }&nbsp; &nbsp; // return concatenated flowable;&nbsp; &nbsp; return cacheFlow.concat(daoFlow);}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java