手记

【Java数据结构及算法实战】系列012:Java队列06——数组实现的优先级阻塞队列PriorityBlockingQueue

2022-05-06 22:09:059490浏览

老卫

2实战 · 249手记 · 2推荐

PriorityBlockingQueue是基于数组实现的无界优先级阻塞队列。PriorityBlockingQueue与PriorityQueue类似,其中的元素按其自然顺序排序,或由队列构造时提供的比较器根据所使用的构造函数排序。优先级队列不允许空元素,依赖自然顺序的优先级队列也不允许插入不可比较的对象。相比于PriorityQueue而言,PriorityBlockingQueue一个最大的优势是线程安全的。


 


PriorityBlockingQueue是Java Collections Framework的一个成员。


 


 


1.   PriorityBlockingQueue的声明


PriorityBlockingQueue的接口和继承关系如下


 


public class PriorityBlockingQueue<E> extends AbstractQueue<E>


    implements BlockingQueue<E>, java.io.Serializable {   …


}


 


 


完整的接口继承关系如下图所示。


 



 


 


 


 


 


从上述代码可以看出,PriorityBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。


 


 


2.   PriorityBlockingQueue的成员变量和构造函数


 


 


以下是PriorityBlockingQueue的构造函数和成员变量。


 


// 默认数组容量


private static final int DEFAULT_INITIAL_CAPACITY = 11;


 


// 最大数组容量


    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;


 


// 元素数组


    private transient Object[] queue;


 


// 队列中的元素个数


    private transient int size;


 


    // 比较器


    private transient Comparator<? super E> comparator;


 


// 操作数组确保原子性的锁


    private final ReentrantLock lock = new ReentrantLock();


 


// 数组非空的条件判断


    private final Condition notEmpty = lock.newCondition();


 


// 分配用Spinlock,通过CAS获取


    private transient volatile int allocationSpinLock;


 


    public PriorityBlockingQueue() {


        this(DEFAULT_INITIAL_CAPACITYnull);


    }


 


    public PriorityBlockingQueue(int initialCapacity) {


        this(initialCapacity, null);


    }


 


    public PriorityBlockingQueue(int initialCapacity,


                                 Comparator<? super E> comparator) {


        if (initialCapacity < 1)


            throw new IllegalArgumentException();


        this.comparator = comparator;


        this.queue = new Object[Math.max(1, initialCapacity)];


    }


 


    public PriorityBlockingQueue(Collection<? extends E> c) {


        boolean heapify = true; // true if not known to be in heap order


        boolean screen = true;  // true if must screen for nulls


        if (c instanceof SortedSet<?>) {


            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;


            this.comparator = (Comparator<? super E>) ss.comparator();


            heapify = false;


        }


        else if (c instanceof PriorityBlockingQueue<?>) {


            PriorityBlockingQueue<? extends E> pq =


                (PriorityBlockingQueue<? extends E>) c;


            this.comparator = (Comparator<? super E>) pq.comparator();


            screen = false;


            if (pq.getClass() == PriorityBlockingQueue.class) // exact match


                heapify = false;


        }


        Object[] es = c.toArray();


        int n = es.length;


        // If c.toArray incorrectly doesn't return Object[], copy it.


        if (es.getClass() != Object[].class)


            es = Arrays.copyOf(es, n, Object[].class);


        if (screen && (n == 1 || this.comparator != null)) {


            for (Object e : es)


                if (e == null)


                    throw new NullPointerException();


        }


        this.queue = ensureNonEmpty(es);


        this.size = n;


        if (heapify)


            heapify();


    }


 


 


从上述代码可以看出,构造函数有4种。构造函数中的参数含义如下


 


l  initialCapacity用于设置队列中内部数组的容量。如果没有指定,则会使用默认数组容量DEFAULT_INITIAL_CAPACITY的值。


l  comparator为比较器


l  c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加


 


类成员queue是一个数组,用于存储队列中的元素。size用于记录队列中的元素个数。


 


通过ReentrantLock和加锁条件notEmpty来实现并发控制。


 


 


3.   PriorityBlockingQueue的核心方法


以下对PriorityBlockingQueue常用核心方法的实现原理进行解释。


 


 


3.1.     offer(e)


执行offer(e)方法后有两种结果


 


l  队列未达到容量时,返回 true


l  队列达到容量时,先扩容,再返回 true


 


PriorityBlockingQueue的offer (e)方法源码如下:


 


