这篇文章,介绍一些比较常用的控制并发流程的工具类:CountDownLatch、CyclicBarrier、Semaphore、Exchange、Condition。
通过这些工具类,我们可以更好地控制并发流程,多线程之间更好地协同工作。
常用的控制并发流程的工具类简介:
| 类 | 描述 | 
|---|---|
| CountDownLatch  倒数门闩  | 
是一个或一组线程等待其他的线程完成工作以后再执行。 | 
| CyclicBarrier  循环栅栏  | 
让一组线程达到某个屏障,被阻塞,等到指定数量的线程都达到屏障时,屏障开放,所有被阻塞的线程会继续运行 | 
| Semaphore  信号量  | 
用来限制或管理数量有限的资源的使用,例如流量控制 | 
| Exchange  数据交换器  | 
两个线程间的数据交换 | 
| Condition  条件对象  | 
可以控制线程的“等待”和“唤醒”,实现线程间通信,使用上类似 Object 中的 wait() 和 notify() | 
2、 CountDownLatch 倒计时门闩
>CountDownLatch 是一组线程等待其他的线程完成工作以后再执行。
CountDownLatch常用API:
| 方法 | 描述 | 
|---|---|
| CountDownLatch(int count) | 创建 CountDownLatch,并且指定数量count,count不能小于0 | 
| void await() | 当前线程阻塞等待直到闩锁的计数倒数到0。如果线程被中断 interrupted,则抛出InterruptedException | 
| boolean await (long timeout, TimeUnit unit)  | 
指定等待时间,阻塞当前线程,直到闩锁的计数倒数到0 或者 超时。如果线程被中断 interrupted,则抛出InterruptedException | 
| void countDown() | 闩锁的计数减 1 ,如果计数为零,则释放所有等待的线程。 | 
| long getCount() | 闩锁的当前计数 | 
注意:
- 可以多个地方同时调用 await()
 - 可以在同个线程中多次 countDown()
 
2.1 CountDownLatch 用法一(一等多):一个线程等待多个线程都执行完毕,再继续自己的工作
/**
 * Description: 赛跑前,5个运动员准备完毕,才鸣发令枪
 *
 * @author Xander
 */
public class CountDownLatchDemo1 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("运动员 No." + no + "准备好了。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待所有运动员热身准备.....");
        latch.await();
        System.out.println("所有人准备完毕,鸣发令枪,开跑...");
    }
}
运行结果:
等待所有运动员热身准备.....
运动员 No.3准备好了。
运动员 No.4准备好了。
运动员 No.1准备好了。
运动员 No.5准备好了。
运动员 No.2准备好了。
所有人准备完毕,鸣发令枪,开跑...
2.2 CountDownLatch 用法二(多等一):多个线程等待某一个线程的信号,同时开始执行
/**
 * Description: 模拟赛跑,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。
 *
 * @author Xander
 */
public class CountDownLatchDemo2 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + "开始跑步了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        //裁判员检查发令枪...
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始!");
        begin.countDown();
    }
}
运行结果:
No.1准备完毕,等待发令枪
No.3准备完毕,等待发令枪
No.4准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.5准备完毕,等待发令枪
发令枪响,比赛开始!
No.1开始跑步了
No.3开始跑步了
No.5开始跑步了
No.2开始跑步了
No.4开始跑步了
2.3 CountDownLatch 用法三(多等多):多个线程await等待,多个线程的countDown减一
public class CountDownLatchDemo3 {
    static CountDownLatch latch = new CountDownLatch(3);
    public static void main(String[] args) throws InterruptedException {
        // 启动业务线程,需要等待 CountDownLatch 计数为0 才能执行它的业务
        new Thread(new BusiThread(),"Thread1").start();
        // 模拟一个线程中可以多次 countDown()
        // 这个线程有 2 步操作,假设每步操作完成后都需要扣减 1 次
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + " ready init work step 1st......");
                //第一步工作完成,扣减一次
                latch.countDown();
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println("begin step 2nd.......");
                System.out.println(Thread.currentThread().getName() + " ready init work step 2nd......");
                //第二步工作完成,再扣减一次
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread1").start();
        // 另外一条子线程,执行一次 countDown()
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
                System.out.println(Thread.currentThread().getName() + " countDown......");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread2").start();
        // Main线程指定超时时间阻塞等待
        System.out.println("Main await--");
        latch.await(10,TimeUnit.MILLISECONDS);
        // Main线程等待超时,放弃继续等待,继续执行业务
        System.out.println("time out,Main do its work........");
    }
    //业务线程
    private static class BusiThread implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println("BusiThread await--");
                latch.await();
                System.out.println("BusiThread do business-----");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果:
