手记

【Java数据结构及算法实战】系列010:Java队列04——链表实现的阻塞队列LinkedBlockingQueue

2022-04-29 21:16:001878浏览

老卫

1实战 · 202手记 · 2推荐

LinkedBlockingQueue是一种基于链表实现的可选边界的阻塞队列,该队列排序元素FIFO。队列的队首是在该队列上停留时间最长的元素,队列的队尾是在该队列上停留最短时间的元素。在队列尾部插入新的元素,队列检索操作在队列的头部获取元素。



 



在大多数并发应用程序中,基于链表实现的队列通常具有比基于数组实现的队列更高的吞吐量,但性能上未必占优势。



 



LinkedBlockingQueue在初始化时可以指定容量也可以不指定容量。当初始化LinkedBlockingQueue指定容量时,是有界队列;当初始化LinkedBlockingQueue未指定容量时,其内部会以Integer.MAX_VALUE值作为容量。当然,因为Integer.MAX_VALUE值非常大,近似无限大,因此LinkedBlockingQueue未指定容量时也可以近似认为是无界队列。



 



为防止队列的过度的扩展,建议在LinkedBlockingQueue初始化时指定容量。LinkedBlockingQueue内部的链接节点在每次入队元素时动态创建,除非这会使队列超过容量。



 



LinkedBlockingQueue类及其迭代器实现了Collection和Iterator接口的所有可选方法。LinkedBlockingQueue是Java Collections Framework的一个成员。



 



 



1.   LinkedBlockingQueue的声明


LinkedBlockingQueue的接口和继承关系如下



 



public class LinkedBlockingQueue<E> extends AbstractQueue<E>



        implements BlockingQueue<E>, java.io.Serializable {



   …



}



 



 



完整的接口继承关系如下图所示。



 







 



 



 



 



 



从上述代码可以看出,LinkedBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,此处不再赘述。



 



 



 



2.   LinkedBlockingQueue的成员变量和构造函数


 



 



以下是LinkedBlockingQueue的构造函数和成员变量。



 



 



    // 容量



    private final int capacity;



 



    // 当前元素个数



    private final AtomicInteger count = new AtomicInteger();



 



    // 链表头结点



    // 不变式: head.item == null



    transient Node<E> head;



 



    // 链表尾结点



    // 不变式: last.next == null



    private transient Node<E> last;



 



    // 用于锁住take、poll等操作



    private final ReentrantLock takeLock = new ReentrantLock();



 



    // 队列非空,唤醒消费者



    private final Condition notEmpty = takeLock.newCondition();



 



    // 用于锁住put、offer等操作



    private final ReentrantLock putLock = new ReentrantLock();



 



    // 队列非满,唤醒生产者



private final Condition notFull = putLock.newCondition();



 



public LinkedBlockingQueue() {



        this(Integer.MAX_VALUE);



    }



 



    public LinkedBlockingQueue(int capacity) {



        if (capacity <= 0) throw new IllegalArgumentException();



        this.capacity = capacity;



        last = head = new Node<E>(null);



    }



 



    public LinkedBlockingQueue(Collection<? extends E> c) {



        this(Integer.MAX_VALUE);



        final ReentrantLock putLock = this.putLock;



        putLock.lock();  // 只锁可见,不互斥



        try {



            int n = 0;



            for (E e : c) {



                if (e == null)



                    throw new NullPointerException();



                if (n == capacity)



                    throw new IllegalStateException("Queue full");



                enqueue(new Node<E>(e));



                ++n;



            }



            count.set(n);



        } finally {



            putLock.unlock();



        }



 }



 



 



从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下



 



l  capacity用于设置队列容量。该参数是可选的,如果未设置,则取Integer.MAX_VALUE值作为容量



l  c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加



 



类成员last和head分别指代链表的尾结点和头结点。链表中的结点用Node类型表示,代码如下:



 



 



static class Node<E> {



        E item;



 



        /**



         * next有以下三种场景:



         * - 真正的后继结点



         * - 当前结点是头结点,则后继结点是head.next



         * - 值为null,表示当前结点是尾结点,没有后继结点



         */



        Node<E> next;



 



        Node(E x) { item = x; }



}



 



 



访问策略是通过ReentrantLock来实现的。通过两个加锁条件notEmpty、notFull来实现并发控制。与ArrayBlockingQueue所不同的是,LinkedBlockingQueue使用了takeLock和putLock两把锁来分别锁住出队操作和入队操作。



 



count用于记录当前队列里面的元素个数。



3.   LinkedBlockingQueue的核心方法


