猿问

如何为 Flux.generate 指定调度程序

如何指定调度程序Flux.generate?我里面有阻塞呼叫,我希望能够取消它。到目前为止我通过以下方式破解了它

Flux<Integer> generate = Flux.generate(....);
Mono<List<Integer>> fut =
        Flux.just("ignored")
                .publishOn(Schedulers.single())
                .flatMap(ignored -> generate)
                .timeout(Duration.ofSeconds(2), Flux.empty())

还有更惯用的方式吗?


呼啦一阵风
浏览 88回答 1
1回答

德玛西亚99

使用订阅&nbsp; &nbsp; &nbsp; &nbsp; Flux<Integer> g1 = Flux.generate(c -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.next(1);&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(g1.take(5).collectList().block());&nbsp; &nbsp; &nbsp; &nbsp; Flux<Integer> g2 = g1.subscribeOn(Schedulers.elastic());&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(g2.take(5).collectList().block());输出Thread[main,5,main]Thread[main,5,main]Thread[main,5,main]Thread[main,5,main]Thread[main,5,main][1, 1, 1, 1, 1]Thread[elastic-2,5,main]Thread[elastic-2,5,main]Thread[elastic-2,5,main]Thread[elastic-2,5,main]Thread[elastic-2,5,main][1, 1, 1, 1, 1]
随时随地看视频慕课网APP

相关分类

Java
我要回答