考虑以下代码:
AtomicInteger counter1 = new AtomicInteger();
AtomicInteger counter2 = new AtomicInteger();
Flux<Object> source = Flux.generate(emitter -> {
emitter.next("item");
});
Executor executor1 = Executors.newFixedThreadPool(32);
Executor executor2 = Executors.newFixedThreadPool(32);
Flux<String> flux1 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(1);
return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));
Flux<String> flux2 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(100);
return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor2)));
Flux.merge(flux1, flux2).subscribe(System.out::println);
您可以看到一个发布者比另一个发布者快 100 倍。不过,在运行代码时,似乎所有数据都已打印,但两个发布者之间存在巨大差距,这会增加加班时间。
有趣的是,当更改数字时executer2会有1024线程,并且executer1只有1线程,然后我们仍然会看到随着时间的推移越来越大的差距。
我的期望是,在相应地调整线程池之后,发布者将得到平衡。
我想在发布者之间取得平衡(相对于线程池大小和处理时间)
如果我等的时间足够长会发生什么?换句话说,是否会发生背压?(默认情况下,我猜这是一个运行时异常,对吧?)
我不想丢弃物品,也不想出现运行时异常。相反,正如我提到的,我希望系统在其拥有的资源和处理时间方面取得平衡——上面的代码是否承诺了这一点?
暮色呼如
相关分类