以下对LinkedBlockingQueue常用核心方法的实现原理进行解释。



 



 



3.1.     offer(e)


执行offer(e)方法后有两种结果



l  队列未满时,返回 true



l  队列满时,返回 false



 



LinkedBlockingQueue的offer (e)方法源码如下:



 



public boolean offer(E e) {



        if (e == null) throw new NullPointerException();  // 判空



        final AtomicInteger count = this.count;



        if (count.get() == capacity)



            return false;



        final int c;



        final Node<E> node = new Node<E>(e);



        final ReentrantLock putLock = this.putLock;



        putLock.lock();  // 加锁



        try {



            if (count.get() == capacity)



                return false;



            enqueue(node); // 入队



            c = count.getAndIncrement();



            if (c + 1 < capacity)



                notFull.signal(); // 标识当前队列非满



        } finally {



            putLock.unlock(); // 解锁



        }



        if (c == 0)



            signalNotEmpty();  // 标识当前队列已经是非空



        return true;



    }



 



从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:



 



l  判断待入队的元素e是否为null。为null则抛出NullPointerException异常。



l  判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false。



l  为了确保并发操作的安全先做了加锁处理。



l  再次判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false;否则将元素e做入队处理,并返回true。



l  解锁。



l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。



 



enqueue(node)方法代码如下:



 



private void enqueue(Node<E> node) {



        last = last.next = node;



    }



 



enqueue(node)方法就在链表的尾部插入数据元素。



 



signalNotEmpty()方法代码如下:



 



private void signalNotEmpty() {



        final ReentrantLock takeLock = this.takeLock;



        takeLock.lock();



        try {



            notEmpty.signal();



        } finally {



            takeLock.unlock();



        }



    }



 



思考:细心的读者可能会发现,在offer (e)方法方法中做了两次判断count是否超过了容量的限制。那么为什么要判断两次呢?



3.2.     put(e)


执行put(e)方法后有两种结果:



•      



l  队列未满时,直接插入没有返回值



l  队列满时,会阻塞等待,一直等到队列未满时再插入



 



LinkedBlockingQueue的put(e)方法源码如下:



 



 



public void put(E e) throws InterruptedException {



        if (e == null) throw new NullPointerException();



        final int c;



        final Node<E> node = new Node<E>(e);



        final ReentrantLock putLock = this.putLock;



        final AtomicInteger count = this.count;



        putLock.lockInterruptibly();  // 获取锁



        try {



            while (count.get() == capacity) {



                notFull.await();  // 使线程等待



            }



            enqueue(node);



            c = count.getAndIncrement();



            if (c + 1 < capacity)



                notFull.signal();  // 标识当前队列非满



        } finally {



            putLock.unlock();  // 解锁



        }



        if (c == 0)



            signalNotEmpty();  // 标识当前队列已经是非空



}



 



从上面代码可以看出,put(e)方法的实现,分为以下几个步骤:



 



l  先是要获取锁。



l  而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。



l  解锁。



l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。



3.3.     offer(e,time,unit)


offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:



•      



l  队列未满时,返回 true



l  队列满时,会阻塞等待,如果在指定时间内还不能往队列中插入数据则返回 false



 



LinkedBlockingQueue的offer(e,time,unit)方法源码如下:



 



public boolean offer(E e, long timeout, TimeUnit unit)



        throws InterruptedException {



 



        if (e == null) throw new NullPointerException();



        long nanos = unit.toNanos(timeout);



        final int c;



        final ReentrantLock putLock = this.putLock;



        final AtomicInteger count = this.count;



        putLock.lockInterruptibly();  // 获取锁



        try {



            while (count.get() == capacity) {



                if (nanos <= 0L)



                    return false;



                nanos = notFull.awaitNanos(nanos); // 使线程等待指定的时间



            }



            enqueue(new Node<E>(e));



            c = count.getAndIncrement();



            if (c + 1 < capacity)



                notFull.signal();  // 标识当前队列非满



        } finally {



            putLock.unlock();  // 解锁



        }



        if (c == 0)



            signalNotEmpty();  // 标识当前队列已经是非空



        return true;



}



 



从上面代码可以看出,offer(e,time,unit)方法的实现,分为以下几个步骤:



 



l  先是要获取锁。



l  而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。



l  解锁。



l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。



 



3.4.     add(e)


执行add(e)方法后有两种结果



 



l  队列未满时,返回 true



l  队列满时,则抛出异常



 



ArrayBlockingQueue的add(e)方法源码如下:



 



    public boolean add(E e) {



        return super.add(e);



    }



 



 



