手记

串讲AbstractQueuedSynchronizer及JUC工具类

Concurrent包下有许多工具类,包括共享语义和独占语义的,他们都是以AbstractQueuedSynchronizer(以下简称AQS)为基础构建的,在类内部都包含了一个继承AQS的类。


这样的代码在concurrent包下绝不少见

AQS在JDK中的子类

AbstractQueuedSynchronizer


我们先谈一下AQS这个类,其中我认为最重要的一个变量如下

  • synchronization state
    private volatile int state;
    为什么会有共享和独占类的区分?所谓共享,就是同时可以有n个线程一起访问,当n==1时,“共享”也就变成了“独占”,那么这样来说,共享和独占之间的界限并不是不可逾越的。

上面我所说的也就是我对state这个变量含义的理解。为什么ReentrantLock和Semaphore可以由同一个基类去实现(这两个类可以互相实现),甚至用的是同一个状态变量?原因就在于此。

如何看待、处理这个state值的变化,造就了五花八门的工具类。

显然,AQS内部还需要维护了一个队列,毕竟你管并发,那么多线程,总要排个队把。

static final class Node {        volatile int waitStatus;         //表示该节点的等待状态,如下
        static final int CANCELLED =  1; //该节点对应的线程已经取消
        static final int SIGNAL    = -1; //要唤醒的后继结点
        static final int CONDITION = -2; //该节点的线程正在condition队列中
        static final int PROPAGATE = -3; //共享模式下会一直向后传播“唤醒”的动作

        volatile Node prev;        volatile Node next;        volatile Thread thread;
    }

AQS所用的队列是CLH队列,这个队列也叫做syn队列,只有申请资源发现资源不够的线程会加入这个syn队列(比如Lock.lock()获取锁失败),不用太关注这个队列的实现的细节,这是一个双向链表实现的FIFO队列,某一节点的状态(自旋,挂起,唤醒)都由自己的前驱结点的状态决定,这也是waitStatus的意义所在。

入队出队时挂起线程、唤醒线程都借助的是LockSupport这个辅助类,这个类最关键的两个方法就是park和unpark方法,也就是挂起和唤醒线程,这个类在这就不多展开了。

然而在AQS中,还有一个内部类,也引用了这个Node类,也就是说还有一个队列

Condition也维护了一个队列

AQS内部为了两个队列,一个就是我们上面提到过的syn队列,一个就是这个condition队列了,任何线程要么获取了资源,要么就在syn队列,要么就在condition队列,用一个很简答的例子展示下

public class DEMO {    private static ReentrantLock lock =new ReentrantLock();    private static Condition condition=lock.newCondition();    public static void main(String[] args) throws InterruptedException {
        Thread t1=new Thread(new Runnable() {            @Override
            public void run() {
                 lock.lock();         //1.线程1获取锁成功
                try {
                    condition.await();//3.线程1释放锁,并且加入condition队列,此时syn队列头节点即线程2被唤醒
                                      //6.线程1被唤醒继续执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        });
        t1.start();
        Thread.sleep(1000);
        Thread t2=new Thread(new Runnable() {            @Override
            public void run() {                try{
                    lock.lock();       //2.线程2申请获取锁资源,失败,加入syn队列
                    condition.signal();//4.线程2唤醒condition队列中的头节点即线程1,线程1进入syn队列
                }finally {
                    lock.unlock();     //5.线程2释放锁资源,唤醒syn队列头节点
                }
            }
        });
        t2.start();
    }

其实这两个队列都很好理解,syn队列装的是要去争取某个资源的线程,condition队列装的都是等待condition.signal的线程。

关于线程在两个队列之间的切换及condition方法的一些细节就不在这里多展开了,有兴趣的读者可以自己在ide里面阅读ConditionObject这个类的方法,非常好懂,直接可以从名字的语义读出这个类方法的思路。在这里我就想谈一下在队列内部关于自旋还是挂起的一些细节。

获取资源的入口都是acquire方法

请求资源的统一入口

tryAcquire方法由子类实现,由逻辑短路特性得知,只有在tryAcuqire失败后,后面的acquQueued才会执行,由名字也能看出来,这个方法是判断是否要入队的。

final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {//在无限循环里不停判断是否要挂起,其实就是自旋等待直至挂起,在这一串的调用链里面很常见
                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;                    return interrupted;
                }                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//这!!!!
                    interrupted = true;
            }
        } finally {            if (failed)
                cancelAcquire(node);
        }
    }
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

单独把这一行逻辑拿出来,这一行代码就是判断挂起时机的关键,同样因为逻辑表达式的短路特性,只有第一个判断该节点确实要挂起,才会执行第二个park操作,我们看下第一个表达式

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            return true; //因为前驱结点是SIGNAL,所以后续节点可以放心挂起
        if (ws > 0) {//ws>0代表前驱结点已被取消,不断向前移动跳过这类节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {//代表前驱结点状态为0或者为PROPAGATE态
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //我们设置前驱结点的状态为SIGNAL,下一次访问的时候再挂起。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }        return false;
    }

梳理下逻辑,就是如果当前节点的前驱结点是SIGNAL状态,那么我可以安心的挂起,但是如果是其他状态那么就要下一次再尝试了。而尝试的过程,按我的理解,类似于自旋吧,就是不急于把自己挂起,特别是在上述方法的else分支,体现的淋漓尽致。
parkAndCheckInterrupt()方法比较简单,就不多说了

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);        return Thread.interrupted();
    }

