手记

java并发面试常识之LinkedBlockingQueue

谈到ArrayBlockingQueue的特色就是循环队列,然后一把锁,2个条件,完成了功能。本来以为LinkedBlockingQueue也是这样的,结果和预期不一样,LinkedBlockingQueue利用了链表的特点,使用了两把锁,两个条件来控制。是一个锁分离的应用,下面就说说,他的实现,以及为什么ArrayBlockingQueue就不适合锁分离。
主要成员变量

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final AtomicInteger count = new AtomicInteger();
 除了两个锁,两个条件外,我这里专门列举了计数器。这个计数器很重要,重要到锁分离要依赖他才能正常运行。

锁分离

    使用双锁分离就得注意一点,那就是防止线程夯死。生产线程要唤醒生产线程,消费线程也要唤醒生产线程,消费线程唤醒消费线程,消费线程也要唤醒生产线程。
  public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        //唤醒标记
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //阻塞生产线程
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //唤醒生产线程
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //唤醒消费线程
        if (c == 0)
            signalNotEmpty();
    }

生产线程唤醒消费线程

基于上面的介绍,我们来看代码,唤醒标记就是为了生产唤醒消费的,因为可能出现消费线程全部都已经等待了,此时生产线程运作,但是消费线程并不能自己唤醒自己,于是就有了signalNotEmpty()的操作。这里的c是getAndIncrement的值,就是获取计数之前的值。c==0的满足条件就有1个元素,在这种情况下才去唤醒消费线程。

生产线程唤醒生产线程

在获取锁后,如果发现容量达到上限,就阻塞了,等待被唤醒,如果可以加入,就执行enqueue方法,是个很简单的链表添加节点的方法。就是在原来last节点后加节点,然后更新last节点。

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

在计数器自增后,判断唤醒标记,如果还能继续生产,就去唤醒生产线程。

消费的方案思想和生产类似,这里就不说代码了。

删除

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

删除代码相对比较简单,主要是要获取两把锁,才能进行删除操作就是fullyLock()和fullyUnlock(),删除掉元素后,还要唤醒生产线程。

    void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

ArrayBlockingQueue为何不适合锁分离

    这个主要是循环队列的原因,主要是数组和链表不同,链表队列的添加和头部的删除,都是只和一个节点相关,添加只往后加就可以,删除只从头部去掉就好。为了防止head和tail相互影响出现问题,这里就需要原子性的计数器,头部要移除,首先得看计数器是否大于0,每个添加操作,都是先加入队列,然后计数器加1,这样保证了,队列在移除的时候,长度是大于等于计数器的,通过原子性的计数器,双锁才能互不干扰。数组的一个问题就是位置的选择没有办法原子化,因为位置会循环,走到最后一个位置后就返回到第一个位置,这样的操作无法原子化,所以只能是加锁来解决。

适用场景

    LinkedBlockingQueue的优点是锁分离,那就很适合生产和消费频率差不多的场景,这样生产和消费互不干涉的执行,能达到不错的效率,尽量不使用remove操作,获取两把锁的效率更低,可以使用size方法(就是计数器直接返回),这个还是比较重要的,有些集合不适合使用size,例如ConcurrentLinkedQueue,正确应该使用isEmpty()。
1人推荐
随时随地看视频
慕课网APP