BusiThread await--
Main await--
Thread1 ready init work step 1st......
Thread2 countDown......
time out,Main do its work........
begin step 2nd.......
Thread1 ready init work step 2nd......
BusiThread do business-----
3、CyclicBarrier 循环栅栏
>CyclicBarrier 循环栅栏和 CountDownLatch 很类似,都能阻塞一组线程
>让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行
当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用 CyclicBarrier 。CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务。
举例:全班同学明天中午在校门口集合,都到齐后,一起去郊游。
CyclicBarrier 常用API:
| 方法 | 描述 | 
|---|---|
| CyclicBarrier(int parties) | 创建一个新的CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将跳闸(打开屏障),并且在屏障被打开时不执行预定义的操作,所有被阻塞的线程会继续运行 | 
| CyclicBarrier (int parties, Runnable barrierAction)  | 
创建一个新的CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将跳闸(打开屏障),并且在屏障被打开时执行给定的屏障操作 barrierAction,由最后一个进入屏障的线程执行,barrierAction 线程执行完成后,其他使用 CyclicBarrier.await() 的线程,才继续执行。 | 
| void await() | 当前线程阻塞等待直到指定数量的线程调用了await() | 
| int await (long timeout, TimeUnit unit)  | 
指定等待时间,阻塞当前线程,直到指定数量的线程调用了await() 或者 超时 | 
CyclicBarrier Demo
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
            // 在屏障被打开时执行给定的屏障操作, 由最后一个进入屏障的线程执行
                System.out.println("线程" + Thread.currentThread().getName() + ":所有人都到场了, 大家统一出发!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier),"Thread"+i).start();
        }
    }
    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;
        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + ":现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + Thread.currentThread().getName() + ":到了集合地点,开始等待其他人到达");
                cyclicBarrier.await();
                System.out.println("线程" + Thread.currentThread().getName() + ":出发了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果:
线程Thread0:现在前往集合地点
线程Thread4:现在前往集合地点
线程Thread2:现在前往集合地点
线程Thread1:现在前往集合地点
线程Thread6:现在前往集合地点
线程Thread3:现在前往集合地点
线程Thread5:现在前往集合地点
线程Thread8:现在前往集合地点
线程Thread7:现在前往集合地点
线程Thread9:现在前往集合地点
线程Thread7:到了集合地点,开始等待其他人到达
线程Thread4:到了集合地点,开始等待其他人到达
线程Thread1:到了集合地点,开始等待其他人到达
线程Thread8:到了集合地点,开始等待其他人到达
线程Thread9:到了集合地点,开始等待其他人到达
线程Thread9:所有人都到场了, 大家统一出发!
线程Thread9:出发了
线程Thread7:出发了
线程Thread4:出发了
线程Thread1:出发了
线程Thread8:出发了
线程Thread0:到了集合地点,开始等待其他人到达
线程Thread5:到了集合地点,开始等待其他人到达
线程Thread2:到了集合地点,开始等待其他人到达
线程Thread3:到了集合地点,开始等待其他人到达
线程Thread6:到了集合地点,开始等待其他人到达
线程Thread6:所有人都到场了, 大家统一出发!
线程Thread6:出发了
线程Thread0:出发了
线程Thread5:出发了
线程Thread2:出发了
线程Thread3:出发了
CountDownLatch和CyclicBarrier辨析
- 
作用不同: CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字到 0,也就是说, CountDownLatch 用于事件,但是 CyclicBarrier 是用于线程的。
 - 
可重用性不同:CountDownLatch 在倒数到 0 并触发门闩打开后,就不能再次使用了,除非新建新的实例;而 CyclicBarrier 可以重复使用。
 - 
CountDownLatch 是否往下执行,由其它线程觉得,CyclicBarrier放行由一组线程本身控制
 
4、Semaphore 信号量
用来限制或管理数量有限的资源的使用,例如流量控制
>信号量的作用是维护一个“许可证”的计数,线程可以“获取”许可证,那信号量剩余的许可就减一,线程也可以“释放”一个许可证,那信号量剩余的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证。
Semaphore 常用API:
| 方法 | 描述 | 
|---|---|
| Semaphore(int permits) | 指定许可证的数量,创建一个非公平的Semaphore | 
| Semaphore (int permits, boolean fair)  | 
指定许可证和设置是否使用公平策略, true表示公平: Semaphore 把等待的线程放到 FIFO 的队列中,当有了新的许可证,可以发给等待最长时间的线程。  | 
| void acquire() | 从Semaphore获取一个许可证,如果没有则阻塞直到有一个可用 | 
| void acquire (int permits)  | 
从Semaphore获取指定数量的许可证,如果没有则阻塞直到指定数量的许可证可用 | 
| void acquireUninterruptibly() | 从Semaphore获取一个许可证,如果没有则阻塞直到有一个可用,就是当前线程被中断(interrupted)也不抛出InterruptedException,继续阻塞等待 | 
| void release() | 释放一个许可证 | 
| void release (int permits)  | 
释放指定数量的许可证 | 
| boolean tryAcquire() | 如果当前Semaphore有可用的则获取一个许可证,不阻塞当前线程 true: 获取成功 false: 获取失败  | 
| boolean tryAcquire (int permits)  | 
尝试获取指定数量的许可证,不阻塞当前线程 true: 获取成功 false: 获取失败  | 
| boolean tryAcquire(long timeout, TimeUnit unit) | 指定等待时间,尝试获取一个许可证,如果超时,则放弃等待 true: 获取成功 false: 获取失败  | 
| boolean tryAcquire(int permits, long timeout, TimeUnit unit) | 指定等待时间,尝试获取指定数量的许可证,如果超时,则放弃等待 true: 获取成功 false: 获取失败  | 
4.1 使用流程
- 初始化 Semaphore并指定许可证的数量
 - 在需要被限制的代码前加 
acquire()或者acquireUninterruptibly()方法 - 在任务执行结束后,调用 
release()来释放许可证。 
Semaphore Demo
public class SemaphoreDemo {
    static Semaphore semaphore = new Semaphore(3, true);
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }
    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "准备获取许可证");
                semaphore.acquire(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证------剩余许可证:"+semaphore.availablePermits());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release(1);
            System.out.println(Thread.currentThread().getName() + "释放了许可证+++++++剩余许可证:"+semaphore.availablePermits());
        }
    }
}
运行结果:
pool-1-thread-1准备获取许可证
pool-1-thread-3准备获取许可证
pool-1-thread-3拿到了许可证------剩余许可证:1
pool-1-thread-2准备获取许可证
pool-1-thread-2拿到了许可证------剩余许可证:0
pool-1-thread-4准备获取许可证
pool-1-thread-1拿到了许可证------剩余许可证:2
pool-1-thread-5准备获取许可证
pool-1-thread-1释放了许可证+++++++剩余许可证:3
pool-1-thread-4拿到了许可证------剩余许可证:2
pool-1-thread-3释放了许可证+++++++剩余许可证:3
pool-1-thread-2释放了许可证+++++++剩余许可证:3
pool-1-thread-5拿到了许可证------剩余许可证:1
pool-1-thread-4释放了许可证+++++++剩余许可证:2
pool-1-thread-5释放了许可证+++++++剩余许可证:3
4.2 注意点
- 一般场景下,获取和释放的许可证数量要一致。
 - 注意在初始化 semaphore 的时候设置公平性,一般设置为 true 会更合理
 - 并不是必须由许可证的线程释放那个许可证,事实上,获取和释放许可证的线程并无要求,也许是 A 获取了,然后由 B 释放了,只要逻辑合理即可。
 - 信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作后才能开始工作,那么线程1 acquire() ,而线程2完成任务后 release() ,这样的话,相当于是轻量级的 CountDownLatch.
 