public boolean offer(E e) {


        if (e == null)


            throw new NullPointerException();


        final ReentrantLock lock = this.lock;


        lock.lock();  // 加锁


        int n, cap;


        Object[] es;


        while ((n = size) >= (cap = (es = queue).length))


            tryGrow(es, cap);  // 扩容


        try {


            final Comparator<? super E> cmp;


            if ((cmp = comparator) == null)


                siftUpComparable(n, e, es);


            else


                siftUpUsingComparator(n, e, es, cmp);


            size = n + 1;


            notEmpty.signal();  // 唤醒等待中的线程


        } finally {


            lock.unlock();  // 解锁


        }


        return true;


    }


 


从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:


 


l  为了确保并发操作的安全先做了加锁处理。


l  判断待入队的元素e是否为null。为null则抛出NullPointerException异常。


l  判断当前队列中的元素是否已经大于等于队列的容量,如果是则证明队列已经满了,需要先通过tryGrow()方法扩容。


l  通过siftUpComparable ()或者siftUpUsingComparator()方法插入数据元素。


l  通过执行notEmpty.signal()方法来唤醒等待中的线程。


l  最后解锁。


 


tryGrow()方法源码如下:


 


 


private void tryGrow(Object[] array, int oldCap) {


        lock.unlock(); // 必须释放并重新获取锁


        Object[] newArray = null;


        if (allocationSpinLock == 0 &&


            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {


            try {


                int newCap = oldCap + ((oldCap < 64) ?


                                       (oldCap + 2) :


                                       (oldCap >> 1));


                if (newCap - MAX_ARRAY_SIZE > 0) {


                    int minCap = oldCap + 1;


                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)


                        throw new OutOfMemoryError();


                    newCap = MAX_ARRAY_SIZE;


                }


                if (newCap > oldCap && queue == array)


                    newArray = new Object[newCap];


            } finally {


                allocationSpinLock = 0;


            }


        }


        if (newArray == null)


            Thread.yield();


        lock.lock();


        if (newArray != null && queue == array) {


            queue = newArray;


            System.arraycopy(array, 0, newArray, 0, oldCap);


        }


}


 


siftUpComparable()方法和siftUpUsingComparator()方法源码如下:


 


private static <T> void siftUpComparable(int k, T x, Object[] es) {


        Comparable<? super T> key = (Comparable<? super T>) x;


        while (k > 0) {


            int parent = (k - 1) >>> 1;


            Object e = es[parent];


            if (key.compareTo((T) e) >= 0)


                break;


            es[k] = e;


            k = parent;


        }


        es[k] = key;


    }


 


    private static <T> void siftUpUsingComparator(


        int k, T x, Object[] es, Comparator<? super T> cmp) {


        while (k > 0) {


            int parent = (k - 1) >>> 1;


            Object e = es[parent];


            if (cmp.compare(x, (T) e) >= 0)


                break;


            es[k] = e;


            k = parent;


        }


        es[k] = x;


    }


 


在上述代码中,在位置k处插入项x,通过向上提升x到树形结构中来维护堆的不变性,直到x大于或等于它的父节点或根节点。


 


3.2.     put(e)


执行put(e)方法后有两种结果:


•      


l  队列未满时,直接插入没有返回值


l  队列满时,会扩容后再插入


 


PriorityBlockingQueue的put (e)方法源码如下:


 


public void put(E e) {


        offer(e); // 不会阻塞


    }


 


从上面代码可以看出,put(e)方法的实现等同于offer(e),因此队列满时会自动扩容,再插入元素,不会阻塞队列。


 


3.3.     offer(e,time,unit)


offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:


•      


l  队列未满时,返回 true


l  队列满时,先扩容,再返回 true


 


PriorityBlockingQueue的put (e)方法源码如下:


 


public boolean offer(E e, long timeout, TimeUnit unit) {


        return offer(e); // 不会阻塞


}


 


从上面代码可以看出,offer(e,time,unit)方法的实现等同于offer(e),因此队列满时会自动扩容,再插入元素,不会阻塞队列。


 


3.4.     add(e)


执行add(e)方法后有有两种结果


 


l  队列未达到容量时,返回 true


l  队列达到容量时,先扩容,再返回 true


 


PriorityBlockingQueue的add(e)方法源码如下:


 


    public boolean add(E e) {


        return offer(e);


}


 


从上面代码可以看出,add(e)方法等同于offer(e)方法的实现。



 


 


 


