手记

Java并发(6)- CountDownLatch、Semaphore与AQS

 

引言

上一篇文章中详细分析了基于AQS的ReentrantLock原理ReentrantLock通过AQS中的state变量0和1之间的转换代表了独占锁。那么可以思考一下当state变量大于1时代表了什么J.U.C中是否有基于AQS的这种实现呢如果有那他们都是怎么实现的呢这些疑问通过详细分析J.U.C中的Semaphore与CountDownLatch类后将会得到解答。

  1. Semaphore与CountDownLatch的共享逻辑

  2. Semaphore与CountDownLatch的使用示例
    2.1 Semaphore的使用
    2.2 CountDownLatch的使用

  3. 源码分析
    3.1 AQS中共享锁的实现
    3.2 Semaphore源码分析
    3.3 CountDownLatch源码分析

  4. 总结

1. Semaphore与CountDownLatch的共享方式

独占锁意味着只能有一个线程获取锁其他的线程在锁被占用的情况下都必须等待锁释放后才能进行下一步操作。由此类推共享锁是否意味着可以由多个线程同时使用这个锁不需要等待呢如果是这样那锁的意义也就不存在了。在J.U.C中共享意味着有多个线程可以同时获取锁但这个多个是有限制的并不是无限个J.U.C中通过Semaphore与CountDownLatch来分别实现了两种有限共享锁。

Semaphore又叫信号量他通过一个共享的’信号包‘来给每个使用他的线程来分配信号当信号包中的信号足够时线程可以获取锁反之信号包中信号不够了则不能获取到锁需要等待足够的信号被释放才能获取。

CountDownLatch又叫计数器他通过一个共享的计数总量来控制线程锁的获取当计数器总量大于0时线程将被阻塞不能够获取锁只有当计数器总量为0时所有被阻塞的线程同时被释放。

可以看到Semaphore与CountDownLatch都有一个共享总量这个共享总量就是通过state来实现的。

2. Semaphore与CountDownLatch的使用示例

在详细分析Semaphore与CountDownLatch的原理之前先来看看他们是怎么使用的这样方便后续我们理解他们的原理。先知道他是什么然后再问为什么下面通过两个示例来详细说明Semaphore与CountDownLatch的使用。

2.1 Semaphore的使用