5、Exchange
>两个线程间的数据交换
这个工具类在项目中用的比较少,简单介绍下用法。
public class ExchangeDemo {
    private static final Exchanger> exchange = new Exchanger>();
    public static void main(String[] args) {
        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set setA = new HashSet();//存放数据的容器
                try {
                    // 添加数据
                    setA.add("A1");
                    setA.add("A2");
                    // 先调用 exchange() 的线程会阻塞,直到后面的线程调用 exchange()执行时,才进行数据交换,继续运行
                    setA = exchange.exchange(setA);//交换set
                    // 处理交换后的数据
                    System.out.println(Thread.currentThread().getName() + "----" + setA);
                } catch (InterruptedException e) {
                }
            }
        }, "ThreadA").start();
        //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set setB = new HashSet();//存放数据的容器
                try {
                    // 添加数据
                    setB.add("B1");
                    setB.add("B2");
                    setB.add("B3");
                    // 先调用 exchange() 的线程会阻塞,直到后面的线程调用 exchange()执行时,才进行数据交换,继续运行
                    setB = exchange.exchange(setB);//交换set
                    // 处理交换后的数据
                    System.out.println(Thread.currentThread().getName() + "----" + setB);
                } catch (InterruptedException e) {
                }
            }
        }, "ThreadB").start();
    }
}
运行结果:
ThreadA----[B2, B3, B1]
ThreadB----[A1, A2]
6、Condition 接口
| 方法 | 描述 | 
|---|---|
| void await() | 当前线程阻塞等待直到被唤醒(signalled) 。如果线程被中断 interrupted,则抛出InterruptedException | 
| void awaitUninterruptibly() | 当前线程阻塞等待直到被唤醒(signalled) | 
| void signal() | 唤醒等待时间最长的线程,在从 await 返回之前,该线程必须重新获取锁 | 
| void signalAll() | 唤醒所有正在等待的线程,每个线程必须重新获取锁,然后才能从 await 返回。 | 
signalAll() 和 signal() 区别
- signalAll() 会唤起所有的正在等待的线程
 - 但是 signal() 是公平的,只会唤起那个等待时间最长的线程。
 