关于资源的释放也可以对比着分析,和获取的过程是十分相似的,所有资源的释放调用链都是从release()开始,再到tryRelease(),释放时也是对节点的waitStatus进行判断,最后也还是借助LockSupport,为了避免文章冗余,在这就不多重复。

这就是关于AQS这个基础类本身我想细讲的全部了,下面就分析下以这个类为基础,Concurrent包下的一些工具类。

基于AbstractQueuedSynchronizer的共享独占工具类


所有的工具类,都是在内部实现一个syn类去继承AQS,并实现一些方法,此外所有资源的请求(如Lock.lock()或Semaphore.acquire())都是以AQS的acquire系列方法为入口的,共享类走acquireShared(),独占类走acquire()

独占类入口

共享类入口


显然他们下一步都是调用tryXXX方法,但是在AQS里面,这两个方法都是直接抛异常

这也是AQS精心设计所在,既然要以AQS为基础实现共享独占类,那么子类就要去实现这两个方法之一,为什么不用abstract方法呢?因为AQS要作为两种类的基类,如果用abstract方法,那么当我实现某一功能的时候,这两个方法都要实现,大大降低了代码的可读性,也毫无意义。

关于释放,和获取如出一辙


这里我先提纲挈领的描述了下子类实现以AQS为工具类的入口和出口方法,下面我们通过具体的工具类来分析。

ReentrantLock && ReentrantReadWriteLock(独占类分析)


  • ReentrantLock
    从独占类开始分析,就拿ReentrantLock开刀吧,先分析公平模式(其实就是开头有点差异,后面基本差不多),这个类也是大家最常见的类之一

    lock.lock();    
    try {         //do something as you want
     } catch (InterruptedException e) {         //deal with the Exception
     }finally {
        lock.unlock();
     }

这几乎及时ReentrantLock的模板使用方法,我们着重分析下lock和unlock方法到底发生了什么

第一步调用内部syn类的lock方法

第二步就走acquire入口了

第三步就是tryAcquire了

我们就看看ReentrantLock自己实现的tryAcquire发生了什么

两个syn都实现了这两个方法

 protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();//如何看待处理这个state决定了这是一个什么样的工具类
            if (c == 0) {      //若当前资源无人占有    
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);//那么就占为己有吧
                    return true;//返回true表示资源申请获取成功
                }
            }            else if (current == getExclusiveOwnerThread()) {//这个分支处理重入的情况
                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");
                setState(nextc);                return true;
            }            return false;//代表资源已经被别人占有了,返回失败
        }

又是这张图,眼熟吧


如果刚刚的tryAcquire成功了,那么也没后面acquireQueued的事了,如果失败了,那么就要进syn队列了,acquireQueued方法我们前面也花大篇幅分析了,这就是lock方法,其实很简单吧?

我们再看下unlock方法

老生常谈了 第一步调用syn的release方法

关键就是tryRelease方法

protected final boolean tryRelease(int releases) {            int c = getState() - releases; //将state减去1,意为释放一份资源
            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {//如果减掉了1以后state为0说明现在没人占有资源
                free = true;
                setExclusiveOwnerThread(null);//释放当前线程
            }
            setState(c);            return free;//返回释放资源成功
        }

回到release方法

public final boolean release(int arg) {        if (tryRelease(arg)) {//如果释放成功了就在A处唤醒后面等待的线程节点
            Node h = head;            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//A
            return true;
        }        return false;//否则不作为
    }

ReentrantLock基于AQS的实现的思想就是对于state状态,看作持有锁的对象的数目,如果是0那么别人可以去持有这个锁,不然就是挂起等待,释放的时候如果state变为0,就去syn队列唤醒等待者,不然就是什么也不做。ReentrantLock中还有可中断的lockInterruptibly()以及带超市的tryAcquire方法,其实本质上就是多了对线程中断的检测和对执行时间的检测,在这里不重复描述了,有心的读者可以自己在源码中查看,和上述的方法几乎没什么差异,主要逻辑都是相同的。

对于ReentrantLock最后要提的就是公平非公平模式,非公平模式的性能较高,因为线程被唤醒和实际获得执行权中间是有较大的时间差的,非公平模式可以利用这段时间。

非公平模式的lock方法多了一个分支去抢占

ReentrantLock的分析也就到此为止,我们再看看ReentrantReadWriteLock

  • ReentrantReadWriteLock