//初始化10个信号量在信号包中让ABCD4个线程分别去获取public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(10);
    SemaphoreTest(semaphore);
}private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {    //线程A初始获取了4个信号量然后分3次释放了这4个信号量
    Thread threadA = new Thread(new Runnable() {

        @Override        public void run() {            try {
                semaphore.acquire(4);
                System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                semaphore.release(1);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                semaphore.release(1);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                semaphore.release(2);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadA.setName("threadA");    //线程B初始获取了5个信号量然后分2次释放了这5个信号量
    Thread threadB = new Thread(new Runnable() {

        @Override        public void run() {            try {
                semaphore.acquire(5);
                System.out.println(Thread.currentThread().getName() + " get 5 semaphore");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                semaphore.release(2);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " release 3 semaphore");
                semaphore.release(3);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadB.setName("threadB");    //线程C初始获取了4个信号量然后分1次释放了这4个信号量
    Thread threadC = new Thread(new Runnable() {

        @Override        public void run() {            try {
                semaphore.acquire(4);
                System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " release 4 semaphore");
                semaphore.release(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadC.setName("threadC");    
    //线程D初始获取了10个信号量然后分1次释放了这10个信号量
    Thread threadD = new Thread(new Runnable() {

        @Override        public void run() {            try {
                semaphore.acquire(10);
                System.out.println(Thread.currentThread().getName() + " get 10 semaphore");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " release 10 semaphore");
                semaphore.release(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadD.setName("threadD");    
    //线程A和线程B首先分别获取了4个和5个信号量总信号量变为了1个
    threadA.start();
    threadB.start();
    Thread.sleep(1);    //线程C尝试获取4个发现不够则等待
    threadC.start();
    Thread.sleep(1);    //线程D尝试获取10个发现不够则等待
    threadD.start();
}

执行结果如下

threadB get 5 semaphore
threadA get 4 semaphore
threadA release 1 semaphore
threadB release 2 semaphore
threadC get 4 semaphore
threadA release 1 semaphore
threadC release 4 semaphore
threadB release 3 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore

可以看到threadA和threadB在获取了9个信号量之后threadC和threadD之后等待信号量足够时才能继续往下执行。而threadA和threadB在信号量足够时是可以同时执行的。

其中有一个问题当threadD排队在threadC之前时信号量如果被释放了4个threadC会先于threadD执行吗还是需要排队等待呢这个疑问在详细分析了Semaphore的源码之后再来给大家答案。

2.2 CountDownLatch的使用

//初始化计数器总量为2public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    CountDownLatchTest(countDownLatch);
}private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {    //threadA尝试执行计数器为2被阻塞
    Thread threadA = new Thread(new Runnable() {        @Override
        public void run() {            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " await");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadA.setName("threadA");    //threadB尝试执行计数器为2被阻塞
    Thread threadB = new Thread(new Runnable() {        @Override
        public void run() {            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " await");
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadB.setName("threadB");    
    //threadC在1秒后将计数器数量减1
    Thread threadC = new Thread(new Runnable() {        @Override
        public void run() {            try {
                Thread.sleep(1000);
                countDownLatch.countDown();
                
                System.out.println(Thread.currentThread().getName() + " countDown");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadC.setName("threadC");    
    //threadD在5秒后将计数器数量减1
    Thread threadD = new Thread(new Runnable() {        @Override
        public void run() {            try {
                Thread.sleep(5000);
                countDownLatch.countDown();
                
                System.out.println(Thread.currentThread().getName() + " countDown");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    });
    threadD.setName("threadD");
    
    threadA.start();
    threadB.start();
    threadC.start();
    threadD.start();
}

执行结果如下

threadC countDown
threadD countDown
threadA awaitthreadB await

threadA和threadB在尝试执行时由于计数器总量为2被阻塞当threadC和threadD将计数器总量减为0后threadA和threadB同时开始执行。

总结一下Semaphore就像旋转寿司店共有10个座位当座位有空余时等待的人就可以坐上去。如果有只有2个空位来的是一家3口那就只有等待。如果来的是一对情侣就可以直接坐上去吃。当然如果同时空出5个空位那一家3口和一对情侣可以同时上去吃。CountDownLatch就像大型商场里面的临时游乐场每一场游乐的时间过后等待的人同时进场玩而一场中间会有不爱玩了的人随时出来但不能进入一旦所有进入的人都出来了新一批人就可以同时进场。

3. 源码分析

明白了Semaphore与CountDownLatch是做什么的怎么使用的。接下来就来看看Semaphore与CountDownLatch底层时怎么实现这些功能的。

3.1 AQS中共享锁的实现

上篇文章通过对ReentrantLock的分析得倒了AQS中实现独占锁的几个关键方法

//状态量独占锁在0和1之间切换private volatile int state;//调用tryAcquire获取锁获取失败后加入队列中挂起等操作AQS中实现public final void acquire(int arg);//独占模式下尝试获取锁ReentrantLock中实现protected boolean tryAcquire(int arg);//调用tryRelease释放锁以及恢复线程等操作AQS中实现public final boolean release(int arg);//独占模式下尝试释放锁ReentrantLock中实现protected boolean tryRelease(int arg);

其中具体的获取和释放独占锁的逻辑都放在ReentrantLock中自己实现AQS中负责管理获取或释放独占锁成功失败后需要具体处理的逻辑。那么共享锁的实现是否也是遵循这个规律呢由此我们在AQS中发现了以下几个类似的方法

//调用tryAcquireShared获取锁获取失败后加入队列中挂起等操作AQS中实现public final void acquireShared(int arg)//共享模式下尝试获取锁protected int tryAcquireShared(int arg)//调用tryReleaseShared释放锁以及恢复线程等操作AQS中实现public final boolean releaseShared(int arg)//共享模式下尝试释放锁protected boolean tryReleaseShared(int arg)

共享锁和核心就在上面4个关键方法中先来看看Semaphore是怎么调用上述方法来实现共享锁的。

3.2 Semaphore源码分析

首先是Semaphore的构造方法同ReentrantLock一样他有两个构造方法这样也是为了实现公平共享锁和非公平共享锁大家可能有疑问既然是共享锁为什么还分公平和非公平的呢这就回到了上面那个例子后面的疑问前面有等待的线程时后来的线程是否可以直接获取信号量还是一定要排队。等待当然是公平的插队就是非公平的。

还是用旋转寿司的例子来说现在只有2个空位已经有一家3口在等待这时来了一对情侣公平共享锁的实现就是这对情侣必须等待只到一家3口上桌之后才轮到他们而非公平共享锁的实现是可以让这对情况直接去吃因为刚好有2个空位让一家3口继续等待好像是很不公平......这种情况下非公平共享锁的好处就是可以最大化寿司店的利润好像同时也得罪了等待的顾客......也是Semaphore默认的实现方式。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore的例子中使用了两个核心方法acquire和release分别调用了AQS中的acquireSharedInterruptibly和releaseShared方法

//获取permits个信号量public void acquire(int permits) throws InterruptedException {    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}//释放permits个信号量public void release(int permits) {    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量
        doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起}public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) { //尝试释放arg个信号量
        doReleaseShared();        return true;
    }    return false;
}

Semaphore在获取和释放信号量的流程都是通过AQS来实现具体怎么算获取成功或释放成功则由Semaphore本身实现。

//公平共享锁尝试获取acquires个信号量protected int tryAcquireShared(int acquires) {    for (;;) {        if (hasQueuedPredecessors()) //前面是否有排队有则返回获取失败
            return -1;        int available = getState(); //剩余的信号量旋转寿司店剩余的座位
        int remaining = available - acquires;        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 剩余信号量不够够的情况下尝试获取旋转寿司店座位不够或者同时来两对情况抢座位
            return remaining;
    }
}//非公平共享锁尝试获取acquires个信号量final int nonfairTryAcquireShared(int acquires) {    for (;;) {        int available = getState(); //剩余的信号量旋转寿司店剩余的座位
        int remaining = available - acquires;        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 剩余信号量不够够的情况下尝试获取旋转寿司店座位不够或者同时来两对情侣抢座位
            return remaining;
    }
}

可以看到公平共享锁和非公平共享锁的区别就在是否需要判断队列中是否有已经等待的线程。公平共享锁需要先判断非公平共享锁直接插队尽管前面已经有线程在等待。

为了验证这个结论稍微修改下上面的示例

threadA.start();
threadB.start();
Thread.sleep(1);
threadD.start(); //threadD已经在排队
Thread.sleep(3500);
threadC.start(); //3500毫秒后threadC来插队

结果输出

threadB get 5 semaphore
threadA get 4 semaphore
threadB release 2 semaphore
threadA release 1 semaphore
threadC get 4 semaphore //threadC先与threadD获取到信号量
threadA release 1 semaphore
threadB release 3 semaphore
threadC release 4 semaphore
threadA release 2 semaphore
threadD get 10 semaphore
threadD release 10 semaphore

这个示例很好的说明了当为非公平锁时会先尝试获取共享锁然后才排队。

当获取信号量失败之后会去排队排队这个操作通过AQS中的doAcquireSharedInterruptibly方法来实现

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {    final Node node = addWaiter(Node.SHARED); //加入等待队列
    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor(); //获取当前节点的前置节点
            if (p == head) {                int r = tryAcquireShared(arg); //前置节点是头节点时说明当前节点是第一个挂起的线程节点再次尝试获取共享锁
                if (r >= 0) {
                    setHeadAndPropagate(node, r); //与ReentrantLock不同的地方获取共享锁成功设置头节点同时通知下一个节点
                    p.next = null; // help GC
                    failed = false;                    return;
                }
            }            if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败检查节点状态查看是否需要挂起线程
                parkAndCheckInterrupt()) //挂起线程当前线程阻塞在这里
                throw new InterruptedException();
        }
    } finally {        if (failed)
            cancelAcquire(node);
    }
}

这一段代码和ReentrantLock中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法基本一样说下两个不同的地方。一是加入等待队列时这里加入的是Node.SHARED类型的节点。二是获取锁成功后会通知下一个节点也就是唤醒下一个线程。以旋转寿司店的例子为例前面同时走了5个客人空余5个座位一家3口坐进去之后会告诉后面的一对情侣让他们也坐进去这样就达到了共享的目的。shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法在上一篇文章中都有详细说明这里就做解释了。

再来看看releaseShared方法时怎么释放信号量的首先调用tryReleaseShared来尝试释放信号量释放成功后调用doReleaseShared来判断是否需要唤醒后继线程

protected final boolean tryReleaseShared(int releases) {    for (;;) {        int current = getState();        int next = current + releases;        if (next < current) // overflow //释放信号量过多
            throw new Error("Maximum permit count exceeded");        if (compareAndSetState(current, next)) //cas操作设置新的信号量
            return true;
    }
}private void doReleaseShared() {    for (;;) {
        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            if (ws == Node.SIGNAL) { //SIGNAL状态下唤醒后继节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases
                unparkSuccessor(h); //唤醒后继节点
            }            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS
        }        if (h == head)                   // loop if head changed
            break;
    }
}

释放的逻辑很好理解相比ReentrantLock只是在state的数量上有点差别。

3.3 CountDownLatch源码分析

CountDownLatch相比Semaphore在实现逻辑上要简单的多同时他也没有公平和非公平的区别因为当计数器达到0的时候所有等待的线程都会释放不为0的时候所有等待的线程都会阻塞。直接来看看CountDownLatch的两个核心方法await和countDown。

public void await() throws InterruptedException {    //和Semaphore的不同在于参数为1其实这个参数对CountDownLatch来说没什么意义因为后面CountDownLatch的tryAcquireShared实现是通过getState() == 0来判断的
    sync.acquireSharedInterruptibly(1); 
}public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {    //这里加入了一个等待超时控制超过时间后直接返回false执行后面的代码不会长时间阻塞
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 
}public void countDown() {
    sync.releaseShared(1); //每次释放1个计数}public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量
        doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起}protected int tryAcquireShared(int acquires) {    return (getState() == 0) ? 1 : -1; //奠定了同时获取锁的基础无论State初始为多少只能计数等于0时触发}

和Semaphore区别有两个一是State每次只减少1同时只有为0时才释放所有等待线程。二是提供了一个超时等待方法。acquireSharedInterruptibly方法跟Semaphore一样就不细说了这里重点说下tryAcquireSharedNanos方法。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}//最小自旋时间static final long spinForTimeoutThreshold = 1000L;private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {    if (nanosTimeout <= 0L)        return false;    final long deadline = System.nanoTime() + nanosTimeout; //计算了一个deadline
    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime(); 
            if (nanosTimeout <= 0L) //超时后直接返回false继续执行
                return false;            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) //大于最小cas操作时间则挂起线程
                LockSupport.parkNanos(this, nanosTimeout); //挂起线程也有超时限制
            if (Thread.interrupted())                throw new InterruptedException();
        }
    } finally {        if (failed)
            cancelAcquire(node);
    }
}

重点看标了注释的几行代码首先计算了一个超时时间当超时后直接退出等待继续执行。如果未超时并且大于最小的cas操作时间这里定义的是1000ns则挂起同时挂起操作也有超时限制。这样就实现了超时等待。

4.总结

至此关于AQS的共享锁的两个实现Semaphore与CountDownLatch就分析完了他们与非共享最大的区别就在于是否能多个线程同时获取锁。看完后希望大家能对Semaphore与CountDownLatch有深刻的理解不明白的时候想想旋转寿司店和游乐场的例子如果对大家有帮助觉得写的好的话可以点个赞当然更希望大家能积极指出文中的错误和提出积极的改进意见。

原文出处https://www.cnblogs.com/konck/p/9473591.html

0人推荐
随时随地看视频
慕课网APP