从上面代码可以看出,add(e)方法的实现,直接是调用了父类AbstractQueue的add(e)方法。而AbstractQueue的add(e)方法源码如下:



 



 



public boolean add(E e) {



        if (offer(e))



            return true;



        else



            throw new IllegalStateException("Queue full");



}



 



 



从上面代码可以看出,add(e)方法又调用了offer(e)方法。offer(e)方法此处不再赘述。



 



 



 



 



3.5.     poll ()


执行poll ()方法后有两种结果:



 



l  队列不为空时,返回队首值并移除



l  队列为空时,返回 null



 



 



LinkedBlockingQueue的poll ()方法源码如下:



 



public E poll() {



        final AtomicInteger count = this.count;



        if (count.get() == 0)



            return null;



        final E x;



        final int c;



        final ReentrantLock takeLock = this.takeLock;



        takeLock.lock();  // 加锁



        try {



            if (count.get() == 0)



                return null;



            x = dequeue();  // 出队



            c = count.getAndDecrement();



            if (c > 1)



                notEmpty.signal();  // 标识当前队列非空



        } finally {



            takeLock.unlock();  // 解锁



        }



        if (c == capacity)



            signalNotFull();  // 标识当前队列已经是非满



        return x;



    }



从上面代码可以看出,执行poll()方法时,分为以下几个步骤:



 



l  先是判断count是否等于0,如果等于0则证明队列为空,直接返回null。



l  为了确保并发操作的安全先做了加锁处理。



l  再次判断count是否等于0,如果等于0则证明队列为空,直接返回null;否则执行dequeue()方法做元素的出队。



l  解锁。



l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。



 



dequeue()方法源码如下:



 



 



private E dequeue() {



        Node<E> h = head;



        Node<E> first = h.next;



        h.next = h; // 利于GC



        head = first;



        E x = first.item;



        first.item = null;



        return x;



}



 



上面代码比较简单,就是移除链表的头结点。



 



3.6.     take()


执行take()方法后有两种结果:



 



l  队列不为空时,返回队首值并移除



l  队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值



 



LinkedBlockingQueue的take ()方法源码如下:



 



public E take() throws InterruptedException {



        final E x;



        final int c;



        final AtomicInteger count = this.count;



        final ReentrantLock takeLock = this.takeLock;



        takeLock.lockInterruptibly();  // 获取锁



        try {



            while (count.get() == 0) {



                notEmpty.await();  // 使线程等待



            }



            x = dequeue();  // 出队



            c = count.getAndDecrement();



            if (c > 1)



                notEmpty.signal();  // 标识当前队列非空



        } finally {



            takeLock.unlock();  // 解锁



        }



        if (c == capacity)



            signalNotFull();  // 标识当前队列已经是非满



        return x;



    }



 



从上面代码可以看出,执行take()方法时,分为以下几个步骤:



 



l  先是要获取锁。



l  而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。



l  解锁。



l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。



 



dequeue()和signalNotFull ()方法此处不再赘述。



 



3.7.     poll(time,unit)


poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果:



 



l  队列不为空时,返回队首值并移除



l  队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null



 



LinkedBlockingQueue的poll(time,unit)方法源码如下:



 



public E poll(long timeout, TimeUnit unit) throws InterruptedException {



        final E x;



        final int c;



        long nanos = unit.toNanos(timeout);



        final AtomicInteger count = this.count;



        final ReentrantLock takeLock = this.takeLock;



        takeLock.lockInterruptibly();  // 获取锁



        try {



            while (count.get() == 0) {



                if (nanos <= 0L)



                    return null;



                nanos = notEmpty.awaitNanos(nanos); // 使线程等待指定的时间



            }



            x = dequeue();  // 出队



            c = count.getAndDecrement();



            if (c > 1)



                notEmpty.signal();  // 标识当前队列非空



        } finally {



            takeLock.unlock();  // 解锁



        }



        if (c == capacity)



            signalNotFull();  // 标识当前队列已经是非满



        return x;



}



 



 



从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤:



 



l  先是要获取锁。



l  而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。



l  解锁。



l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。



 



dequeue()和signalNotFull ()方法此处不再赘述。



 



 



3.8.     remove()


执行remove()方法后有两种结果:



 



l  队列不为空时,返回队首值并移除



l  队列为空时,抛出异常



 



LinkedBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下:



 



public E remove() {



        E x = poll();



        if (x != null)



            return x;



        else



            throw new NoSuchElementException();



}



 



从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。



 



poll()方法此处不再赘述。



 



3.9.     peek()


