前言
CountDownLatch允许一个或多个线程等待其他线程完成操作.
本文代码地址: 源码下载
例子
package com.sourcecode.concurrencytools;public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2); //c.countDown();
/**
* 打开注释 会依次打印1,2,3
* 关闭注释 会依次打印1,2 Main线程会阻塞在await()方法
*/
}
}).start();
c.await();
System.out.println("3");
}
}可以通过打开注释和关闭注释观察一下各自区别,进而可以简单理解
CountDownLatch的作用.
实现思路
源码如下: 其实源码(总共也就一百来行)没有太多要分析的,逻辑也非常简单,主要依靠的还是
AQS.
package com.sourcecode.concurrencytools;import com.sourcecode.reentrantreadwritelock.AbstractQueuedSynchronizer;import java.util.concurrent.TimeUnit;public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
} // 返回当前AQS的状态值
int getCount() { return getState();
} protected int tryAcquireShared(int acquires) { // 其实跟传入的参数acquires没有什么实质的作用
// 根据当前AQS的状态值是否为0,如果为0就获得锁,如果不为0会进入到AQS中的acquireSharedInterruptibly方法中
// 具体的操作需要了解AQS
return (getState() == 0) ? 1 : -1;
} // 释放 逻辑非常简单
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero
for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0;
}
}
} private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);
} public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
} public boolean await(long timeout, TimeUnit unit)
throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
} public void countDown() {
sync.releaseShared(1);
} public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]";
}
}思路如下:
1.CountDownLatch c = new CountDownLatch(n)此时AQS也就是sync对象的状态值为n.
2.c.await()函数会使任何当前线程阻塞当sync的状态值不为0时,所有调用c.await()方法的线程都会被加入到sync的同步等待队列中并且节点类型为shared. 如果sync的状态值为0时,c.await()函数会使不会阻塞,当前线程会正常执行下面的代码.
3.c.countDown()会使sync的状态值减1,如果状态值减为0的时候,tryReleaseShared会返回true,此时会唤醒所有调用c.await()方法而阻塞的线程.
针对第三点做一点补充说明:看看如何唤醒所有线程的
1.
releaseShared会调用Sync父类AbstractQueuedSynchronizer的releaseShared(int arg)方法如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {
doReleaseShared(); return true;
} return false;
}2. 调用
Sync重写父类的tryReleaseShared(arg)当状态值为0的时候,该方法会返回true进而会调用父类中的doReleaseShared()方法唤醒同步队列中的一个线程.
3. 2中唤醒的线程会从AbstractQueuedSynchronizer中的doAcquireSharedInterruptibly中的parkAndCheckInterrupt()中返回进而通过tryAcquireShared去尝试获得锁,此时由于当前状态值为0,会返回1,表示获得锁,然后调用setHeadAndPropagate(node, r)(其中r=1)方法去设置头节点并且尝试去唤醒同步队列后面的线程.
4.setHeadAndPropagate(node, r)方法在满足以下条件的情况下又会调用doReleaseShared()从而进入到1.中一步步释放所有由于c.await()方法而阻塞的线程.
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录一下旧的头节点
setHead(node); // 将当前节点设置为头节点
/**
* 如果propagate > 0 说明锁还可以被别的线程拿到
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; if (s == null || s.isShared())
doReleaseShared();
}
}例子2: 关注异常退出
await()除了上面讲的正常退出外,还有就是在阻塞过程中被别的线程中断的时候也会退出. 如下图所示,先启动一个自定义线程并调用await()方法并且捕获异常,在主线程中断该线程.
package com.sourcecode.concurrencytools;import java.util.concurrent.TimeUnit;public class CountDownLatchTest3 { static CountDownLatch c = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException {
Thread thread = new MyThread();
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); //c.countDown();
System.out.println(Thread.currentThread() + "----->finished!");
} static class MyThread extends Thread { public void run() { try {
System.out.println(Thread.currentThread() + "----->before await");
c.await();
System.out.println(Thread.currentThread() + "----->after await");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread() + "----->in interrupted exception.");
}
System.out.println(Thread.currentThread() + "----->finished!");
}
}
}结果如下: 可以看到当主线程中断线程
thread时,线程thread从await()方法中返回. 至此可以看到await方法是响应中断的.
Thread[Thread-0,5,main]----->before await Thread[main,5,main]----->finished! Thread[Thread-0,5,main]----->in interrupted exception. Thread[Thread-0,5,main]----->finished!
作者:nicktming
链接:https://www.jianshu.com/p/298a598806fb