3.5.     poll ()


执行poll()方法后有两种结果:


 


l  队列不为空时,返回队首值并移除


l  队列为空时,返回 null


 


 


PriorityBlockingQueue的poll()方法源码如下:


 


public E poll() {


        final ReentrantLock lock = this.lock;


        lock.lock();  // 加锁


        try {


            return dequeue(); // 出队


        } finally {


            lock.unlock();  // 解锁


        }


}


 


从上面代码可以看出,执行poll()方法时,分为以下几个步骤:


 


l  为了确保并发操作的安全先做了加锁处理。


l  执行dequeue()方法做元素的出队。


l  最后解锁。


 


dequeue()方法源码如下:


 


 


 


private E dequeue() {


        final Object[] es;


        final E result;


 


        if ((result = (E) ((es = queue)[0])) != null) {


            final int n;


            final E x = (E) es[(n = --size)];


            es[n] = null;


            if (n > 0) {


                final Comparator<? super E> cmp;


                if ((cmp = comparator) == null)


                    siftDownComparable(0, x, es, n);


                else


                    siftDownUsingComparator(0, x, es, n, cmp);


            }


        }


        return result;


    }


 


private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {


        Comparable<? super T> key = (Comparable<? super T>)x;


        int half = n >>> 1;


        while (k < half) {


            int child = (k << 1) + 1;


            Object c = es[child];


            int right = child + 1;


            if (right < n &&


                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)


                c = es[child = right];


            if (key.compareTo((T) c) <= 0)


                break;


            es[k] = c;


            k = child;


        }


        es[k] = key;


    }


 


    private static <T> void siftDownUsingComparator(


        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {


        int half = n >>> 1;


        while (k < half) {


            int child = (k << 1) + 1;


            Object c = es[child];


            int right = child + 1;


            if (right < n && cmp.compare((T) c, (T) es[right]) > 0)


                c = es[child = right];


            if (cmp.compare(x, (T) c) <= 0)


                break;


            es[k] = c;


            k = child;


        }


        es[k] = x;


}


 


出队的原理是是这样的,在位置k处插入项x,通过反复将x降级到树中来维护堆的不变性,直到它小于或等于其子项或是一个叶子。


 


 


3.6.     take()


执行take()方法后有两种结果:


 


l  队列不为空时,返回队首值并移除


l  队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值


 


PriorityBlockingQueue的take ()方法源码如下:


 


public E take() throws InterruptedException {


        final ReentrantLock lock = this.lock;


        lock.lockInterruptibly();  // 获取锁


        E result;


        try {


            while ( (result = dequeue()) == null)  // 出队


                notEmpty.await();  // 使线程等待


        } finally {


            lock.unlock();  // 解锁


        }


        return result;


    }


从上面代码可以看出,执行take()方法时,分为以下几个步骤:


 


l  先是要获取锁。


l  执行dequeue()方法做元素的出队。如果出队元素是null,则线程等待。


l  最后解锁。


 


dequeue()方法此处不再赘述。


 


3.7.     poll(time,unit)


poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果:


 


l  队列不为空时,返回队首值并移除


l  队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null


 


PriorityBlockingQueue的poll(time,unit)方法源码如下:


 


public E poll(long timeout, TimeUnit unit) throws InterruptedException {


        long nanos = unit.toNanos(timeout);


        final ReentrantLock lock = this.lock;


        lock.lockInterruptibly();  // 获取锁


        E result;


        try {


            while ( (result = dequeue()) == null && nanos > 0) // 出队


                nanos = notEmpty.awaitNanos(nanos);  // 使线程等待指定的时间


        } finally {


            lock.unlock();  // 解锁


        }


        return result;


}


 


从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤:


 


l  先是要获取锁。


l  执行dequeue()方法做元素的出队。如果出队元素是null,则线程等待。


l  最后解锁。


 


dequeue()方法此处不再赘述。


 


 


3.8.     remove()


执行remove()方法后有两种结果:


 


l  队列不为空时,返回队首值并移除


l  队列为空时,抛出异常


 


PriorityBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下:


 


public E remove() {


        E x = poll();


        if (x != null)


            return x;


        else


            throw new NoSuchElementException();


}


 


从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。


 


poll()方法此处不再赘述。


 


 


 


3.9.     peek()


执行peek()方法后有两种结果:


 


l  队列不为空时,返回队首值但不移除


l  队列为空时,返回null


 


 


peek()方法源码如下:


 


public E peek() {


        final ReentrantLock lock = this.lock;


        lock.lock();  // 加锁


        try {


            return (E) queue[0];


        } finally {


            lock.unlock();  // 解锁


        }


}


 


从上面代码可以看出,peek()方法比较简单,直接就是获取了数组里面的索引为0的元素。


 


3.10.            element()


执行element()方法后有两种结果:


 


l  队列不为空时,返回队首值但不移除


l  队列为空时,抛出异常


 


 


element()方法其实是调用了父类AbstractQueue的element()方法,源码如下:


 


public E element() {


        E x = peek();


        if (x != null)


            return x;


        else


            throw new NoSuchElementException();


}


 


从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。


 


 


4.   PriorityBlockingQueue的单元测试


 


PriorityBlockingQueue的单元测试如下:


 


 


 


 


package com.waylau.java.demo.datastructure;


 


import static org.junit.jupiter.api.Assertions.assertEquals;


import static org.junit.jupiter.api.Assertions.assertNotNull;


import static org.junit.jupiter.api.Assertions.assertNull;


import static org.junit.jupiter.api.Assertions.assertThrows;


import static org.junit.jupiter.api.Assertions.assertTrue;


 


import java.util.NoSuchElementException;


import java.util.Queue;


import java.util.concurrent.BlockingQueue;


import java.util.concurrent.PriorityBlockingQueue;


import java.util.concurrent.TimeUnit;


 


import org.junit.jupiter.api.Test;


 


/**


 * PriorityBlockingQueue Tests


 *


 * @since 1.0.0 2020年5月24日


 * @author <a href="https://waylau.com">Way Lau</a>


 */


class PriorityBlockingQueueTests {


    @Test


    void testOffer() {


        // 初始化队列


        Queue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列未满时,返回 true


        boolean resultNotFull = queue.offer("Java");


        assertTrue(resultNotFull);


 


        // 测试队列达到容量时,会自动扩容


        queue.offer("C");


        queue.offer("Python");


        boolean resultFull = queue.offer("C++"); // 扩容


        assertTrue(resultFull);


    }


 


    @Test


    void testPut() throws InterruptedException {


        // 初始化队列


        BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列未满时,直接插入没有返回值;


        queue.put("Java");


 


        // 测试队列满则扩容。


        queue.put("C");


        queue.put("Python");


        queue.put("C++");


    }


 


    @Test


    void testOfferTime() throws InterruptedException {


        // 初始化队列


        BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列未满时,返回 true


        boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);


        assertTrue(resultNotFull);


 


        // 测试队列满则扩容,返回true


        queue.offer("C");


        queue.offer("Python");


        boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 不会阻塞


        assertTrue(resultFull);


    }


 


   


    @Test


    void testAdd() {


        // 初始化队列


        Queue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列未满时,返回 true


        boolean resultNotFull = queue.add("Java");


        assertTrue(resultNotFull);


 


        // 测试队列满则扩容,返回 true


        queue.add("C");


        queue.add("Python");


        boolean resultFull = queue.add("C++"); // 扩容


        assertTrue(resultFull);


    }


 


    @Test


    void testPoll() throws InterruptedException {


        // 初始化队列


        Queue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列为空时,返回 null


        String resultEmpty = queue.poll();


        assertNull(resultEmpty);


 


        // 测试队列不为空时,返回队首值并移除


        queue.add("Java");


        queue.add("C");


        queue.add("Python");


        String resultNotEmpty = queue.poll();


        assertEquals("C", resultNotEmpty);


    }


 


    @Test


    void testTake() throws InterruptedException {


        // 初始化队列


        BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列不为空时,返回队首值并移除


        queue.put("Java");


        queue.put("C");


        queue.put("Python");


        String resultNotEmpty = queue.take();


        assertEquals("C", resultNotEmpty);


 


        // 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值


        queue.clear();


        String resultEmpty = queue.take(); // 阻塞等待


        assertNotNull(resultEmpty);


    }


 


    @Test


    void testPollTime() throws InterruptedException {


        // 初始化队列


        BlockingQueue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列不为空时,返回队首值并移除


        queue.put("Java");


        queue.put("C");


        queue.put("Python");


        String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);


        assertEquals("C", resultNotEmpty);


 


        // 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null


        queue.clear();


        String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒


        assertNull(resultEmpty);


    }


   


    @Test


    void testRemove() throws InterruptedException {


        // 初始化队列


        Queue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列为空时,抛出异常


        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {


            queue.remove();// 抛异常


        });


 


        assertEquals(null, excpetion.getMessage());


 


        // 测试队列不为空时,返回队首值并移除


        queue.add("Java");


        queue.add("C");


        queue.add("Python");


        String resultNotEmpty = queue.remove();


        assertEquals("C", resultNotEmpty);


    }


 


    @Test


    void testPeek() throws InterruptedException {


        // 初始化队列


        Queue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列不为空时,返回队首值并但不移除


        queue.add("Java");


        queue.add("C");


        queue.add("Python");


        String resultNotEmpty = queue.peek();


        assertEquals("C", resultNotEmpty);


        resultNotEmpty = queue.peek();


        assertEquals("C", resultNotEmpty);


        resultNotEmpty = queue.peek();


        assertEquals("C", resultNotEmpty);


 


        // 测试队列为空时,返回null


        queue.clear();


        String resultEmpty = queue.peek();


        assertNull(resultEmpty);


    }


 


    @Test


    void testElement() throws InterruptedException {


        // 初始化队列


        Queue<String> queue = new PriorityBlockingQueue<String>(3);


 


        // 测试队列不为空时,返回队首值并但不移除


        queue.add("Java");


        queue.add("C");


        queue.add("Python");


        String resultNotEmpty = queue.element();


        assertEquals("C", resultNotEmpty);


        resultNotEmpty = queue.element();


        assertEquals("C", resultNotEmpty);


        resultNotEmpty = queue.element();


        assertEquals("C", resultNotEmpty);


 


        // 测试队列为空时,抛出异常


        queue.clear();


        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {


            queue.element();// 抛异常


        });


 


        assertEquals(null, excpetion.getMessage());


    }


}


 


 