执行peek()方法后有两种结果:



 



l  队列不为空时,返回队首值但不移除



l  队列为空时,返回null



 



 



peek()方法源码如下:



 



public E peek() {



        final AtomicInteger count = this.count;



        if (count.get() == 0)



            return null;



        final ReentrantLock takeLock = this.takeLock;



        takeLock.lock();  // 加锁



        try {



            return (count.get() > 0) ? head.next.item : null;  // 空则返回null



        } finally {



            takeLock.unlock();  // 解锁



        }



}



 



从上面代码可以看出,peek()方法比较简单,直接就是获取了链表里面头结点的元素值。



 



3.10.            element()


执行element()方法后有两种结果:



 



l  队列不为空时,返回队首值但不移除



l  队列为空时,抛出异常



 



 



element()方法其实是调用了父类AbstractQueue的element()方法,源码如下:



 



public E element() {



        E x = peek();



        if (x != null)



            return x;



        else



            throw new NoSuchElementException();



}



 



从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。



 



 



 



4.   LinkedBlockingQueue的单元测试


 



LinkedBlockingQueue的单元测试如下:



 



 



 



package com.waylau.java.demo.datastructure;



 



import static org.junit.jupiter.api.Assertions.assertEquals;



import static org.junit.jupiter.api.Assertions.assertFalse;



import static org.junit.jupiter.api.Assertions.assertNotNull;



import static org.junit.jupiter.api.Assertions.assertNull;



import static org.junit.jupiter.api.Assertions.assertThrows;



import static org.junit.jupiter.api.Assertions.assertTrue;



 



import java.util.NoSuchElementException;



import java.util.concurrent.LinkedBlockingQueue;



import java.util.concurrent.BlockingQueue;



import java.util.concurrent.TimeUnit;



 



import org.junit.jupiter.api.Test;



 



/**



 * LinkedBlockingQueue Test



 *



 * @since 1.0.0 2020年5月24日



 * @author <a href="https://waylau.com">Way Lau</a>



 */



class LinkedBlockingQueueTests {



    @Test



    void testOffer() {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列未满时,返回 true



        boolean resultNotFull = queue.offer("Java");



        assertTrue(resultNotFull);



 



        // 测试队列满则,返回 false



        queue.offer("C");



        queue.offer("Python");



        boolean resultFull = queue.offer("C++");



        assertFalse(resultFull);



    }



 



    @Test



    void testPut() throws InterruptedException {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列未满时,直接插入没有返回值;



        queue.put("Java");



 



        // 测试队列满则, 会阻塞等待,一直等到队列未满时再插入。



        queue.put("C");



        queue.put("Python");



        queue.put("C++");  // 阻塞等待



    }



 



    @Test



    void testOfferTime() throws InterruptedException {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列未满时,返回 true



        boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);



        assertTrue(resultNotFull);



 



        // 测试队列满则,返回 false



        queue.offer("C");



        queue.offer("Python");



        boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 等5秒



        assertFalse(resultFull);



    }



 



    @Test



    void testAdd() {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列未满时,返回 true



        boolean resultNotFull = queue.add("Java");



        assertTrue(resultNotFull);



 



        // 测试队列满则抛出异常



        queue.add("C");



        queue.add("Python");



 



        Throwable excpetion = assertThrows(IllegalStateException.class, () -> {



            queue.add("C++");// 抛异常



        });



 



        assertEquals("Queue full", excpetion.getMessage());



    }



 



    @Test



    void testPoll() throws InterruptedException {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列为空时,返回 null



        String resultEmpty = queue.poll();



        assertNull(resultEmpty);



 



        // 测试队列不为空时,返回队首值并移除



        queue.put("Java");



        queue.put("C");



        queue.put("Python");



        String resultNotEmpty = queue.poll();



        assertEquals("Java", resultNotEmpty);



    }



 



    @Test



    void testTake() throws InterruptedException {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列不为空时,返回队首值并移除



        queue.put("Java");



        queue.put("C");



        queue.put("Python");



        String resultNotEmpty = queue.take();



        assertEquals("Java", resultNotEmpty);



 



        // 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值



        queue.clear();



        String resultEmpty = queue.take(); // 阻塞等待



        assertNotNull(resultEmpty);



    }



 



    @Test



    void testPollTime() throws InterruptedException {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列不为空时,返回队首值并移除



        queue.put("Java");



        queue.put("C");



        queue.put("Python");



        String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);



        assertEquals("Java", resultNotEmpty);



 



        // 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null



        queue.clear();



        String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒



        assertNull(resultEmpty);



    }



 



    @Test



    void testRemove() throws InterruptedException {



        // 初始化队列



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列为空时,抛出异常



        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {



            queue.remove();// 抛异常



        });



 



        assertEquals(null, excpetion.getMessage());



 



        // 测试队列不为空时,返回队首值并移除



        queue.put("Java");



        queue.put("C");



        queue.put("Python");



        String resultNotEmpty = queue.remove();



        assertEquals("Java", resultNotEmpty);



}



 