对于这个类有一点要解释的及时他并没有同时实现共享独占的入口方法,而是内部实现了一个ReadLock和一个WriteLock,如图所示



这个类的读锁是共享锁,写锁是独占锁,共享锁我们后文会举例子,排它锁和前面分析的如出一辙,这里我想讲的就是这个ReentrantReadWriteLock类的锁降级特性,这个锁支持降级但不支持升级,至于原因也很好理解,写锁降级成读锁,大家一起进来也没事,但是如果正在读的多个锁突然给升级成写锁,也就是由共享模式变成了独占模式,那岂不是乱了套?到底排谁?死锁?至于降级,我这给出官方的例子,一看就懂


class CachedData {
    Object data;    volatile boolean cacheValid;    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();    void processCachedData() {
        rwl.readLock().lock();        if (!cacheValid) { //如果缓存没有失效,那么只需要读锁就行,可并行访问
            // Must release read lock before acquiring write lock
            rwl.readLock().unlock();
            rwl.writeLock().lock();//如果失效了,那么就要用写锁防止多个线程同时写
            try {                //这里再检查下,因为有可能其他线程已经先下手了
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }                // 锁降级,先上读锁
                rwl.readLock().lock();
            } finally {//最后把写锁放了
                rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }        //原则:锁交替时,读锁必须先放了自己才能上写锁,写锁可以先上读锁后再放了自己
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

当然还有用读写锁包装数据结构的

class RWDictionary {    private final Map<String, Data> m = new TreeMap<String, Data>();    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();    private final Lock r = rwl.readLock();    private final Lock w = rwl.writeLock();    public Data get(String key) {
        r.lock();        try { return m.get(key); }        finally { r.unlock(); }
    }    public String[] allKeys() {
        r.lock();        try { return m.keySet().toArray(); }        finally { r.unlock(); }
    }    public Data put(String key, Data value) {
        w.lock();        try { return m.put(key, value); }        finally { w.unlock(); }
    }    public void clear() {
        w.lock();        try { m.clear(); }        finally { w.unlock(); }
    }
}

这样就包装了一个支持并发的有序Map,相比于直接用Collections的同步包装方法,这样做能支持更大的吞吐量。

独占类的分析我们到此为止。

Semaphore && CountDownLatch(共享类分析)


关于共享类,重点就分析这两者,以我的理解,这两者真的是孪生兄弟一样,Semaphore两个方法acquire和release,CountDownLatch两个方法await和countdown,真的是一一对应的

  • Semaphore的acquire方法去查看state是否大于0,如果大于0就进入critical region执行代码,不然就挂起,CountDownLatch的await方法检测state是否为0,是的话走你,不是的话挂起

  • Semphore的release方法对state++,CountDownLatch的countdown对state--,这两个类,在我看来,真的,肯定是同卵的。

随便挑一个分析下吧,我们这就挑CountDownLatch分析,Semphore和操作系统里面学的的信号量是同一个东西,acquire和release就是PV操作

还是和前文分析独占类一样的方法,入口方法是acquireShared,然后会走tryXXX方法,最后出口是releaseShared,里面肯定有一层tryReleaseShared。我们来看看是不是把。

不多说了,你懂的

嘿嘿

第三步走到tryXXX

这里的实现非常简单,以为CountDownLatch就是这样一个类,只有countdown够次数了,acquire才算成功,代码才能走下去,不然就走doAcquireShared方法,对,又要贴那段看了好几次的代码了。

//这段代码估计都看腻了吧private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;                        return;
                    }
                }              //没错,又是这里。。没看懂的可以看上半文的分析
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())                    throw new InterruptedException();
            }
        } finally {            if (failed)
                cancelAcquire(node);
        }
    }

acquire说完了,看看release吧,也很简单,步骤我不重复,就是那几步,我本来背不下来的,写到这,真的,我可以倒着背了。

来,你说,我不说了

这结构的代码看了不知道多少遍了

CountDownLatch的try方法很好理解,就是通过CAS把state--

如果state已经是0了,也就意味着countdown次数够了,那么就要走doReleaseShared方法了

private void doReleaseShared() {       //主要的逻辑就是依次唤醒syn队列里面等待的线程,也就是那个await的线程
        for (;;) {
            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS
            }            if (h == head)                   // loop if head changed
                break;
        }
    }

CountDownLatch算是讲完了,开头我就说Semaphore和他是双胞胎,读者可以自行比对着看,几乎就是一样的,对于基于AQS实现的类,还是我强调的那句话,你怎么看state,他就是个怎么样的工具类。我用英文阐述我的观点可能更好理解

***How you see the state makes what it is ***

不知道你们能不能理解,我尽力啦,因为水平有限,如有错误,希望能指出,希望能多多交流。如果大家有什么问题,源码永远是最好的帮手。

参考:



作者:静宜君
链接:https://www.jianshu.com/p/6968ba6b473d

0人推荐
随时随地看视频
慕课网APP