我知道 subscribeOn 用于在订阅序列时切换执行线程,但我发现它不适用于 ServerRequest.bodyToMono/Flux
就像是
Flux.just(1,2,3)
.doOnNext(integer -> log.info("test {}",integer))
.subscribeOn(Schedulers.elastic())
.subscribe();
会使执行线程改变
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 1
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 2
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 3
但令我困惑的是
假设我有一个 Spring WebFlux 路由器:
@Configuration
public class TestRouter {
@Bean
public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
route -> route.PUT("/", req -> {
Mono<String> valueMono = req.bodyToMono(String.class);
return ServerResponse.ok().body(testService.test(valueMono), String.class);
}))).build();
}
}
和一个服务:
@Service
@Slf4j
public class TestService {
public Mono<String> test(Mono<String> mono) {
return mono
.doOnSubscribe(subscription -> log.info("on subscribe"))
.subscribeOn(Schedulers.elastic())
.doOnNext(s -> log.info("received {}", s))
.subscribeOn(Schedulers.elastic());
}
}
基本逻辑是 http put 请求到 localhost:port/test 将接收以纯文本形式发送到服务器的内容
我尝试让 doOnNext 在其他线程而不是 Spring WebFlux 的 NIO 线程上运行,无论我放在哪里
subscribeOn
执行线程始终是 NIO 线程:
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : received test
感谢@MichaelBerry @SimonBaslé,你们俩都帮了我很多忙,对你们的答案都投赞成票
简而言之,reactor-netty 将覆盖 http 订阅的 subscribeOn,使用 aflatMap()包含一个单独的subscribeOn()on 不同的Mono/Flux或publishOn()可以完成我想要的工作
白衣非少年
相关分类