猿问

如何收集顺序调用异步 API 的结果?

我有一个异步 API,它基本上通过分页返回结果


public CompletableFuture<Response> getNext(int startFrom);

每个Response对象包含一个偏移量列表startFrom和一个标志,指示是否还有更多元素剩余,因此,另一个getNext()请求。


我想编写一个遍历所有页面并检索所有偏移量的方法。我可以像这样以同步方式编写它


int startFrom = 0;

List<Integer> offsets = new ArrayList<>();


for (;;) {

    CompletableFuture<Response> future = getNext(startFrom);

    Response response = future.get(); // an exception stops everything

    if (response.getOffsets().isEmpty()) {

        break; // we're done

    }

    offsets.addAll(response.getOffsets());

    if (!response.hasMore()) {

        break; // we're done

    }

    startFrom = getLast(response.getOffsets());

}

换句话说,我们在 0 处调用getNext()with startFrom。如果抛出异常,我们将整个过程短路。否则,如果没有偏移,我们就完成了。如果有偏移量,我们将它们添加到主列表中。如果没有更多可取的,我们就完成了。否则,我们将 重置为startFrom我们获取并重复的最后一个偏移量。


理想情况下,我希望在不阻塞CompletableFuture::get()并返回CompletableFuture<List<Integer>>包含所有偏移量的情况下执行此操作。


我怎样才能做到这一点?我怎样才能组成期货来收集他们的结果?


我正在考虑“递归”(实际上不是在执行中,而是在代码中)


private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) {

    CompletableFuture<Response> future = getNext(startFrom);

    return future.thenCompose((response) -> {

        if (response.getOffsets().isEmpty()) {

            return CompletableFuture.completedFuture(offsets);

        }

        offsets.addAll(response.getOffsets());

        if (!response.hasMore()) {

            return CompletableFuture.completedFuture(offsets);

        }

        return recur(getLast(response.getOffsets()), offsets);

    });

}


public CompletableFuture<List<Integer>> getAll() {

    List<Integer> offsets = new ArrayList<>();

    return recur(0, offsets);

}

从复杂性的角度来看,我不喜欢这个。我们能做得更好吗?


森栏
浏览 122回答 2
2回答

慕桂英3389331

我还想尝试一下EA Async,因为它实现了对async/await 的Java 支持(受 C# 启发)。所以我只是拿了你的初始代码,并转换了它:public CompletableFuture<List<Integer>> getAllEaAsync() {&nbsp; &nbsp; int startFrom = 0;&nbsp; &nbsp; List<Integer> offsets = new ArrayList<>();&nbsp; &nbsp; for (;;) {&nbsp; &nbsp; &nbsp; &nbsp; // this is the only thing I changed!&nbsp; &nbsp; &nbsp; &nbsp; Response response = Async.await(getNext(startFrom));&nbsp; &nbsp; &nbsp; &nbsp; if (response.getOffsets().isEmpty()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break; // we're done&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; offsets.addAll(response.getOffsets());&nbsp; &nbsp; &nbsp; &nbsp; if (!response.hasMore()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break; // we're done&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; startFrom = getLast(response.getOffsets());&nbsp; &nbsp; }&nbsp; &nbsp; // well, you also have to wrap your result in a future to make it compilable&nbsp; &nbsp; return CompletableFuture.completedFuture(offsets);}然后您必须检测您的代码,例如通过添加Async.init();在你的main()方法开始时。我必须说:这看起来真的很神奇!在幕后,EA异步注意到还有一个Async.await()方法中调用,而重写能够处理所有的thenCompose()/ thenApply()/递归你。唯一的要求是您的方法必须返回一个CompletionStage或CompletableFuture。这真的是异步代码变得简单!

慕森王

为了练习,我制作了这个算法的通用版本,但它相当复杂,因为你需要:调用服务的初始值 (the&nbsp;startFrom)服务调用本身 (&nbsp;getNext())用于累积中间值的结果容器 (the&nbsp;offsets)一个累加器 (&nbsp;offsets.addAll(response.getOffsets()))执行“递归”的条件 (&nbsp;response.hasMore())计算下一个输入的函数 (&nbsp;getLast(response.getOffsets()))所以这给出了:public <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,&nbsp; &nbsp; &nbsp; &nbsp; Function<T, CompletableFuture<I>> service,&nbsp; &nbsp; &nbsp; &nbsp; BiConsumer<R, I> accumulator,&nbsp; &nbsp; &nbsp; &nbsp; Predicate<I> continueRecursion,&nbsp; &nbsp; &nbsp; &nbsp; Function<I, T> nextInput) {&nbsp; &nbsp; return service.apply(initialInput)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .thenCompose(response -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; accumulator.accept(resultContainer, response);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (continueRecursion.test(response)) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return recur(nextInput.apply(response),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; resultContainer, service, accumulator,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continueRecursion, nextInput);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.completedFuture(resultContainer);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });}public CompletableFuture<List<Integer>> getAll() {&nbsp; &nbsp; return recur(0, new ArrayList<>(), this::getNext,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; (list, response) -> list.addAll(response.getOffsets()),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Response::hasMore,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r -> getLast(r.getOffsets()));}的一小的简化recur()是可能通过替换initialInput由所述CompletableFuture由所述第一呼叫的结果返回时,resultContainer并且accumulator可以被合并成一个单一的Consumer和service然后可以用合并后的nextInput功能。但这有点复杂getAll():private <I> CompletableFuture<Void> recur(CompletableFuture<I> future,&nbsp; &nbsp; &nbsp; &nbsp; Consumer<I> accumulator,&nbsp; &nbsp; &nbsp; &nbsp; Predicate<I> continueRecursion,&nbsp; &nbsp; &nbsp; &nbsp; Function<I, CompletableFuture<I>> service) {&nbsp; &nbsp; return future.thenCompose(result -> {&nbsp; &nbsp; &nbsp; &nbsp; accumulator.accept(result);&nbsp; &nbsp; &nbsp; &nbsp; if (continueRecursion.test(result)) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return recur(service.apply(result), accumulator, continueRecursion, service);&nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return CompletableFuture.completedFuture(null);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; });}public CompletableFuture<List<Integer>> getAll() {&nbsp; &nbsp; ArrayList<Integer> resultContainer = new ArrayList<>();&nbsp; &nbsp; return recur(getNext(0),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result -> resultContainer.addAll(result.getOffsets()),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Response::hasMore,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r -> getNext(getLast(r.getOffsets())))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .thenApply(unused -> resultContainer);}
随时随地看视频慕课网APP

相关分类

Java
我要回答