猿问

Reactor - 理解 .flatMap() 中的线程池

我试图了解反应式编程是如何工作的。我为此准备了简单的演示:WebClient来自 Spring Framework 的 reactive 将请求发送到简单的 rest api,并且此客户端在每个操作中打印线程名称。


休息API:


@RestController

@SpringBootApplication

public class RestApiApplication {


    public static void main(String[] args) {

        SpringApplication.run(RestApiApplication.class, args);

    }


    @PostMapping("/resource")

    public void consumeResource(@RequestBody Resource resource) {

        System.out.println(String.format("consumed resource: %s", resource.toString()));

    }

}


@Data

@AllArgsConstructor

class Resource {

    private final Long id;

    private final String name;

}

问题是行为与我预期的不同。


.map()我预计,.filter()和 的每次调用都.flatMap()将在线程上执行,而ormain的每次调用都将在 nio 线程池中的线程上执行。所以我希望日志看起来像:.doOnSuccess().doOnError


------- map [main] --------

------- filter [main] --------

------- flatmap [main] --------

(and so on...)

------- onsuccess [reactor-http-nio-2] --------

(and so on...)

但我得到的日志是:


------- map [main] --------

------- filter [main] --------

------- flatmap [main] --------

------- map [main] --------

------- filter [main] --------

------- flatmap [main] --------

------- onsuccess [reactor-http-nio-2] --------

------- onsuccess [reactor-http-nio-6] --------

------- onsuccess [reactor-http-nio-4] --------

------- onsuccess [reactor-http-nio-8] --------

------- map [reactor-http-nio-2] --------

------- filter [reactor-http-nio-2] --------

------- flatmap [reactor-http-nio-2] --------

------- map [reactor-http-nio-2] --------

每次下一次登录.map(),.filter()都是.flatMap()在 reactor-http-nio 的线程上完成的。


下一个难以理解的事实是在主线程和 reactor-http-nio 上执行的操作之间的比率总是不同的。有时所有操作.map(),.filter()和.flatMap()都在主线程上执行。


函数式编程
浏览 148回答 1
1回答

慕桂英546537

Reactor 和 RxJava 一样,可以被认为是并发不可知的。也就是说,它不强制执行并发模型。相反,它让您(开发人员)掌握一切。但是,这并不妨碍该库帮助您处理并发。获得aFlux或aMono并不一定意味着它运行在专用的Thread中。相反,大多数运算符继续在前一个运算符执行的线程中工作。subscribe()除非指定,否则最顶层的运算符(源)本身在进行调用的线程上运行。从您的代码中,以下代码段:webClient.post()          .uri("/resource")          .syncBody(res)          .header("Content-Type", "application/json")          .header("Accept", "application/json")          .retrieve()          .bodyToMono(Resource.class)导致线程从main切换到netty 的工作池。之后,以下所有操作均由 netty 工作线程执行。如果你想控制这种行为,你应该publishOn(...)在你的代码中添加一条语句,例如:webClient.post()          .uri("/resource")          .syncBody(res)          .header("Content-Type", "application/json")          .header("Accept", "application/json")          .retrieve()          .bodyToMono(Resource.class)          .publishOn(Schedulers.elastic())这样,弹性调度程序线程池将执行任何后续操作。另一个例子是使用专用调度程序处理 HTTP 请求执行后的繁重任务。import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;import static com.github.tomakehurst.wiremock.client.WireMock.get;import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;import com.github.tomakehurst.wiremock.WireMockServer;import java.util.concurrent.TimeUnit;import org.junit.jupiter.api.Test;import org.junit.jupiter.api.extension.ExtendWith;import org.springframework.web.reactive.function.client.ClientResponse;import org.springframework.web.reactive.function.client.WebClient;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import reactor.core.scheduler.Schedulers;import ru.lanwen.wiremock.ext.WiremockResolver;import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;import ru.lanwen.wiremock.ext.WiremockUriResolver;import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;@ExtendWith({  WiremockResolver.class,  WiremockUriResolver.class})public class ReactiveThreadsControlTest {  private static int concurrency = 1;  private final WebClient webClient = WebClient.create();  @Test  public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {    String requestUri = "/slow-response";    server.stubFor(get(urlEqualTo(requestUri))      .willReturn(aResponse().withStatus(200)        .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))    );    Flux      .generate(() -> Integer.valueOf(1), (i, sink) -> {        System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));        sink.next(i);        return i + 1;      })      .subscribeOn(Schedulers.single())      .flatMap(i ->          executeGet(uri + requestUri)            .publishOn(Schedulers.elastic())            .map(response -> {              heavyTask();              return true;            })        , concurrency)      .subscribe();    blockForever();  }  private void blockForever() {    Object monitor = new Object();    synchronized (monitor) {      try {        monitor.wait();      } catch (InterruptedException ex) {      }    }  }  private Mono<ClientResponse> executeGet(String path) {    System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));    return webClient      .get()      .uri(path)      .exchange();  }  private void heavyTask() {    try {      System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));      Thread.sleep(TimeUnit.SECONDS.toMillis(20));    } catch (InterruptedException ex) {    }  }}
随时随地看视频慕课网APP

相关分类

Java
我要回答