我想创建一个Flux具有按需生成的元素且预取有限的元素。
我尝试了以下操作,但看起来这段代码无法处理背压,因为它generateElements变得非常大 (1011):
AtomicInteger generateElements = new AtomicInteger(0);
Flux<Integer> source = Flux
.create(emitter -> {
while (true)
emitter.next(generateElements.getAndIncrement());
})
.subsribeOn(Schedulers.elastic())
.limitRate(1);
source.take(4).subsribe(System.out::println);
assertThat(generateElements.get()).isEqualTo(5);
我怎样才能使我的Flux预取仅限于一次?
森栏
相关分类