How to get the last item of a Flux without collapsing it with reduce() or last()

How can I get the last item of a Flux without collapsing it with reduce() or last()? Here is my use case:


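1) I have a generator that produces Flux<T> instances based on state.
2) When an inner Flux completes, it alters the state, which affects the next Flux objects that I emit in the generator.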


Schematically, it looks like this:


static class State {
    int secret = 2;
    int iteration = 0;
}

Random rand = new Random(1024);
Flux<Integer> stream = Flux.<Flux<Integer>, State>generate(State::new, (state, sink) -> {

    System.out.println(String.format("Generate: %d", state.secret));
    Flux<Integer> inner = Flux.range(1, rand.nextInt(10));

    sink.next(inner.doOnComplete(() -> {
        // How do I get last item of `inner` here ?
        // For example I'd like to decrement `state.secret` by last value of `inner`
    }));

    return state;
}).flatMap(Function.identity());

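UPD: I have unmarked my answer, because the hack proved to be unreliable. It is possible for .generate() to be called before the previous Flux has been fully consumed, which makes the value in last incorrect.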
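The failure mode described in the update can be pictured without Reactor. The following is only a plain-Java analogy, not a reproduction of Reactor's actual scheduling: the class StaleStateSketch and its methods are hypothetical names, and a Supplier stands in for a lazy inner Flux. As in the question, the start of each inner sequence is read from the state when the generator runs, while the state is only updated when the inner sequence finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class StaleStateSketch {
    static class State {
        int secret = 2;
    }

    // Builds a lazy "inner" sequence: nothing is produced until get() is called.
    // The start value is read at generation time, as in the question's generator.
    static Supplier<List<Integer>> generate(State state, int count) {
        final int start = state.secret;
        return () -> {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                items.add(start + i);
            }
            // Stand-in for doOnComplete: publish the last value into the shared state.
            state.secret = items.get(items.size() - 1);
            return items;
        };
    }

    // Generate, consume, generate, consume: the second inner sees the updated state.
    static List<Integer> interleaved() {
        State state = new State();
        List<Integer> out = new ArrayList<>(generate(state, 3).get());
        out.addAll(generate(state, 3).get());
        return out;
    }

    // Generate twice up front, then consume: the second inner was built from stale state.
    static List<Integer> eager() {
        State state = new State();
        Supplier<List<Integer>> first = generate(state, 3);
        Supplier<List<Integer>> second = generate(state, 3);
        List<Integer> out = new ArrayList<>(first.get());
        out.addAll(second.get());
        return out;
    }

    public static void main(String[] args) {
        System.out.println("interleaved: " + interleaved()); // [2, 3, 4, 4, 5, 6]
        System.out.println("eager:       " + eager());       // [2, 3, 4, 2, 3, 4]
    }
}
```

In the interleaved case the second sequence starts from the last value of the first one. In the eager case both sequences start from the initial secret, which is the kind of incorrect result the update warns about when the generator runs ahead of consumption.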


BIG阳
1 Answer

幕布斯6054654

The first version was not reliable. I hacked together another one:

static <T> Flux<T> expandOnLastItem(Supplier<Flux<T>> seed, Function<T, Flux<T>> generator) {
    return Flux.just(new AtomicReference<T>())
            .flatMap(last -> Flux.just(seed.get().materialize())
                    .flatMap(Function.identity())
                    .expand(v -> {
                        if (v.hasValue()) {
                            // Remember the most recent value seen.
                            last.set(v.get());
                        } else if (v.isOnComplete() && last.get() != null) {
                            // The current page completed: build the next one from its last item.
                            Flux<T> res = generator.apply(last.get());
                            last.set(null);
                            return res.materialize();
                        }
                        return Flux.empty();
                    })
                    .filter(s -> !s.isOnComplete())
                    .dematerialize());
}

It can be used like this:

static Flux<Integer> getPage(int pageId, int size) {
    return Flux.defer(() -> {
        if (pageId < 3) {
            System.out.println("Returning data for pageId: " + pageId);
            return Flux.range(pageId * 100, size);
        } else {
            System.out.println("Returning empty for pageId: " + pageId);
            return Flux.empty();
        }
    });
}

expandOnLastItem(
        () -> getPage(0, 5),
        lastId -> {
            System.out.println("  Expanding. Last item: " + lastId);
            int curPage = lastId / 100;
            return getPage(curPage + 1, 5);
        })
        .reduce(0L, (count, value) -> {
            System.out.println("==> " + value);
            return count + 1;
        })
        .block();

For reference, this is the first version. I hacked it by mutating a state variable in the generator. It works, but it is not very functional. If anyone can suggest an alternative, I would really appreciate it.

Random rand = new Random(1024);

Flux.<Flux<String>, State>generate(State::new, (state, sink) -> {
    if (state.iteration < 4) {
        final int count = rand.nextInt(10) + 1;
        System.out.println(String.format("*** Generate %d: start %d (count %d)", state.iteration, state.secret, count));
        Flux<Integer> inner = Flux.range(state.secret, count);

        final int[] last = {Integer.MIN_VALUE};
        sink.next(
                inner
                        .doOnNext(value -> {
                            last[0] = value;
                        })
                        .map(value -> String.format("Iter %d value %d", state.iteration, value))
                        .doOnComplete(() -> {
                            System.out.println(String.format("Inner complete (last item was %d)", last[0]));
                            state.secret = last[0];
                            state.iteration += 1;
                        }));
    } else {
        System.out.println("Generate complete");
        sink.complete();
    }
    return state;
})
        .flatMap(Function.identity())
        .map(value -> {
            System.out.println(String.format("Ext map: %s", value));
            return value;
        })
        .buffer(5)
        .flatMapIterable(Function.identity())
        .subscribe(value -> System.out.println(String.format("  ---> %s", value)));

System.out.println("Exiting");
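To make the paging behaviour that expandOnLastItem appears intended to produce easier to follow, here is a minimal blocking sketch in plain Java, with no Reactor involved. It is an assumption-laden illustration rather than a replacement: the class ExpandOnLastItemSketch is a hypothetical name, pages are modelled as Lists instead of Flux instances, and an empty page is taken to end the expansion, mirroring the reactive version, where an empty page records no last item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class ExpandOnLastItemSketch {

    // Hypothetical blocking counterpart of expandOnLastItem: pages are plain Lists.
    static <T> List<T> expandOnLastItem(Supplier<List<T>> seed, Function<T, List<T>> generator) {
        List<T> all = new ArrayList<>();
        List<T> page = seed.get();
        // Keep requesting pages until one comes back empty.
        while (!page.isEmpty()) {
            all.addAll(page);
            T last = page.get(page.size() - 1);
            // The last item of the current page decides what the next page is.
            page = generator.apply(last);
        }
        return all;
    }

    // Same paging rule as getPage in the answer: pages 0..2 hold data, later pages are empty.
    static List<Integer> getPage(int pageId, int size) {
        List<Integer> page = new ArrayList<>();
        if (pageId < 3) {
            for (int i = 0; i < size; i++) {
                page.add(pageId * 100 + i);
            }
        }
        return page;
    }

    public static void main(String[] args) {
        List<Integer> items = expandOnLastItem(
                () -> getPage(0, 5),
                lastId -> getPage(lastId / 100 + 1, 5));
        System.out.println(items.size() + " items: " + items);
    }
}
```

With the page rule above, the loop visits pages 0, 1 and 2 and stops at the empty page 3, collecting 15 items in total. The reactive version aims for the same sequence while staying non-blocking, which is why it needs materialize() to observe the completion of each page.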