PriorityBlockingQueue是基于数组实现的无界优先级阻塞队列。PriorityBlockingQueue与PriorityQueue类似,其中的元素按其自然顺序排序,或由队列构造时提供的比较器根据所使用的构造函数排序。优先级队列不允许空元素,依赖自然顺序的优先级队列也不允许插入不可比较的对象。相比于PriorityQueue而言,PriorityBlockingQueue一个最大的优势是线程安全的。
PriorityBlockingQueue是Java Collections Framework的一个成员。
1. PriorityBlockingQueue的声明
PriorityBlockingQueue的接口和继承关系如下
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable { …
}
完整的接口继承关系如下图所示。
从上述代码可以看出,PriorityBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。
2. PriorityBlockingQueue的成员变量和构造函数
以下是PriorityBlockingQueue的构造函数和成员变量。
// 默认数组容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大数组容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 元素数组
private transient Object[] queue;
// 队列中的元素个数
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 操作数组确保原子性的锁
private final ReentrantLock lock = new ReentrantLock();
// 数组非空的条件判断
private final Condition notEmpty = lock.newCondition();
// 分配用Spinlock,通过CAS获取
private transient volatile int allocationSpinLock;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] es = c.toArray();
int n = es.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (es.getClass() != Object[].class)
es = Arrays.copyOf(es, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (Object e : es)
if (e == null)
throw new NullPointerException();
}
this.queue = ensureNonEmpty(es);
this.size = n;
if (heapify)
heapify();
}
从上述代码可以看出,构造函数有4种。构造函数中的参数含义如下
l initialCapacity用于设置队列中内部数组的容量。如果没有指定,则会使用默认数组容量DEFAULT_INITIAL_CAPACITY的值。
l comparator为比较器
l c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加
类成员queue是一个数组,用于存储队列中的元素。size用于记录队列中的元素个数。
通过ReentrantLock和加锁条件notEmpty来实现并发控制。
3. PriorityBlockingQueue的核心方法
以下对PriorityBlockingQueue常用核心方法的实现原理进行解释。
3.1. offer(e)
执行offer(e)方法后有两种结果
l 队列未达到容量时,返回 true
l 队列达到容量时,先扩容,再返回 true
PriorityBlockingQueue的offer (e)方法源码如下:
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
int n, cap;
Object[] es;
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap); // 扩容
try {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
size = n + 1;
notEmpty.signal(); // 唤醒等待中的线程
} finally {
lock.unlock(); // 解锁
}
return true;
}
从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:
l 为了确保并发操作的安全先做了加锁处理。
l 判断待入队的元素e是否为null。为null则抛出NullPointerException异常。
l 判断当前队列中的元素是否已经大于等于队列的容量,如果是则证明队列已经满了,需要先通过tryGrow()方法扩容。
l 通过siftUpComparable ()或者siftUpUsingComparator()方法插入数据元素。
l 通过执行notEmpty.signal()方法来唤醒等待中的线程。
l 最后解锁。
tryGrow()方法源码如下:
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 必须释放并重新获取锁
Object[] newArray = null;
if (allocationSpinLock == 0 &&
ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) {
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null)
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
siftUpComparable()方法和siftUpUsingComparator()方法源码如下:
private static <T> void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = es[parent];
if (key.compareTo((T) e) >= 0)
break;
es[k] = e;
k = parent;
}
es[k] = key;
}
private static <T> void siftUpUsingComparator(
int k, T x, Object[] es, Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = es[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
es[k] = e;
k = parent;
}
es[k] = x;
}
在上述代码中,在位置k处插入项x,通过向上提升x到树形结构中来维护堆的不变性,直到x大于或等于它的父节点或根节点。
3.2. put(e)
执行put(e)方法后有两种结果:
•
l 队列未满时,直接插入没有返回值
l 队列满时,会扩容后再插入
PriorityBlockingQueue的put (e)方法源码如下:
public void put(E e) {
offer(e); // 不会阻塞
}
从上面代码可以看出,put(e)方法的实现等同于offer(e),因此队列满时会自动扩容,再插入元素,不会阻塞队列。
3.3. offer(e,time,unit)
offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:
•
l 队列未满时,返回 true
l 队列满时,先扩容,再返回 true
PriorityBlockingQueue的put (e)方法源码如下:
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // 不会阻塞
}
从上面代码可以看出,offer(e,time,unit)方法的实现等同于offer(e),因此队列满时会自动扩容,再插入元素,不会阻塞队列。
3.4. add(e)
执行add(e)方法后有有两种结果
l 队列未达到容量时,返回 true
l 队列达到容量时,先扩容,再返回 true
PriorityBlockingQueue的add(e)方法源码如下:
public boolean add(E e) {
return offer(e);
}
从上面代码可以看出,add(e)方法等同于offer(e)方法的实现。
。
3.5. poll ()
执行poll()方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,返回 null
PriorityBlockingQueue的poll()方法源码如下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
return dequeue(); // 出队
} finally {
lock.unlock(); // 解锁
}
}
从上面代码可以看出,执行poll()方法时,分为以下几个步骤:
l 为了确保并发操作的安全先做了加锁处理。
l 执行dequeue()方法做元素的出队。
l 最后解锁。
dequeue()方法源码如下:
private E dequeue() {
final Object[] es;
final E result;
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
return result;
}
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = es[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right];
if (key.compareTo((T) c) <= 0)
break;
es[k] = c;
k = child;
}
es[k] = key;
}
private static <T> void siftDownUsingComparator(
int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = es[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
c = es[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
es[k] = c;
k = child;
}
es[k] = x;
}
出队的原理是是这样的,在位置k处插入项x,通过反复将x降级到树中来维护堆的不变性,直到它小于或等于其子项或是一个叶子。
3.6. take()
执行take()方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值
PriorityBlockingQueue的take ()方法源码如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取锁
E result;
try {
while ( (result = dequeue()) == null) // 出队
notEmpty.await(); // 使线程等待
} finally {
lock.unlock(); // 解锁
}
return result;
}
从上面代码可以看出,执行take()方法时,分为以下几个步骤:
l 先是要获取锁。
l 执行dequeue()方法做元素的出队。如果出队元素是null,则线程等待。
l 最后解锁。
dequeue()方法此处不再赘述。
3.7. poll(time,unit)
poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null
PriorityBlockingQueue的poll(time,unit)方法源码如下:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取锁
E result;
try {
while ( (result = dequeue()) == null && nanos > 0) // 出队
nanos = notEmpty.awaitNanos(nanos); // 使线程等待指定的时间
} finally {
lock.unlock(); // 解锁
}
return result;
}
从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤:
l 先是要获取锁。
l 执行dequeue()方法做元素的出队。如果出队元素是null,则线程等待。
l 最后解锁。
dequeue()方法此处不再赘述。
3.8. remove()
执行remove()方法后有两种结果:
l 队列不为空时,返回队首值并移除
l 队列为空时,抛出异常
PriorityBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下:
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。
poll()方法此处不再赘述。
3.9. peek()
执行peek()方法后有两种结果:
l 队列不为空时,返回队首值但不移除
l 队列为空时,返回null
peek()方法源码如下:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
return (E) queue[0];
} finally {
lock.unlock(); // 解锁
}
}
从上面代码可以看出,peek()方法比较简单,直接就是获取了数组里面的索引为0的元素。
3.10. element()
执行element()方法后有两种结果:
l 队列不为空时,返回队首值但不移除
l 队列为空时,抛出异常
element()方法其实是调用了父类AbstractQueue的element()方法,源码如下:
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。
4. PriorityBlockingQueue的单元测试
PriorityBlockingQueue的单元测试如下:
package com.waylau.java.demo.datastructure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
/**
* PriorityBlockingQueue Tests
*
* @since 1.0.0 2020年5月24日
* @author <a href="https://waylau.com">Way Lau</a>
*/
class PriorityBlockingQueueTests {
@Test
void testOffer() {
// 初始化队列
Queue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列未满时,返回 true
boolean resultNotFull = queue.offer("Java");
assertTrue(resultNotFull);
// 测试队列达到容量时,会自动扩容
queue.offer("C");
queue.offer("Python");
boolean resultFull = queue.offer("C++"); // 扩容
assertTrue(resultFull);
}
@Test
void testPut() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列未满时,直接插入没有返回值;
queue.put("Java");
// 测试队列满则扩容。
queue.put("C");
queue.put("Python");
queue.put("C++");
}
@Test
void testOfferTime() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列未满时,返回 true
boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);
assertTrue(resultNotFull);
// 测试队列满则扩容,返回true
queue.offer("C");
queue.offer("Python");
boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 不会阻塞
assertTrue(resultFull);
}
@Test
void testAdd() {
// 初始化队列
Queue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列未满时,返回 true
boolean resultNotFull = queue.add("Java");
assertTrue(resultNotFull);
// 测试队列满则扩容,返回 true
queue.add("C");
queue.add("Python");
boolean resultFull = queue.add("C++"); // 扩容
assertTrue(resultFull);
}
@Test
void testPoll() throws InterruptedException {
// 初始化队列
Queue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列为空时,返回 null
String resultEmpty = queue.poll();
assertNull(resultEmpty);
// 测试队列不为空时,返回队首值并移除
queue.add("Java");
queue.add("C");
queue.add("Python");
String resultNotEmpty = queue.poll();
assertEquals("C", resultNotEmpty);
}
@Test
void testTake() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并移除
queue.put("Java");
queue.put("C");
queue.put("Python");
String resultNotEmpty = queue.take();
assertEquals("C", resultNotEmpty);
// 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值
queue.clear();
String resultEmpty = queue.take(); // 阻塞等待
assertNotNull(resultEmpty);
}
@Test
void testPollTime() throws InterruptedException {
// 初始化队列
BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并移除
queue.put("Java");
queue.put("C");
queue.put("Python");
String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);
assertEquals("C", resultNotEmpty);
// 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null
queue.clear();
String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒
assertNull(resultEmpty);
}
@Test
void testRemove() throws InterruptedException {
// 初始化队列
Queue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列为空时,抛出异常
Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {
queue.remove();// 抛异常
});
assertEquals(null, excpetion.getMessage());
// 测试队列不为空时,返回队首值并移除
queue.add("Java");
queue.add("C");
queue.add("Python");
String resultNotEmpty = queue.remove();
assertEquals("C", resultNotEmpty);
}
@Test
void testPeek() throws InterruptedException {
// 初始化队列
Queue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并但不移除
queue.add("Java");
queue.add("C");
queue.add("Python");
String resultNotEmpty = queue.peek();
assertEquals("C", resultNotEmpty);
resultNotEmpty = queue.peek();
assertEquals("C", resultNotEmpty);
resultNotEmpty = queue.peek();
assertEquals("C", resultNotEmpty);
// 测试队列为空时,返回null
queue.clear();
String resultEmpty = queue.peek();
assertNull(resultEmpty);
}
@Test
void testElement() throws InterruptedException {
// 初始化队列
Queue<String> queue = new PriorityBlockingQueue<String>(3);
// 测试队列不为空时,返回队首值并但不移除
queue.add("Java");
queue.add("C");
queue.add("Python");
String resultNotEmpty = queue.element();
assertEquals("C", resultNotEmpty);
resultNotEmpty = queue.element();
assertEquals("C", resultNotEmpty);
resultNotEmpty = queue.element();
assertEquals("C", resultNotEmpty);
// 测试队列为空时,抛出异常
queue.clear();
Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {
queue.element();// 抛异常
});
assertEquals(null, excpetion.getMessage());
}
}
5. PriorityBlockingQueue的应用案例:英雄战力排行榜
以下是一个英雄战力排行榜的示例。该示例模拟了6个英雄,可以根据英雄的战力由高至低排序。
以下是Hero类,用来代表英雄:
package com.waylau.java.demo.datastructure;
/**
* Hero
*
* @since 1.0.0 2020年5月23日
* @author <a href="https://waylau.com">Way Lau</a>
*/
public class Hero {
private String name;
private Integer power; // 战力
public Hero(String name, Integer power) {
this.name = name;
this.power = power;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getPower() {
return power;
}
public void setPower(Integer power) {
this.power = power;
}
@Override
public String toString() {
return "Hero [name=" + name + ", power=" + power + "]";
}
}
以下是应用主程序:
package com.waylau.java.demo.datastructure;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
/**
* PriorityBlockingQueue Demo
*
* @since 1.0.0 2020年5月24日
* @author <a href="https://waylau.com">Way Lau</a>
*/
public class PriorityBlockingQueueDemo {
public static void main(String[] args) {
int n = 6;
Queue<Hero> queue = new PriorityBlockingQueue<Hero>(n, new Comparator<Hero>() {
// 战力由大到小排序
@Override
public int compare(Hero hero0, Hero hero1) {
return hero1.getPower().compareTo(hero0.getPower());
}
});
queue.add(new Hero("Nemesis", 95));
queue.add(new Hero("Edifice Rex", 88));
queue.add(new Hero("Marquis of Death", 91));
queue.add(new Hero("Magneto", 96));
queue.add(new Hero("Hulk", 85));
queue.add(new Hero("Doctor Strange", 94));
for (int i = 0; i<n ; i++) {
System.out.println(queue.poll());
}
}
}
运行上述程序,输出内容如下:
Hero [name=Magneto, power=96]
Hero [name=Nemesis, power=95]
Hero [name=Doctor Strange, power=94]
Hero [name=Marquis of Death, power=91]
Hero [name=Edifice Rex, power=88]
Hero [name=Hulk, power=85]
6. 参考引用
本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html