前言
CountDownLatch
允许一个或多个线程等待其他线程完成操作.
本文代码地址: 源码下载
例子
package com.sourcecode.concurrencytools;public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); System.out.println(2); //c.countDown(); /** * 打开注释 会依次打印1,2,3 * 关闭注释 会依次打印1,2 Main线程会阻塞在await()方法 */ } }).start(); c.await(); System.out.println("3"); } }
可以通过打开注释和关闭注释观察一下各自区别,进而可以简单理解
CountDownLatch
的作用.
实现思路
源码如下: 其实源码(总共也就一百来行)没有太多要分析的,逻辑也非常简单,主要依靠的还是
AQS
.
package com.sourcecode.concurrencytools;import com.sourcecode.reentrantreadwritelock.AbstractQueuedSynchronizer;import java.util.concurrent.TimeUnit;public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } // 返回当前AQS的状态值 int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // 其实跟传入的参数acquires没有什么实质的作用 // 根据当前AQS的状态值是否为0,如果为0就获得锁,如果不为0会进入到AQS中的acquireSharedInterruptibly方法中 // 具体的操作需要了解AQS return (getState() == 0) ? 1 : -1; } // 释放 逻辑非常简单 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
思路如下:
1.CountDownLatch c = new CountDownLatch(n)
此时AQS
也就是sync
对象的状态值为n
.
2.c.await()
函数会使任何当前线程阻塞当sync
的状态值不为0
时,所有调用c.await()
方法的线程都会被加入到sync
的同步等待队列中并且节点类型为shared
. 如果sync
的状态值为0
时,c.await()
函数会使不会阻塞,当前线程会正常执行下面的代码.
3.c.countDown()
会使sync
的状态值减1
,如果状态值减为0
的时候,tryReleaseShared
会返回true
,此时会唤醒所有调用c.await()
方法而阻塞的线程.
针对第三点做一点补充说明:看看如何唤醒所有线程的
1.
releaseShared
会调用Sync
父类AbstractQueuedSynchronizer
的releaseShared(int arg)
方法如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
2. 调用
Sync
重写父类的tryReleaseShared(arg)
当状态值为0
的时候,该方法会返回true
进而会调用父类中的doReleaseShared()
方法唤醒同步队列中的一个线程.
3. 2中唤醒的线程会从AbstractQueuedSynchronizer
中的doAcquireSharedInterruptibly
中的parkAndCheckInterrupt()
中返回进而通过tryAcquireShared
去尝试获得锁,此时由于当前状态值为0
,会返回1
,表示获得锁,然后调用setHeadAndPropagate(node, r)
(其中r=1
)方法去设置头节点并且尝试去唤醒同步队列后面的线程.
4.setHeadAndPropagate(node, r)
方法在满足以下条件的情况下又会调用doReleaseShared()
从而进入到1.中一步步释放所有由于c.await()
方法而阻塞的线程.
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 记录一下旧的头节点 setHead(node); // 将当前节点设置为头节点 /** * 如果propagate > 0 说明锁还可以被别的线程拿到 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
例子2: 关注异常退出
await()
除了上面讲的正常退出外,还有就是在阻塞过程中被别的线程中断的时候也会退出. 如下图所示,先启动一个自定义线程并调用await()
方法并且捕获异常,在主线程中断该线程.
package com.sourcecode.concurrencytools;import java.util.concurrent.TimeUnit;public class CountDownLatchTest3 { static CountDownLatch c = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { Thread thread = new MyThread(); thread.start(); TimeUnit.SECONDS.sleep(1); thread.interrupt(); //c.countDown(); System.out.println(Thread.currentThread() + "----->finished!"); } static class MyThread extends Thread { public void run() { try { System.out.println(Thread.currentThread() + "----->before await"); c.await(); System.out.println(Thread.currentThread() + "----->after await"); } catch (InterruptedException e) { System.out.println(Thread.currentThread() + "----->in interrupted exception."); } System.out.println(Thread.currentThread() + "----->finished!"); } } }
结果如下: 可以看到当主线程中断线程
thread
时,线程thread
从await()
方法中返回. 至此可以看到await
方法是响应中断的.
Thread[Thread-0,5,main]----->before await Thread[main,5,main]----->finished! Thread[Thread-0,5,main]----->in interrupted exception. Thread[Thread-0,5,main]----->finished!
作者:nicktming
链接:https://www.jianshu.com/p/298a598806fb