手记

JDK8源码分析之AbstractQueuedSynchronizer

前言

源码分析我认为主要有两个作用:满足好奇心,我想每一个有追求的人都不会满足于仅仅做一个API Caller实现功能就好,我们也想知道它到底是怎么实现的;借鉴与升华,当我们明白了一个类的设计原理,在一定的情境下我们可以借鉴其设计哲学,甚至针对我们自己特殊的业务场景对其进行改良与优化。

下面我就以这篇文章开启我的源码阅读之旅。总体而言,我会从这个类基本结构入手,然后分析原理,再看看已有的应用,并进行分析与理解。

我之前一篇文章里提到过java的显示锁ReentrantLock。此外,如果你编写过并发程序那你一般也应该用过CountDownLatch,Semaphore等等,这些都是同步器,而它们都基于AbstractQueuedSynchronizer(简称AQS)实现的,那么我们今天就来看看这个牛逼的AQS是怎么实现这么多功能的。

首先打开IDEA,随便新建一个类,然后输入CountDownLatch,在它上面敲下Ctrl+B,就打开了CountDownLatch的源码,然后发现有一个非常重要的静态内部类Sync继承了AbstractQueuedSynchronizer,再次Ctrl+B,我们就打开了AQS的源码,马上就可以解开它的神秘面纱了,哼哼。

映入眼帘的首先就是大段大段的文档,大意就是这个类 提供了一个基于FIFO队列的实现了阻塞锁和相关同步器(信号量,事件等)的框架...... 读完了大概就了解这个类到底是怎么工作的了。下面我们开始分类型研究源码,当然不可能全部分析一遍,这里只把重点的列出来。
实际代码分析中,我一般先看看这个结构图:

然后读一读开始的综述文档,然后从实例开始,像方法调用那样依次深入查看,就能依次看到相关的方法、内部类和属性,还是Ctrl+B大法好啊,这属于自底向上的源码分析方法。如果直接从上面那张图开始,对属性、方法、内部类挨个分析就属于自顶向下的分析法了。我觉得对一个陌生的东西要想有清晰的认知最好先自底向上捋一遍,便于搞清楚一个个具体功能的实现机制,然后再自顶向下看一遍,便于把控整体架构,宏观把握。这样走两遍再来总结一下就能比较透彻的掌握该技术了。

一、方法与属性

方法中,protected类型的一般要求具体的同步器子类来实现但是有些也可以直接用,public类型一般都是可以直接使用的当然也可以自己实现,private就是AQS自己的内部实现了,与具体子类无关。

state相关

一个private volatile int state;属性代表了线程之间争用的资源。与之相关的方法有三个

protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)//CAS原子性地修改state

都是protected类型,可见我们可以进行Override,来定义state的获取与释放从而实现我们自定义的同步器。非常简单就不把全部源码摆出来了。

同步队列queue相关

这个queue是一个FIFO的队列,每个节点都是下面的内部类Node类型,等待着state这个资源,主要由两个属性决定private transient volatile Node head;private transient volatile Node tail; 与之相关的方法有:

// 节点node进入队列,采用CAS的方式,返回其前驱
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,先初始化
            if (compareAndSetHead(new Node()))//设置头结点
                tail = head;
        } else {// 队列不为空
            node.prev = t;// 插入节点至队列尾部
            if (compareAndSetTail(t, node)) {//CAS修改队尾为node,之所以CAS是因为可能有多个线程争相入队
                t.next = node;
                return t;
            }
        }
    }
}
// 将当前线程以mode的方式(EXCLUSIVE或者SHARED)构成新节点并入队,返回这个新节点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 更快的入队方式,如果失败再采用较慢的标准入队方式enq
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// 把node设置为新的头,老的头出队
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

资源获取与释放相关

资源获取分为EXCLUSIVESHARED两种模式,对应acquirereleaseacquireSharedreleaseShared

首先是EXCLUSIVE资源获取:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里tryAcquire需要继承类自己实现(成功true,失败false),如果tryAcquire成功则直接返回,否则addWaiter将当前线程以独占节点的方式置于同步队列尾部等待。acquireQueued使得该节点等待获取资源,一直获取到资源才返回,整个等待过程中如果有中断是不响应的,但是获取资源后会用selfInterrupt补上。

// 节点获得资源才能返回否则一直自旋,中断该线程不会实时响应,但是如果被中断过会返回true,否则返回false
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {// node前驱是头结点,那么便可以尝试去获取资源了
                setHead(node);// 获取成功,可以把node设为头结点,也就是说头结点是独占资源的唯一拥有者
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 走到这里说明获取失败,检查是否应该阻塞和中断
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);//如果失败了,就把waitStatus置为CANCELLED表示取消了
    }
}

