手记

java基础之AQS

Java开发中,我们的应用程序经常会使用多线程提高程序的运行效率,多线程情况下访问线程共享变量可能会带来并发问题,此时就需要并发锁解决并发问题。Java提供了两种类型的并发控制机制:synchonrized关键字和AQS框架,二者各有优势,不过在加锁解锁场景比较灵活的情况下,我们往往会采用AQS框架来解决并发问题。本文会对Java中的AQS框架的结构和源码进行简单介绍。

AQS结构

AQS的全称是AbstractQueuedSynchronizer(抽象的队列式的同步器),AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch等。

如下图所示,AQS主要包含两部分内容:共享资源和等待队列。AQS底层已经对这两部分内容提供了很多方法。

  1. 共享资源:共享资源是一个volatile的int类型变量。
  2. 等待队列:等待队列是一个线程安全的队列,当线程拿不到锁时,会被park并放入队列。
  3. 新线程:非公平情况下,新线程会先尝试直接获取资源,获取不到才进入队列。

核心思想

同步器的核心方法是acquire和release操作,其背后的思想也比较简洁明确。

acquire操作是这样的:

// acquire操作
while (当前同步器的状态不允许获取操作) {
    如果当前线程不在队列中,则将其插入队列
    阻塞当前线程
}
如果线程位于队列中,则将其移出队列

release操作是这样的:

更新同步器的状态
if (新的状态允许某个被阻塞的线程获取成功)
    解除队列中一个或多个线程的阻塞状态

从这两个操作中的思想中我们可以提取出三大关键操作:同步器的状态变更、线程阻塞和释放、插入和移出队列。所以为了实现这两个操作,需要协调三大关键操作引申出来的三个基本组件:

  • 同步器状态的原子性管理;
  • 线程阻塞与解除阻塞;
  • 队列的管理;

同步器状态的原子性管理

AQS类使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSet操作来读取和更新这个同步状态。其中属性state被声明为volatile,并且通过使用CAS指令来实现compareAndSetState,使得当且仅当同步状态拥有一个一致的期望值的时候,才会被原子地设置成新值,这样就达到了同步状态的原子性管理,确保了同步状态的原子性、可见性和有序性。

线程阻塞与解除阻塞

直到JSR166,阻塞线程和解除线程阻塞都是基于Java的内置管程,没有其它非基于Java内置管程的API可以用来达到阻塞线程和解除线程阻塞。唯一可以选择的是Thread.suspend和Thread.resume,但是它们都有无法解决的竞态问题,所以也没法用,目前该方法基本已被抛弃。具体不能用的原因可以官方给出的答复。

j.u.c.locks包提供了LockSupport类来解决这个问题。方法LockSupport.park阻塞当前线程直到有个LockSupport.unpark方法被调用。unpark的调用是没有被计数的,因此在一个park调用前多次调用unpark方法只会解除一个park操作。另外,它们作用于每个线程而不是每个同步器。一个线程在一个新的同步器上调用park操作可能会立即返回,因为在此之前可以有多余的unpark操作。但是,在缺少一个unpark操作时,下一次调用park就会阻塞。虽然可以显式地取消多余的unpark调用,但并不值得这样做。在需要的时候多次调用park会更高效。park方法同样支持可选的相对或绝对的超时设置,以及与JVM的Thread.interrupt结合 ,可通过中断来unpark一个线程。

队列的管理

整个框架的核心就是如何管理线程阻塞队列,该队列是严格的FIFO队列,因此不支持线程优先级的同步。同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构,业界主要有两种选择,一种是MCS锁,另一种是CLH锁。其中CLH一般用于自旋,但是相比MCS,CLH更容易实现取消和超时,所以同步队列选择了CLH作为实现的基础。

CLH队列实际并不那么像队列,它的出队和入队与实际的业务使用场景密切相关。它是一个链表队列,通过AQS的两个字段head(头节点)和tail(尾节点)来存取,这两个字段是volatile类型,初始化的时候都指向了一个空节点。

条件队列

上一节的队列其实是AQS的同步队列,这一节的队列是条件队列,队列的管理除了有同步队列,还有条件队列。AQS只有一个同步队列,但是可以有多个条件队列。AQS框架提供了一个ConditionObject类,给维护独占同步的类以及实现Lock接口的类使用。

ConditionObject类实现了Condition接口,Condition接口提供了类似Object管程式的方法,如await、signal和signalAll操作,还扩展了带有超时、检测和监控的方法。ConditionObject类有效地将条件与其它同步操作结合到了一起。该类只支持Java风格的管程访问规则,这些规则中,当且仅当当前线程持有锁且要操作的条件(condition)属于该锁时,条件操作才是合法的。这样,一个ConditionObject关联到一个ReentrantLock上就表现的跟内置的管程(通过Object.wait等)一样了。两者的不同仅仅在于方法的名称、额外的功能以及用户可以为每个锁声明多个条件。

ConditionObject类和AQS共用了内部节点,有自己单独的条件队列。signal操作是通过将节点从条件队列转移到同步队列中来实现的,没有必要在需要唤醒的线程重新获取到锁之前将其唤醒。

源码分析

