仅在需要时才在 Reactor 的 Flux 中请求下一个

我有一个 API,它返回实体列表,实体数量上限为 100 个。如果有更多实体,它将返回下一页的令牌。


我想创建一个通量,它返回所有实体(所有页面),但仅在需要时(如果有请求)返回。


我写了这段代码:


class Page {

    String token;

    List<Object> entities;

}


Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {

    return fct.apply(token).flatMapMany(page -> {

        if (page.token == null) {

            // no more pages

            return Flux.fromIterable(page.entities);

        }


        return Flux.fromIterable(page.entities).concatWith(Flux.defer(() -> load(page.token, fct)));

    });

}

它有效 - 几乎


如果我请求 99 个元素,则加载第一页,并且我的通量包含 99 个元素。


如果我请求 150 个元素,则会加载第一页和第二页,并且我的 Flux 包含 150 个元素。


但是,如果我请求 100 个元素,则会加载第一页和第二页(并且我的 Flux 包含 100 个元素)。我的问题是,第二页已加载,但我没有请求第 101 个元素。


当前行为:


subscribe()

=> Function is called to load page 1

request(10)

=> Received: 0-9

request(89)

=> Received: 10-98

request(1)

=> Received: 99

=> Function is called to load page 2

request(1)

=> Received: 100

预期是:页面 2 的加载发生在最后一个请求之后(1)


几乎就像在某个地方进行了预取,但我看不到在哪里。有任何想法吗?


肥皂起泡泡
浏览 177回答 1
1回答

郎朗坤

好的,我找到了。没有预取。事实上,它是Flux.defer根据订阅加载下一页的,而不是根据请求加载的。解决这个问题的快速(但肮脏的)测试是:Flux<Object> load(String token, final Function<String, Mono<Page>> fct) {&nbsp; &nbsp; return fct.apply(token).flatMapMany(page -> {&nbsp; &nbsp; &nbsp; &nbsp; if (page.token == null) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // no more pages&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return Flux.fromIterable(page.entities);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return Flux&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .fromIterable(page.entities)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .concatWith(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Flux.defer(() -> load(page.token, fct))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Flux.create(s -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; DelegateSubscriber[] ref = new DelegateSubscriber[1];&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; s.onRequest(l -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ref[0] == null) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ref[0] = new DelegateSubscriber(s);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; load(page.token, fct).subscribe(ref[0]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ref[0].request(l);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }));&nbsp; &nbsp; });}static class DelegateSubscriber extends BaseSubscriber<Object> {&nbsp; &nbsp; FluxSink<Object> delegate;&nbsp; &nbsp; public DelegateSubscriber(final FluxSink<Object> delegate) {&nbsp; &nbsp; &nbsp; &nbsp; this.delegate = delegate;&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; protected void hookOnSubscribe(Subscription subscription) {&nbsp; &nbsp; &nbsp; &nbsp; // nothing&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; protected void hookOnNext(Object value) {&nbsp; &nbsp; &nbsp; &nbsp; delegate.next(value);&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; protected void hookOnError(Throwable throwable) {&nbsp; &nbsp; &nbsp; &nbsp; delegate.error(throwable);&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java