@Test



    void testPeek() throws InterruptedException {



        // 初始化队列



        Queue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列不为空时,返回队首值并但不移除



        queue.add("Java");



        queue.add("C");



        queue.add("Python");



        String resultNotEmpty = queue.peek();



        assertEquals("Java", resultNotEmpty);



        resultNotEmpty = queue.peek();



        assertEquals("Java", resultNotEmpty);



        resultNotEmpty = queue.peek();



        assertEquals("Java", resultNotEmpty);



 



        // 测试队列为空时,返回null



        queue.clear();



        String resultEmpty = queue.peek();



        assertNull(resultEmpty);



    }



 



    @Test



    void testElement() throws InterruptedException {



        // 初始化队列



        Queue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 测试队列不为空时,返回队首值并但不移除



        queue.add("Java");



        queue.add("C");



        queue.add("Python");



        String resultNotEmpty = queue.element();



        assertEquals("Java", resultNotEmpty);



        resultNotEmpty = queue.element();



        assertEquals("Java", resultNotEmpty);



        resultNotEmpty = queue.element();



        assertEquals("Java", resultNotEmpty);



 



        // 测试队列为空时,抛出异常



        queue.clear();



        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {



            queue.element();// 抛异常



        });



 



        assertEquals(null, excpetion.getMessage());



    }



}



 



 



5.   LinkedBlockingQueue的应用案例


以下是一个生产者-消费者的示例。该示例模拟了1个生产者,2个消费者。当队列满时,则会阻塞生产者生产;当队列空时,则会阻塞消费者消费。



 



package com.waylau.java.demo.datastructure;



 



import java.util.concurrent.LinkedBlockingQueue;



import java.util.concurrent.BlockingQueue;



 



/**



 * LinkedBlockingQueue Demo



 *



 * @since 1.0.0 2020年5月23日



 * @author <a href="https://waylau.com">Way Lau</a>



 */



public class LinkedBlockingQueueDemo {



    public static void main(String[] args) {



        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);



 



        // 1个生产者



        Producer p = new Producer(queue);



 



        // 2个消费者



        Consumer c1 = new Consumer("c1", queue);



        Consumer c2 = new Consumer("c2", queue);



 



        // 启动线程



        new Thread(p).start();



        new Thread(c1).start();



        new Thread(c2).start();



    }



 



}



 



class Producer implements Runnable {



    private final BlockingQueue<String> queue;



 



    Producer(BlockingQueue<String> queue) {



        this.queue = queue;



    }



 



    public void run() {



        try {



            while (true) {



                // 模拟耗时操作



                Thread.sleep(1000L);



 



                queue.put(produce());



            }



        } catch (InterruptedException ex) {



            ex.printStackTrace();



        }



    }



 



    String produce() {



        String apple = "apple: " + System.currentTimeMillis();



        System.out.println("produce " + apple);



        return apple;



    }



}



 



class Consumer implements Runnable {



    private final BlockingQueue<String> queue;



 



    private final String name;



 



    Consumer(String name, BlockingQueue<String> queue) {



        this.queue = queue;



        this.name = name;



    }



 



    public void run() {



        try {



            while (true) {



                // 模拟耗时操作



                Thread.sleep(2000L);



 



                consume(queue.take());



            }



        } catch (InterruptedException ex) {



            ex.printStackTrace();



        }



    }



 



    void consume(Object x) {



        System.out.println(this.name + " consume " + x);



    }



}



 



 



运行上述程序,输出内容如下:



 



produce apple: 1590308520134



c1 consume apple: 1590308520134



produce apple: 1590308521135



c2 consume apple: 1590308521135



produce apple: 1590308522142



c1 consume apple: 1590308522142



produce apple: 1590308523147



c2 consume apple: 1590308523147



produce apple: 1590308524156



c1 consume apple: 1590308524156



produce apple: 1590308525157



c2 consume apple: 1590308525157



produce apple: 1590308526157



c1 consume apple: 1590308526157



produce apple: 1590308527157



c2 consume apple: 1590308527157



 



6.   参考引用

本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html

0人推荐
随时随地看视频
慕课网APP