如何获取 Flux 的最后一项而不用 reduce() 或 last() 折叠它?这是我的用例:
1)我有Flux<T>根据状态生成的生成器。2)当内部Flux完成时,它会改变影响我在生成器中发出的下一个对象的状态Flux。
示意性地看起来像这样
static class State {
int secret = 2;
int iteration = 0;
}
Random rand = new Random(1024);
Flux<Integer> stream = Flux.<Flux<Integer>, State>generate(State::new, (state, sink) -> {
System.out.println(String.format("Generate: %d", state.secret));
Flux<Integer> inner = Flux.range(1, rand.nextInt(10));
sink.next(inner.doOnComplete(() -> {
// How do I get last item of `inner` here ?
// For example I'd like to decrement `state.secret` by last value of `inner`
}));
return state;
}).flatMap(Function.identity());
UPD:我没有标记我的答案,因为黑客被证明是不可靠的。有可能.generate()在前一个被完全消耗之前被调用Flux,因此使得值last不正确。
幕布斯6054654
相关分类