手记

利用forkjoin写快排

java7提供的forkjoinpool可以有很好的方式解决可以拆解的任务。更好的利用多核的特性,目前看到的例子都不是特别明显的分治法的。下面就尝试写一个比较经典的场景,快速排序。
快排的过程这里就不详细叙述了。
我们首先选择使用哪种task,因为快排不需要给返回值。我们这里就继承RecursiveAction。
利用forkjoin的能力,我们主要的任务就分拆每个task要做的事情。
快排我们可以拆出的任务主要是在每个分到的数组区间内,把数字小的放在一边,数字大的放在另外一边。然后再生成任务,循环的去执行。直到任务不能再拆解了,说明排序也就结束了。
我们可以了解到任务主要是数组的边界和数组组成。因为是多线程执行,例如区分出左边和右边之后,可以并发的去左边的继续拆分,右边的继续拆分。这里就可以做并发的任务操作了。

    private int start;
    private int end;
    private AtomicIntegerArray atomicIntegerArray;

这里会涉及多线程操作数组,所以使用AtomicIntegerArray来进行数组的操作。
拿到任务的时候就可以先计算拆分的边界,然后再继续做任务的拆分。

    @Override
    protected void compute() {
        if (start < end) {
            int adjust = adjust();
            QuickSortTask quickSortTask1 = new QuickSortTask(start, adjust - 1, atomicIntegerArray);
            quickSortTask1.fork();
            QuickSortTask quickSortTask2 = new QuickSortTask(adjust + 1, end, atomicIntegerArray);
            quickSortTask2.fork();
            quickSortTask1.join();
            quickSortTask2.join();

        }
    }

fork之后用join表示任务得执行完。父任务要等子任务执行完才算真的结束了。但是这里的join不同于线程的join,这里就先不细讲了。
adjust的部分就是我们熟悉的快排的核心操作。进行左右的排序。

   private int adjust() {
        int i = start;
        int j = end;
        int mid = atomicIntegerArray.get(i);
        while (i < j) {
            while (i < j && atomicIntegerArray.get(j) > mid) {
                j--;
            }
            if (i < j) {
                atomicIntegerArray.set(i, atomicIntegerArray.get(j));
                i++;
            }
            while (i < j && atomicIntegerArray.get(i) < mid) {
                i++;
            }
            if (i < j) {
                atomicIntegerArray.set(j, atomicIntegerArray.get(i));
                j--;
            }
        }
        atomicIntegerArray.set(i, mid);
        return i;
    }

这里有两种方法提交forkjoinpool的任务。
第一种是直接提交。

        int[] datas = new int[]{20, 100, 30, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(datas);
        QuickSortTask quickSortTask = new QuickSortTask(0, datas.length - 1, atomicIntegerArray);
        ForkJoinPool forkJoinPool = new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                        null, true);
        forkJoinPool.invoke(quickSortTask);
        quickSortTask.join();

这种就是先建立一个自己的forkJoinPool。然后把任务提交进去。
还有一种是直接用任务的fork方法。

        quickSortTask.fork();

就是在forkjoinpool的静态方法里,会默认创建一个forkjoinpool>

        common = AccessController.doPrivileged(new PrivilegedAction<>() {
            public ForkJoinPool run() {
                return new ForkJoinPool((byte)0); }});

所以只要任务默认调用fork的都会在这个common池里运行。
这个案例就比较明显了,每个任务之间如何继续拆分子任务,然后如何再次合并起来。我们可以更关注每个任务,然后处理好多线程的并发问题。相当于我们自己写的例如递归或者循环程序。只要保证每个执行的任务是并发安全的,那么就可以直接使用forkjoinpool来完成。

0人推荐
随时随地看视频
慕课网APP