我们主要通过独占式同步状态的获取和释放、共享式同步状态的获取和释放来看下AQS是如何实现的。

独占式同步状态的获取

独占式同步状态调用的方法是acquire,代码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用子类实现的tryAcquire方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造独占式同步节点(同一时刻只能有一个线程成功获取同步状态)并通过addWaiter方法将该节点加入到同步队列的尾部,最后调用acquireQueued方法,使得该节点以自旋的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

下面来首先来看下节点构造和加入同步队列是如何实现的。代码如下:

private Node addWaiter(Node mode) {
        // 当前线程构造成Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 尝试快速在尾节点后新增节点 提升算法效率 先将尾节点指向pred
        Node pred = tail;
        if (pred != null) {
            //尾节点不为空  当前线程节点的前驱节点指向尾节点
            node.prev = pred;
            //并发处理 尾节点有可能已经不是之前的节点 所以需要CAS更新
            if (compareAndSetTail(pred, node)) {
                //CAS更新成功 当前线程为尾节点 原先尾节点的后续节点就是当前节点
                pred.next = node;
                return node;
            }
        }
        //第一个入队的节点或者是尾节点后续节点新增失败时进入enq
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //尾节点为空  第一次入队  设置头尾节点一致 同步队列的初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //所有的线程节点在构造完成第一个节点后 依次加入到同步队列中
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

节点进入同步队列之后,就进入了一个自旋的过程,每个线程节点都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中并会阻塞节点的线程,代码如下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前线程节点的前驱节点
                final Node p = node.predecessor();
                //前驱节点为头节点且成功获取同步状态
                if (p == head && tryAcquire(arg)) {
                    //设置当前节点为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //是否阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

再来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt是怎么来阻塞当前线程的,代码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱节点的状态决定后续节点的行为
     int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*前驱节点为-1 后续节点可以被阻塞
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        //阻塞线程
        LockSupport.park(this);
        return Thread.interrupted();
    }

独占式同步获取锁示例图

独占式同步状态的释放

当同步状态获取成功之后,当前线程从acquire方法返回,对于锁这种并发组件而言,就意味着当前线程获取了锁。有获取同步状态的方法,就存在其对应的释放方法,该方法为release,现在来看下这个方法的实现,代码如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {//同步状态释放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //直接释放头节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*寻找符合条件的后续节点
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒后续节点
            LockSupport.unpark(s.thread);
    }

独占式释放是非常简单而且明确的。

总结下独占式同步状态的获取和释放:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态的获取

共享式同步状态调用的方法是acquireShared,代码如下:

public final void acquireShared(int arg) {
        //获取同步状态的返回值大于等于0时表示可以获取同步状态
        //小于0时表示可以获取不到同步状态  需要进入队列等待
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
        //和独占式一样的入队操作
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //前驱结点为头节点且成功获取同步状态 可退出自旋
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //退出自旋的节点变成首节点
        setHead(node);
       
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared方法可以释放同步状态,代码如下:

public final boolean releaseShared(int arg) {
        //释放同步状态
        if (tryReleaseShared(arg)) {
            //唤醒后续等待的节点
            doReleaseShared();
            return true;
        }
        return false;
    }
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        //自旋
    for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒后续节点
            unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

unparkSuccessor方法和独占式是一样的。

AQS的应用

AQS被大量的应用在了同步工具上。

ReentrantLock:ReentrantLock类使用AQS同步状态来保存锁重复持有的次数。当锁被一个线程获取时,ReentrantLock也会记录下当前获得锁的线程标识,以便检查是否是重复获取,以及当错误的线程试图进行解锁操作时检测是否存在非法状态异常。ReentrantLock也使用了AQS提供的ConditionObject,还向外暴露了其它监控和监测相关的方法。

ReentrantReadWriteLock:ReentrantReadWriteLock类使用AQS同步状态中的16位来保存写锁持有的次数,剩下的16位用来保存读锁的持有次数。WriteLock的构建方式同ReentrantLock。ReadLock则通过使用acquireShared方法来支持同时允许多个读线程。

Semaphore:Semaphore类(信号量)使用AQS同步状态来保存信号量的当前计数。它里面定义的acquireShared方法会减少计数,或当计数为非正值时阻塞线程;tryRelease方法会增加计数,在计数为正值时还要解除线程的阻塞。

CountDownLatch:CountDownLatch类使用AQS同步状态来表示计数。当该计数为0时,所有的acquire操作(对应到CountDownLatch中就是await方法)才能通过。

FutureTask:FutureTask类使用AQS同步状态来表示某个异步计算任务的运行状态(初始化、运行中、被取消和完成)。设置(FutureTask的set方法)或取消(FutureTask的cancel方法)一个FutureTask时会调用AQS的release操作,等待计算结果的线程的阻塞解除是通过AQS的acquire操作实现的。

SynchronousQueues:SynchronousQueues类使用了内部的等待节点,这些节点可以用于协调生产者和消费者。同时,它使用AQS同步状态来控制当某个消费者消费当前一项时,允许一个生产者继续生产,反之亦然。

除了这些j.u.c提供的工具,还可以基于AQS自定义符合自己需求的同步器。

0人推荐
随时随地看视频
慕课网APP