Condition与 Object 的 wait, notify, notifyAll 对比:
- Condition 的 await,signal, singalAll 与 Object 的 wait, notify, notifyAll 都可以实现线程之间的通信,两者在使用上也是非常类似,都需要先获取锁之后才能调用,而不同的是 Object wait,notify 对应的是 synchronized 方式的锁,Condition await,singal 则对应的是 ReentrantLock (实现 Lock 接口的锁对象)对应的锁
 - Condition是通过 Lock接口的 lock.newCondition() 新建一个Condition实例,而且 Lock实例可以新建多个Condition实例
 
注意: 在执行 Condition 的 await 和 signal 等方法之前,得要先持有condition相关联的 Lock。
简述Conditiond使用步骤:
- 线程1 持有condition相关联的lock,当要等待某个条件时候,它就去执行 
condition.await()方法,一旦执行了 await() 方法,线程1就会释放当前线程所占用的lock,进入阻塞状态。 - 通常会有另外一个线程,假设是线程2,抢到condition相关联的lock,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行 
condition.signal()方法,这时JVM就会从被阻塞的线程里找,找到那些等待该 condition 的线程,线程1 如果能够从await()方法返回的话一定是该线程获取了与condition相关联的lock,它的线程状态就会变成Runnable可执行状态。 
6.1 Condition 演示生产者消费者
/**
 * Description: Condition 演示生产者消费者
 * <p>
 * 生产者生产产品,可生产的最大数量 maxCount,消费者消费产品
 *
 * @author Xander
 * datetime: 2020-11-24 15:00
 */
