简介
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
关键方法与参数
核心参数
/** 要屏障的线程数 */private final int parties;/* 当线程都到达barrier,运行的 barrierCommand*/private final Runnable barrierCommand;//-------------------------函数列表------------------------------//构造函数,指定参与线程数public CyclicBarrier(int parties)//构造函数,指定参与线程数,并在所有线程到达barrier之后执行给定的barrierAction逻辑public CyclicBarrier(int parties, Runnable barrierAction);//等待所有的参与者到达barrierpublic int await();//等待所有的参与者到达barrier,或等待给定的时间public int await(long timeout, TimeUnit unit);//获取参与等待到达barrier的线程数public int getParties();//查询barrier是否处于broken状态public boolean isBroken();//重置barrier为初始状态public void reset();//返回等待barrier的线程数量public int getNumberWaiting();
Generation:每个使用中的barrier都表示为一个
generation
实例。当barrier触发trip条件或重置时generation
随之改变。使用barrier时有很多generation
与线程关联,由于不确定性的方式,锁可能分配给等待的线程。但是在同一时间只有一个是活跃的generation
(通过count
变量确定),并且其余的要么被销毁,要么被trip条件等待。如果有一个中断,但没有随后的重置,就不需要有活跃的generation
。CyclicBarrier
的可重用特性就是通过Generation
来实现,每一次触发tripped都会new一个新的Generation。barrierCommand:
CyclicBarrier
的另一个特性是在所有参与线程到达barrier触发一个自定义函数,这个函数就是barrierCommand
,在CyclicBarrier
的构造函数中初始化。
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
源码解析
在CyclicBarrier中,最重要的方法就是await(),在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
await()
await内部调用dowait()
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //独占锁 final ReentrantLock lock = this.lock; //获取独占锁 lock.lock(); try { //保存当前"Generation" final Generation g = generation; //当前generation“已损坏”,抛BrokenBarrierException //抛该异常一般因为某个线程在等待某个处于“断开”状态的CyclicBarrier if (g.broken) throw new BrokenBarrierException(); //当前线程中断,通过breakBarrier终止CyclicBarrier if (Thread.interrupted()) { //线程被中断,终止Barrier,唤醒所有等待线程 breakBarrier(); throw new InterruptedException(); } //计数器自减 int index = --count; //如果计数器 == 0 //表示所有线程都已经到位,触发动作(是否执行某项任务) if (index == 0) { // tripped boolean ranAction = false; try { //barrierCommand线程要执行的任务 final Runnable command = barrierCommand; //执行的任务!=null,执行任务 if (command != null) command.run(); ranAction = true; //唤醒所有等待线程,并更新generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //循环一直执行,直到下面三个if一个条件满足才会退出循环 //自旋等待 所有parties到达 | generation被销毁 | 线程中断 | 超时 for (;;) { try { //如果不是超时等待,则调用await等待 if (!timed) trip.await(); //调用awaitNanos等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //当前generation“已损坏”,抛出BrokenBarrierException异常 //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrier if (g.broken) throw new BrokenBarrierException(); //generation已经更新,返回index if (g != generation) return index; //“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放独占锁 lock.unlock(); } }
dowait()的处理逻辑
首先判断该barrier是否已经断开了,如果断开则抛出BrokenBarrierException异常
判断计数器index是否等于0,如果等于0,则表示所有的线程准备就绪,已经到达某个公共屏障点了,barrier可以进行后续工作了(是否执行某项任务(构造函数决定));然后调用nextGeneration方法进行更新换代工作(其中会唤醒所有等待的线程);
通过for循环(for(;;))使线程一直处于等待状态。直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生。
说明:dowait()
是await()
的实现函数,它的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。当所有parties到达barrier(count=0
),如果barrierCommand
不为空,则执行barrierCommand
。然后调用nextGeneration()
进行换代操作。
在for(;;)
自旋中。timed
是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()
进行等待;否则,调用awaitNanos()
进行超时等待。
在dowait中有Generation这样一个对象。该对象是CyclicBarrier的一个成员变量
Generation描述着CyclicBarrier的更新换代。
在CyclicBarrier中,同一批线程属于同一代。
当有parties个线程到达barrier,generation就会被更新换代。
其中broken标识该当前CyclicBarrier是否已经处于中断状态。
对于中断,CyclicBarrier是通过
breakBarrier()
实现的
在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。
在超时的判断中,CyclicBarrier根据timed的值来执行不同的wait。await、awaitNanos都是Condition中的方法。
当index = --count
等于0时,标志"有parties个线程到达barrier",临界条件到达,则执行相应的动作。执行完动作后,则调用nextGeneration
更新换代
CountDownLatch和CyclicBarrier的区别与联系
作用
CountDownLatch的作用是允许1或n个线程等待其他线程完成执行
CyclicBarrier则是允许n个线程相互等待等满足一定条件之后才能继续执行后续操作
都使用计数器实现
CountDownLatch的计数器无法被重置,只能使用一次
CyclicBarrier的计数器可以被
reset
重置后使用,因此被称为是循环的barrier
应用场景
多线程环境计算数据,最后合并计算结果
小结
CyclicBarrier
主要通过独占锁ReentrantLock
和Condition
配合实现。类本身实现很简单,重点是分清CyclicBarrier
和CountDownLatch
的用法及区别,还有在jdk1.7新增的另外一个与它们相似的同步锁Phaser
,在后面文章中会详细讲解。
作者:芥末无疆sss
链接:https://www.jianshu.com/p/486d6aa30629
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。