// 获取资源失败后,当前节点是否应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)// 前驱pred获得资源后会通知当前节点node,所以可以放心的阻塞了(waitStatus会在下面内部类解释)
        return true;
    if (ws > 0) {// 前驱取消了资源获取,那么当前节点就要找到前面最近一个正在等待的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;//此处 pred.waitStatus < 0,亦即pred 还在等待尝试获取资源
    } else {// 前驱正在等待,则设置其状态为SIGNAL,让他获取资源后通知本节点,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        // 但是本节点不能马上阻塞,因为设置不一定能成功,需要下次再次检查
    }
    return false;
}

// 阻塞本线程。被唤醒后要返回本线程是否被中断过。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

然后是EXCLUSIVE资源释放:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

与上面相对应,这里tryRelease也需要继承类自己实现(成功true,失败false),如果释放成功,则调用unparkSuccessor唤醒后继节点返回true,否则返回false。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)// 可能需要释放通知信号,把状态置零,允许失败
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;// 找到后继节点
    if (s == null || s.waitStatus > 0) {// 如果后继节点为空或者已经取消
        s = null;// 确保该节点的释放
        for (Node t = tail; t != null && t != node; t = t.prev)// 从队尾开始找到需要通知的最近的后继节点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)// 如果需唤醒的后继节点存在则唤醒之
        LockSupport.unpark(s.thread);
}

再看SHARED资源获取:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

这里tryAcquireShared也需要自己实现(负值说明失败,非负值表示获取成功后剩下的可用资源数),如果获取失败就调用doAcquireShared进入同步队列等待。

// 等待获取共享资源时不响应中断,但是获取资源成功后会用selfInterrupt补上
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);// 入队尾
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {// 处于队列第二个位置,可以尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {// 获取成功
                    setHeadAndPropagate(node, r);// 将自己设为队列头,并唤醒可能获取资源的后面几个节点
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 同acquireQueued的分析
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 旧的头
    setHead(node); // 设置新的头
    // 如果还有资源,则唤醒下一个,采用保守策略,多唤醒几次即使没获取到资源也无所谓,尽量做到不漏掉资源
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {        
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

最后SHARED资源释放:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

这里tryReleaseShared依然要自己实现(如果可以允许下一个节点获得资源则返回true,否则false),如果释放成功则调用doReleaseShared唤醒后继节点。需要注意的是tryReleaseShared由于可能多个线程并发操作所以一般需要CAS而tryRelease不需要。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {// 需要唤醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//设置WaitStatus失败
                    continue;            
                unparkSuccessor(h);// 一定要设置成功才唤醒
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;// CAS设置失败则继续循环
        }
        if (h == head)// 头变了,不需要继续唤醒
            break;
    }
}

此外,资源获取除了一直等待的方式之外还有对应的限制等待时间的方法如tryAcquiretryAcquireNanos,不必多言,释放就只有一直等而没有限制等待时间的了。也有响应中断与不响应的对应,如acquireInterruptiblyacquire,差别不大,不必多言。

二、内部类

Node

等待队列的节点类,等待队列是CLH(Craig,Landin,Hagersten)锁队列的一种变体,CLH锁通常用来作为自旋锁。

每个节点主要维护了下面一些状态

  • 对应的线程thread
  • 等待状态waitStatus 含,0:初始状态;CANCELLED 1:被取消;SIGNAL -1:当前线程释放资源或取消后需要唤醒后继节点;CONDITION -2:条件等待;PROPAGATE -3:下一个acquireShared操作应该被无条件传播。实际使用中,一般只关注正负,非负数就意味着节点不需要释放信号
  • 资源获取模式有SHARED(默认)和EXCLUSIVE两个
  • 同步队列中的前驱后继节点prevnext
  • 作为同步队列节点时,nextWaiter有:EXCLUSIVESHARED标识当前节点是独占模式还是共享模式;与ConditionObject搭配使用作为条件等待队列节点时,nextWaiter保存后继节点。所以实际上这个Node类是被复用了,既用于同步队列,也用于条件等待队列

ConditionObject

