猿问

具有匿名线程的反应器调度程序

我正在测试 reactor 的工作原理,创建了与 reactor 文档中的代码非常相似的代码。


import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import reactor.core.publisher.Mono;

import reactor.core.scheduler.Scheduler;

import reactor.core.scheduler.Schedulers;


@SpringBootTest

@RunWith(SpringRunner.class)

public class ReactorApplicationTests {


  @Test

  public void publishOnThreadTest() {

    Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);


    final Mono<String> mono = Mono.just("Publish on test: \n")

            .map(msg -> msg + "before: " + Thread.currentThread() )

            .publishOn(s)

            .map(msg -> msg + "\nafter: " + Thread.currentThread());


    new Thread(() -> mono.subscribe(System.out::println)).start();

  }

}

我不能让它运行,我做错了什么?只需订阅它就可以工作,但我想看看使用的线程并玩一下它。


慕尼黑5688855
浏览 108回答 1
1回答

凤凰求蛊

您的测试程序什么也不打印的原因是它退出得太早了。它应该等到调用 substriber 的方法:@Testpublic void publishOnThreadTest() throws InterruptedException {&nbsp; &nbsp; Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);&nbsp; &nbsp; CountDownLatch latch = new CountDownLatch(1);&nbsp; &nbsp; final Mono<String> mono = Mono.just("Publish on test: \n")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(msg -> msg + "before: " + Thread.currentThread() )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .publishOn(s)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(msg -> msg + "\nafter: " + Thread.currentThread());&nbsp; &nbsp; new Thread(() -> mono.subscribe((String str) ->{&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(str);&nbsp; &nbsp; &nbsp; &nbsp; latch.countDown();&nbsp; &nbsp; })).start();&nbsp; &nbsp; latch.await();}
随时随地看视频慕课网APP

相关分类

Java
我要回答