Fork/Join 上下文中的 Phaser 与 CyclicBarrier

在尝试了解 Phaser 和 CyclicBarrier 之间的区别时,我遇到了一些链接 Difference between Phaser and CyclicBarrier和 https://www.infoq.com/news/2008/07/phasers/ 我读到 Phaser 与 Fork/ 兼容Join 接口,而 CyclicBarrier 没有,这里有一段代码来演示这一点:


移相器


 public static void main(String[] args) throws InterruptedException {


        CountDownLatch countDownLatch = new CountDownLatch(1);


        Phaser phaser = new Phaser(16){

            @Override

            protected boolean onAdvance(int phase, int registeredParties) {

                return phase ==1 || super.onAdvance(phase, registeredParties);

            }

        };


        System.out.println("Available Processors: "+Runtime.getRuntime().availableProcessors());


        ExecutorService executorService = ForkJoinPool.commonPool(); // Runtime.getRuntime().availableProcessors() -1


        for (int i = 0; i < 16; i++) {

            final int count = 0;

            executorService.submit(() -> {

                while (!phaser.isTerminated()) {

                    try {

                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));

                        System.out.println(Thread.currentThread().getName() + count + " ... ");

                        phaser.arriveAndAwaitAdvance();

                        System.out.println(Thread.currentThread().getName() + count + " ... continues ... ");

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

                countDownLatch.countDown();

            });

        }

        countDownLatch.await();

    }


问题:


fork/join 如何通过 Phaser 而不是通过 CyclicBarrier 设法创建更多线程?为什么这些方法arriveAndAwaitAdvance()使线程池创建新线程,以及如何,但方法await()没有导致线程池创建更多线程?


胡说叔叔
浏览 102回答 1
1回答

白猪掌柜的

Phaser 之所以能够做到这一点,是因为它ForkJoinPool.managedBlock(ManagedBlocker)在阻塞线程时会在内部调用。任何人都可以访问 ForkJoinPool 的这个 API,因此您可以轻松地增强您的CyclicBarrier版本以使用它,并消除线程饥饿。例如,带有以下氛围的东西:ForkJoinPool.managedBlock(new ManagedBlocker() {&nbsp; &nbsp; boolean isReleasable = false;&nbsp; &nbsp; @Override&nbsp; &nbsp; public boolean block() throws InterruptedException {&nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cyclicBarrier.await();&nbsp; &nbsp; &nbsp; &nbsp; } catch (BrokenBarrierException aE) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new IllegalStateException(aE);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return isReleasable = true;&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public boolean isReleasable() {&nbsp; &nbsp; &nbsp; &nbsp; return isReleasable;&nbsp; &nbsp; }});
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java