5.   PriorityBlockingQueue的应用案例:英雄战力排行榜


以下是一个英雄战力排行榜的示例。该示例模拟了6个英雄,可以根据英雄的战力由高至低排序。


 


以下是Hero类,用来代表英雄:


 


 


package com.waylau.java.demo.datastructure;


 


 


/**


 * Hero


 *


 * @since 1.0.0 2020年5月23日


 * @author <a href="https://waylau.com">Way Lau</a>


 */


public class Hero {


 


    private String name;


   


    private Integer power; // 战力


   


    public Hero(String name, Integer power) {


        this.name = name;


        this.power = power;


    }


   


    public String getName() {


        return name;


    }


 


    public void setName(String name) {


        this.name = name;


    }


 


    public Integer getPower() {


        return power;


    }


 


    public void setPower(Integer power) {


        this.power = power;


    }


 


    @Override


    public String toString() {


        return "Hero [name=" + name + ", power=" + power + "]";


    }


 


}


 


 


以下是应用主程序:


 


 


 


 


 


package com.waylau.java.demo.datastructure;


 


import java.util.Comparator;


import java.util.Queue;


import java.util.concurrent.PriorityBlockingQueue;


 


/**


 * PriorityBlockingQueue Demo


 *


 * @since 1.0.0 2020年5月24日


 * @author <a href="https://waylau.com">Way Lau</a>


 */


public class PriorityBlockingQueueDemo {


    public static void main(String[] args) {


        int n = 6;


       


        Queue<Hero> queue = new PriorityBlockingQueue<Hero>(n, new Comparator<Hero>() {


            // 战力由大到小排序


            @Override


            public int compare(Hero hero0, Hero hero1) {


                return hero1.getPower().compareTo(hero0.getPower());


            }


        });


 


        queue.add(new Hero("Nemesis", 95));


        queue.add(new Hero("Edifice Rex", 88));


        queue.add(new Hero("Marquis of Death", 91));


        queue.add(new Hero("Magneto", 96));


        queue.add(new Hero("Hulk", 85));


        queue.add(new Hero("Doctor Strange", 94));


 


       


        for (int i = 0; i<n ; i++) {


            System.out.println(queue.poll());


        }


    }


 


}


 


 


 


 


运行上述程序,输出内容如下:


 


Hero [name=Magneto, power=96]


Hero [name=Nemesis, power=95]


Hero [name=Doctor Strange, power=94]


Hero [name=Marquis of Death, power=91]


Hero [name=Edifice Rex, power=88]


Hero [name=Hulk, power=85]


6.   参考引用


本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html


0人推荐
随时随地看视频
慕课网APP