当ForkJoinPool结束时

我试图弄清楚所有 ForkJoinPool 线程何时完成其任务。我编写了这个测试应用程序(我使用 System.out 因为它只是一个快速测试应用程序,并且没有错误检查/处理):


/**
 * Quick test program that submits a parallel-stream job to a custom
 * {@link ForkJoinPool} and, on the main thread, drains a shared message
 * queue while waiting for the pool to report that it is terminating.
 *
 * <p>NOTE(review): this listing is incomplete. The helpers {@code enqueue},
 * {@code dequeue} and {@code process} are referenced but not defined here,
 * and {@code makeString} has no return statement, so the class does not
 * compile as shown.
 */
public class TestForkJoinPoolEnd {

    // Message queue polled by main. LinkedList itself is not thread-safe;
    // presumably enqueue/dequeue add their own synchronization -- they are
    // not visible in this listing, so confirm.
    private static final Queue<String> queue = new LinkedList<>();

    // Number of random strings produced by makeList().
    private static final int MAX_SIZE = 5000;

    // Not referenced anywhere in the visible code; presumably used by the
    // missing process() helper -- TODO confirm.
    private static final int SPEED_UP = 100;


    /**
     * Submits the parallel job, then prints queued messages until the pool
     * reports {@code isTerminating()}, and finally prints the pool state.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        // Dedicated pool with a parallelism of 12.
        ForkJoinPool customThreadPool = new ForkJoinPool(12);

        // The returned task handle is discarded, so the only completion
        // signal available below is the state of the pool itself.
        customThreadPool.submit(

                () -> makeList()

                        .parallelStream()

                        .forEach(TestForkJoinPoolEnd::process));

        enqueue("Theard pool started up");


        // Starts at MAX_SIZE + 1 and is decremented once per printed message.
        // Presumably: one message per generated string plus the start-up
        // message above -- depends on process(), which is not shown.
        int counter = MAX_SIZE + 1;

        // NOTE(review): shutdown()/shutdownNow() is never called in this
        // method, so isTerminating() is not expected to become true and this
        // loop does not exit. Finished tasks do not terminate a pool.
        while (!customThreadPool.isTerminating()) {

            String s = dequeue();

            if (s != null) {

                System.out.println(s);

                counter--;

            }

            try {

                // Brief pause between polls.
                TimeUnit.MILLISECONDS.sleep(1);

            } catch (InterruptedException e) {

                // Interruption is silently ignored here.

            }

        }

        System.out.println("counter = " + counter);

        // Dump the pool's life-cycle flags for inspection.
        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +

                "= " + customThreadPool.isTerminating() + " isTerminated = "

                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());

    }


    /**
     * Builds a list of {@code MAX_SIZE} strings, each obtained from
     * {@link #makeString()}.
     *
     * @return the generated list
     */
    static List<String> makeList() {

        return Stream.generate(() -> makeString())

                .limit(MAX_SIZE)

                .collect(Collectors.toList());

    }


    /**
     * Intended to produce a string; the locals suggest a 10-character string
     * of letters 'a'..'z'.
     *
     * <p>NOTE(review): the body shown only declares locals and never returns
     * a value; the rest of the method appears to be missing from the listing.
     */
    static String makeString() {

        int leftLimit = 97; // letter 'a'

        int rightLimit = 122; // letter 'z'

        int targetStringLength = 10;

        Random random = new Random();

    }

}

这段代码会卡住,永远无法结束。如果我将 while 循环的条件改为 !customThreadPool.isQuiescent(),循环会终止,但此时计数器和队列大小都为 1。


我应该使用什么来确定线程何时完成?


慕侠2389804
浏览 102回答 1
1回答

LEATH

ExecutorService 不会仅仅因为一项作业(及其子作业)完成而自行终止。线程池背后的整个想法就是可重用。因此,只有当应用程序对它调用 shutdown() 时,它才会终止。

您可以使用 isQuiescent() 来判断是否没有待处理的作业,但这仅在所有提交的作业都属于您的特定任务时才有效。使用 submit 返回的 future 来检查实际工作的完成情况要简洁得多。

无论哪种情况,已提交任务的完成状态都不能说明您正在轮询的队列的状态。当您得知提交的任务已结束时,您仍然需要检查队列中是否还有待处理的元素。

此外,建议使用线程安全的 BlockingQueue 实现,而不是用 synchronized 块来装饰 LinkedList。再加上其他一些需要清理的地方,代码将如下所示:

public class TestForkJoinPoolEnd {
    private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
    private static final int MAX_SIZE = 5000;
    private static final int SPEED_UP = 100;

    public static void main(String[] args) {
        ForkJoinPool customThreadPool = new ForkJoinPool(12);
        ForkJoinTask<?> future = customThreadPool.submit(
            () -> makeList()
                    .parallelStream()
                    .forEach(TestForkJoinPoolEnd::process));
        QUEUE.offer("Theard pool started up");

        int counter = MAX_SIZE + 1;
        while (!future.isDone()) try {
            String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
            if (s != null) {
                System.out.println(s);
                counter--;
            }
        } catch (InterruptedException e) {}

        for(;;) {
            String s = QUEUE.poll();
            if (s == null) break;
            System.out.println(s);
            counter--;
        }
        System.out.println("counter = " + counter);
        System.out.println("isQuiescent = " + customThreadPool.isQuiescent()     + " isTerminating " +
                "= " + customThreadPool.isTerminating() + " isTerminated = "
                + customThreadPool.isTerminated() + " isShutdown =" +     customThreadPool.isShutdown());

        customThreadPool.shutdown();
    }

    static List<String> makeList() {
        return IntStream.range(0, MAX_SIZE)
            .mapToObj(i -> makeString())
            .collect(Collectors.toList());
    }

    static String makeString() {
        int targetStringLength = 10;
        Random random = new Random();
        StringBuilder buffer = new StringBuilder(targetStringLength);
        for (int i = 0; i < targetStringLength; i++) {
            int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
            buffer.append((char) randomLimitedInt);
        }
        return buffer.toString();
    }

    static int toSeed(String s) {
        return s.chars().sum() / SPEED_UP;
    }

    static void process(String s) {
        long start = System.nanoTime();
        try {
            TimeUnit.MILLISECONDS.sleep(toSeed(s));
        } catch (InterruptedException e) {
        }
        long end = System.nanoTime();
        QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
    }
}

如果接收端的 sleep 调用是为了模拟一些工作负载,而不是为了等待新项目,您也可以使用:

int counter = MAX_SIZE + 1;
while (!future.isDone()) {
    String s = QUEUE.poll();
    if (s != null) {
        System.out.println(s);
        counter--;
    }
    try {
        TimeUnit.MILLISECONDS.sleep(1);
    } catch (InterruptedException e) {}
}

但逻辑没有改变。在 future.isDone() 返回 true 之后,我们必须重新检查队列中待处理的元素。我们只能保证不会再有新的项目到达,而不能保证队列已经空了。

顺便说一下,makeString() 方法还可以进一步改进为:

static String makeString() {
    int targetStringLength = 10;
    ThreadLocalRandom random = ThreadLocalRandom.current();
    StringBuilder buffer = new StringBuilder(targetStringLength);
    for (int i = 0; i < targetStringLength; i++) {
        int randomLimitedInt = random.nextInt('a', 'z' + 1);
        buffer.append((char)randomLimitedInt);
    }
    return buffer.toString();
}

甚至:

static String makeString() {
    int targetStringLength = 10;
    return ThreadLocalRandom.current()
        .ints(targetStringLength, 'a', 'z'+1)
        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
        .toString();
}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java