这个类实现了Condition接口,主要用来完成常见的条件等待、唤醒等操作。一个ConditionObject 包含一个等待队列,由firstWaiterlastWaiter决定。当前线程调用Condition.await()方法时,会被构造成为节点,然后置于条件等待队列队尾。
我们看最常用的条件等待方法

    // 条件等待,响应中断
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();// 响应中断
        Node node = addConditionWaiter();// 当前线程加入条件等待队列
        int savedState = fullyRelease(node);// 释放资源,并获得本节点需要的资源数,以便再次获取
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {// 如果当前节点不是在同步队列,也就是还在等待队列等待着条件发生
            LockSupport.park(this);// 就会一直阻塞
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//加入同步队列等待获取资源,成功后要检查中断响应
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 如果最后一个条件等待节点是取消的状态
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();// 清理整个链路的无效节点
            t = lastWaiter;
        }
        //以条件等待的方式将当前线程封装成节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)//条件等待队列为空就初始化
            firstWaiter = node;
        else// 队列不空,插入队尾
            t.nextWaiter = node;
        lastWaiter = node;
        return node;// 返回新插入的节点
    }

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();// 本节点需要的资源数
            if (release(savedState)) {// 释放掉这么多资源
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

然后是信号方法:

    public final void signal() {
        if (!isHeldExclusively())// 要使用该方法必须先是独占线程
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);//找到第一个条件等待节点,并发出信号
    }

    // 去掉条件等待队列的节点,直到遇上没取消的或者空节点
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 节点已被取消
            return false;
        Node p = enq(node);// 条件等待队列的第一个节点被加入同步队列的队尾
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);// 唤醒节点对应线程
        return true;
    }
三、已有应用分析

下面用两个例子来看看AQS的具体使用场景,分别是使用独占模式的ReentrantLock和共享模式的CountDownLatch
一般使用AQS的类,都会用一个内部类Sync来继承AQS,并实现那几个protected的方法。

ReentrantLock

ReentrantLockFairSyncNonfairSync两个类来实现公平锁和非公平锁,我们看非公平锁,主要几个方法是

  • lock(),使得NonfairSync调用compareAndSetState把state从0设为1并用setExclusiveOwnerThread把当前线程设为独占线程(亦即首次获得锁),如果失败则使用acquire(1)调用nonfairTryAcquire。总体流程就是如果state为0,那么就是本线程首次获得锁,把state置为1,否则如果当前线程是独占线程则将state+1(这也是锁可重入的关键),如果都不是就进入acquireQueued流程等待获得锁了了
  • unlock(),调用AQS的release(1)方法,实际上是调用了Sync的tryRelease(1)方法,如果state-1为0,那么返回true,否则返回false。也就是说,重入锁必须释放够重入次数才算真正释放成功,但是unlock()方法本身不会管这个最终结果,只管释放
  • tryLock(),与lock()区别是不等待,立即返回,只有唤醒时就是独占线程才能返回true,实现方法是nonfairTryAcquire
  • newCondition()直接返回了了AQS的内部类ConditionObject
  • isLocked() 如果state为0则表示未加锁返回false,否则返回true

CountDownLatch

CountDownLatch 主要几个方法是

  • CountDownLatch(int count),构造方法,设置 AQS 的 state 为 count
  • await(),调用 AQS 的 acquireSharedInterruptibly(int arg) 方法,然后调用自己覆盖的tryAcquireShared(int acquires)来获得state的值是否为0,如果是0就结束等待直接返回了,如果不是0就调用 AQS 的 doAcquireSharedInterruptibly(int arg)方法,该方法会循环等待,直到state为0才返回或者被中断。
  • countDown(),调用 AQS 的 releaseShared(int arg) 方法,实际上是调用了自己覆盖的 tryReleaseShared(int releases) 方法,把 state 减了1,如果此时state为0,则调用 AQS 的doReleaseShared()方法

分析

总体而言,AQS提供了一个模板方法模式,将获得锁释放锁一些必要的流程操作都规定好了,我们只需要填充一些具体的获得与释放方法

  • getState(),setState(int newState),compareAndSetState(int expect,int update):是资源相关操作,保证原子性
  • tryAcquire(int arg):尝试独占获取资源。成功返回true,失败返回false。
  • tryRelease(int arg):尝试独占释放资源。成功返回true,失败返回false。
  • tryAcquireShared(int arg):尝试共享获取资源。负数表示失败,非负数表示成功代表剩余可用资源
  • tryReleaseShared(int arg):尝试共享释放资源。如果释放后可以唤醒后续等待结点返回true,否则返回false。
  • isHeldExclusively():代表当前线程是否独占资源,只有用到Condition之时才需要去实现它。

自定义同步器时,一般都是自己写一个 static class Sync extends AbstractQueuedSynchronizer 静态内部类来实现具体的方法。

阅读原文:MageekChiu

5人推荐
随时随地看视频
慕课网APP