手记

CountDownLatch源码分析

CountDownLatch源码分析


简介

CountDownLatch,我们称之为闭锁或倒计时器,它允许一个或多个线程必须等待其他线程全部执行完之后才能执行。例如:部门周会,必须等所有人到齐才能开。其本身是基于AQS的共享模式实现,示例图如下:

方法摘要

method description
public void countDown() 在许可可用之前阻塞当前线程。
public void await() throws InterruptedException 阻塞式地等待,并且是响应中断的
public boolean await(long timeout, TimeUnit unit) throws InterruptedException 带超时等待,可响应中断

源码分析

private final Sync sync;

syncCountDownLatch的核心属性,它继承自AQS,同时重写了tryAcquireSharedtryReleaseShared,以完成具体的同步状态的获取与释放的逻辑。

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        /**
         * 构造方法里面用`count`值初始化`AQS`里的`state`,来表示计数器的总数
         */
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
        
        /**
         * state为0表示尝试获取同步状态成功,其他情况均为失败
         * 没有使用入参
         */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        /**
         * 每调用一次,state减1,直到state值为0表示释放同步状态成功
         * 自旋是为了防止CAS失败,没有使用入参
         */
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

countDown方法

    public void countDown() {
        //每次
        sync.releaseShared(1);
    }

await方法

    public void await() throws InterruptedException {
        //调用可中断的获取同步状态方法
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        //调用可超时的获取同步状态的方法
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

示例

模拟一下刚开始那张图的场景:

    public static void main(String[] args) {

        CountDownLatch latch = new CountDownLatch(3);
        //新建三个线程
        for (int i = 1; i < 4; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            },"r" + i).start();
        }


        for (int i = 1; i < 4; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "正在await" );
                    latch.await();
                    System.out.println(Thread.currentThread().getName() + "从await中返回" );
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"t" + i).start();
        }

    }

结果

分析

刚开始,state的初始值为3,此时t1t2t3调用await(假设按照这个顺序,实际情况可能入队顺序不一样),await调用acquireSharedInterruptibly,接着调用tryAcquireShared,因为此时state值肯定不为0(r1,r2,r3三个线程执行3s),所以tryAcquireShared返回-1,于是执行doAcquireSharedInterruptibly,然后调用 parkAndCheckInterrupt被挂起,此时t1t2t3均加入同步队列,且对应的前驱waitStatus值置为-1,等待被唤醒。

然后假设此时r1调用countDown,然后调用releaseShared,接着调用tryReleaseSharedstate值-1变为2,不等于0,返回false,直到r3线程调用countDown完毕之后,state变为0,返回true,此时调用doReleaseShared,然后调用unparkSuccessor唤醒t1,最后相继唤醒t2t3

0人推荐
随时随地看视频
慕课网APP