public class ConditionDemo {
    // 可生产的最大数量
    private final int maxCount = 5;
    // 当前已有数量
    private int curCount;
    // lock
    private Lock lock = new ReentrantLock();
    // lock新建一个 notFull 的 Condition,
    // 如果notFull.await()  表示已达到最大生产数量,生产者阻塞等待
    private Condition notFull = lock.newCondition();
    // lock新建一个 notEmpty 的 Condition,
    // 如果notEmpty.await()  表示当前数量为空,消费者阻塞等待
    private Condition notEmpty = lock.newCondition();
    public static void main(String[] args) {
        ConditionDemo demo = new ConditionDemo();
        for (int i = 0; i < 3; i++) {
            Producer producer = new Producer(demo);
            new Thread(producer, "生产者P" + (i+1)).start();
        }
        for (int i = 0; i < 2; i++) {
            Consumer consumer = new Consumer(demo);
            new Thread(consumer, "消费者C" + (i+1)).start();
        }
    }
    /**
     * 生产
     */
    public void produce() {
        //获取锁
        lock.lock();
        try {
            // 抢到锁
            while (curCount >= maxCount) {
                System.out.println(Thread.currentThread().getName() + "生产已满,等待消费--");
                // notFull.await()  表示已达到最大生产数量,生产者阻塞等待
                this.notFull.await();
            }
            this.curCount++;
            System.out.println(Thread.currentThread().getName() + "生产,当前数量--" + curCount);
            this.notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放锁
            lock.unlock();
        }
    }
    /**
     * 消费
     */
    public void consume() {
        //获取锁
        lock.lock();
        try {
            // 抢到锁
            while (curCount <= 0) {
                System.out.println(Thread.currentThread().getName() + "无货消费,等待生产====");
                // notEmpty.await()  表示当前数量为空,消费者阻塞等待
                this.notEmpty.await();
            }
            this.curCount--;
            System.out.println(Thread.currentThread().getName() + "消费,当前剩余数量--" + curCount);
            this.notFull.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放锁
            lock.unlock();
        }
    }
}
//生产者
class Producer implements Runnable {
    private ConditionDemo demo;
    public Producer(ConditionDemo demo) {
        this.demo = demo;
    }
    @Override
    public void run() {
        while (true) {
            try {
                // 生产者生产
                this.demo.produce();
                TimeUnit.MILLISECONDS.sleep((long) (Math.random()*1500));//模拟生产耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
//消费者
class Consumer implements Runnable {
    private ConditionDemo demo;
    public Consumer(ConditionDemo demo) {
        this.demo = demo;
    }
    @Override
    public void run() {
        while (true) {
            try {
                // 消费者消费
                this.demo.consume();
                TimeUnit.MILLISECONDS.sleep((long) (Math.random()*1000));//模拟消费耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
运行结果:
生产者P1生产,当前数量--1
生产者P2生产,当前数量--2
生产者P3生产,当前数量--3
消费者C2消费,当前剩余数量--2
消费者C1消费,当前剩余数量--1
消费者C2消费,当前剩余数量--0
生产者P2生产,当前数量--1
消费者C1消费,当前剩余数量--0
消费者C2无货消费,等待生产====
生产者P3生产,当前数量--1
消费者C2消费,当前剩余数量--0
生产者P1生产,当前数量--1
生产者P3生产,当前数量--2
生产者P2生产,当前数量--3
消费者C1消费,当前剩余数量--2
...
小结:
上面篇幅介绍了CountDownLatch、CyclicBarrier、Semaphore 和 Exchange 的API,也分别通过 demo 讲解了这几个工具类的使用,然后介绍了 Condition 与 Object 的 wait, notify, notifyAll 的对比,Condition的API,通过demo演示如何使用 Condition 实现线程间的通信(生产者消费者案例)。
>代码:
>github.com/wengxingxia/002juc.git