如何将轮询 api 转换为反应流

假设我有一个具有以下签名的函数:


class Item {

  String name;

  Long id;

}

public Flux<Item> getNew(long id);

getNew()返回在 id (0..N) 之后添加的项目流。那么如何将其变成无限流呢?


所以像这样:


public Flux<Item> observe(long id) {

    return Flux.interval(Duration.ofSeconds(1)).

             flatMap(counter -> getNew(id)); // <-- how to use last value from getNew flux as the new id                

}

我能够做到的唯一方法是使用某种类型的状态变量:


   public Flux<Long> observe(long id) {

     final AtomicLong last = new AtomicLong(id);

     return Flux.interval(Duration.ofSeconds(1)).

         flatMap(l -> getNew(last.get())).

         doOnNext(last::set);    

   }    

有没有更惯用的方法来做到这一点?我试图为此创建生成器,但我不知道如何实现它。


慕哥6287543
浏览 114回答 1
1回答

富国沪深

如果您可以通过检查识别出最后Item发出的信号,则可以使用运算符:getNew.expand   public Flux<Item> observe(long id) {        return getNew(id)                .expand(item -> isLast(item)                        ? getNew(item.id)                        : Flux.empty());    }    /**     * @return true if the given item is the last item emitted by getNew     */    private boolean isLast(Item item) {        return // ... snip ...    }如果您不能通过检查来识别最后一个Item,那么您将不得不使用状态变量。虽然,我建议使用.deferand.repeat而不是 .interval... public Flux<Item> observe(long id) {        final AtomicLong nextStartId = new AtomicLong(id);        return Flux.defer(() -> getNew(nextStartId.get()))                .doOnNext(item -> nextStartId.set(item.id))                .repeat();    }反对使用的主要原因.interval是:如果没有及时产生需求,将发出 onError 信号因此,如果 API 花费的时间太长,或者处理结果的时间太长,流将以错误结束。对于较长的间隔,这可能不是问题,但对于相对较短的间隔(例如您的示例中的 1 秒),这可能是一个问题。如果你想在每次重复迭代之前延迟,那么你可以使用.repeatWhen, 带有 reactor-extra 的Repeat固定退避。这将为您提供“固定延迟”语义,而不是“固定间隔”。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java