ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

BlockingQueue源码分析

2022-03-02 19:05:57  阅读:161  来源: 互联网

标签:分析 队列 元素 阻塞 源码 线程 null final BlockingQueue


一、阻塞队列简介

队列常被用来解决生产——消费者问题,Java中定义了Queue接口以及通用的一些抽象方法

public interface Queue<E> extends Collection<E> {
    // 添加一个元素,添加成功返回true,如果队列满了就抛出异常
    boolean add(E e);

    //添加一个元素,添加成功返回true,如果队列满了返回false
    boolean offer(E e);

    // 删除并返回队首元素,队列为空则抛出异常
    E remove();

    // 移除并返回队首元素,队列为空则返回null
    E poll();

    // 返回队首元素,但并不移除,队列为空则抛出异常
    E element();

    // 返回队首元素,但并不移除,队列为空则返回null
    E peek();
}

上面所列举出来的只是普通的队列的通用方法,而Java中的阻塞队列BlockingQueue,继承了Queue接口,同时又添加了两个具有阻塞功能的抽象方法,同时又提供了offer()poll两个可阻塞的重载方法:

通过下面阻塞方法的定义可以看出,只要是会被阻塞的方法,都会抛出InterruptedException异常

public interface BlockingQueue<E> extends Queue<E> {
    // 添加元素,队列满时,插入线程会被阻塞,直到队列不满
    void put(E e) throws InterruptedException;
    
    // 移除并返回元素,队列为空时,获取元素线程会被阻塞,直到队列非空
    E take() throws InterruptedException;
    
	// 可以指定添加元素时线程被阻塞的超时时间
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    
    // 可以指定获取元素时线程被阻塞的超时时间
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
}

BlockingQueue的常用方法做一个归纳如下:

方法抛出异常返回特定值阻塞指定阻塞时间
入队add(e)offer(e)put(e)offer(e,time,unit)
出队remove()poll()take()poll(time,unit)
获取队首元素element()peek()不支持不支持

阻塞队列除了具有可阻塞的特性之外,还有另外一个重要的特性就是容量大小,分为有界和无界。没有绝对意义的上的无界,只是这个界限很大,可以放很多元素。以LinkedBlockingQueue为例,它的容量大小为Integer.MAX_VALUE,这是一个非常大的数字,我们通常认为它就是无界的。也有一些阻塞队列是有界的,比如ArrayBlockingQueue,如果达到最大容量之后,也不会进行扩容。所以一旦满了就无法再往里面放数据了。

BlockingQueue同时也是线程安全的,它可以保证多线程的情况下,保证生产者和消费者的线程安全,其内部大多都是采用CASReentrantLock来保证线程安全,业务代码无需再关注多线程安全的问题,直接向队列里面放或者取就可以了,如图所示:

同时,阻塞队列还启动了资源隔离的作用,在复杂业务中,业务A完成后,只需要将结果丢到队列中即可,不需要关心后面的步骤,业务B会从队列中获取任务来执行对应的业务,实现了业务之间的解耦,也可以提高安全性。

下面就介绍一些常用的阻塞队列和部分核心源码

二、常用阻塞队列及核心源码分析

2.1 ArrayBlockingQueue

ArrayBlockingQueue是一个典型的有界的线程安全的阻塞队列,初始化时需要指定其容量大小,其内部元素使用数组进行存储,以put()方法为例,使用ReentrantLock来保证线程安全,通过条件队列的两个条件notEmptynotFull来进行阻塞和唤醒

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;
    int takeIndex;
    int putIndex;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 加锁保证线程安全
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 使用while而不是if,是为了防止虚假唤醒
            while (count == items.length)
                // 队列满了,生产者阻塞
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        // 如果添加一个元素队列满了之后,会被putIndex置为0,典型的环形数组的实现
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 条件队列转同步队列并唤醒线程
        notEmpty.signal();
    }
}

由于ArrayBlockingQueueput()take()方法使用ReentrantLock进行同步,同时只有一个方法可以执行,所以在高并发的情况下,性能会比较差。

思考ArrayBlockingQueue为什么采用双指针环形数组的方式?

普通的数组,删除数组元素时需要进行移位操作,导致它的时间复杂度为O(n),而采用双指针环形数组,不需要进行移位,只需要分别移动两个指针即可。

2.2 LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    
    static class Node<E> {
        E item; // 元素内容
        Node<E> next; // 下一个元素节点 单链表结构
        Node(E x) { item = x; }
    }

    // 初始化容量,默认Integer.MAX_VALUE
    private final int capacity;
    // 元素个数,因为读写操作的锁分离,这里使用线程安全的计数变量
    private final AtomicInteger count = new AtomicInteger();
    // 链表头,本身不存储元素信息,其item为null
    transient Node<E> head;
	// 链表尾元素
    private transient Node<E> last;
	// 获取元素的锁,锁分离,提高效率
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
	// 添加元素的锁
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 使用put锁,可以被中断
        putLock.lockInterruptibly();
        try {
            // 队列满了,阻塞生产者线程
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            // 返回旧值
            c = count.getAndIncrement();
            // 可能有很多线程阻塞在notFull这个条件上,而取元素时才会唤醒notFull,此处不用等到取元素时才唤醒
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 队列之前为空,现在新增了一个元素后,可以直接去唤醒获取元素的线程
        if (c == 0)
            signalNotEmpty();
    }

}

LinkedBlockingQueue与ArrayBlockingQueue对比

  • ArrayBlockingQueue使用一个独占锁,读写不分离,而LinkedBlockeingQueue使用两个独占锁,读写操作锁分离,性能更好
  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

2.3 LinkedBlockingDeque

LinkedBlockingDeque是对LinkedBlockingQueue的增强,其顶层接口为Deque,该接口定义了更加丰富的操作队列的方法,通过方法名就可以看出来,这些方法打破了队列先进先出的固有规则,提供了可以从头部或者尾部操作的API

public interface Deque<E> extends Queue<E> {
    void addFirst(E e);
    void addLast(E e);
    boolean offerFirst(E e);
    boolean offerLast(E e);
    E removeFirst();
    E removeLast();
    E pollFirst();
    E pollLast();
    E getFirst();
    E getLast();
    E peekFirst();
    E peekLast();
}

BlockingDeque接口继承了Deque,同时又提供了几个可阻塞的方法

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    void putFirst(E e) throws InterruptedException;
    void putLast(E e) throws InterruptedException;
    E takeFirst() throws InterruptedException;
    E takeLast() throws InterruptedException;
}

LinkedBlockingDeque实现了BlockingDeque接口,其内部通过双向链表来记录元素,通过一把ReentrantLock来保证线程安全,该类可以看成是ArrayBlockingQueueLinkedBlockingQueue的结合与增强

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {

    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;
        Node(E x) {
            item = x;
        }
    }
    transient Node<E> first;
    transient Node<E> last;
    private transient int count;
    private final int capacity;
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

    private boolean linkFirst(Node<E> node) {
        // 超过容量,直接返回
        if (count >= capacity)
            return false;
        Node<E> f = first;
        node.next = f;
        first = node;
        if (last == null)
            last = node;
        else
            f.prev = node;
        ++count;
        // 唤醒被阻塞的获取元素的线程
        notEmpty.signal();
        return true;
    }
}

2.4 SynchronousQueue

SynchronousQueue是一个没有缓冲的BlockingQueue,生产者线程对元素的插入操作put()必须等待消费者的移除操作take(),其模型如下图:

如上图所示,SynchronousQueue最大的不同在于,它的容量为0,没有地方来缓存元素,这就导致了每次添加元素都会被阻塞,直到有线程来取元素;同理,取元素也是一样,取元素的线程也会被阻塞,直到有线程添加元素。

由于SynchronousQueue不需要持有元素,它的作用在于直接传递,所以它非常适用于传递性场景做交换工作,生产者线程和消费者线程同步传递某些信息、事务或任务

SynchronousQueue常见的一个场景就是在Executors.newCachedThreadPool()中,因为不确定生产者的请求数量(创建任务),而这些请求又需要被及时处理,那么使用SynchronousQueue为每个生产者线程分配一个消费者线程就是处理效率最高的方式。线程池会根据需要(新任务到来)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60s之后会被回收。

下面结合源码来分析它的实现原理:

SynchronousQueue内部抽象类Transferer提供了任务传递的方法transfer(),而该方法内部包含了线程阻塞与唤醒的逻辑,而Transferer有两个实现类TransferQueueTransferStack,可以理解为存储阻塞线程的方式有两种:队列和栈。根据这两者的特性,可以分为公平和非公平的实现,队列满足FIFO(先进先出)的特性,所以是公平的实现;而栈满足LIFO(后进先出)的特性,所以是非公平的实现。

下面SynchronousQueue的构造方法,提供了公平和非公平的选项,默认为非公平实现

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

下面以TransferQueue为例简要分析元素入队和出队的操作,SynchronousQueueput()take()都会去调用transfer()方法添加元素或获取元素

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

下面分析TransferQueuetransfer()方法,在分析该方法之前,先看一下它的内部类QNodeTransferQueue中使用QNode来记录元素和被阻塞的线程,其中还利用UNSAFE来获取元素和下一个节点的偏移量,直接通过CAS修改对应的数值,QNode中还有很多的CAS方法这里没有一一列举出来。

static final class TransferQueue<E> extends Transferer<E> {
    static final class QNode {
        // 下一个节点的指针
        volatile QNode next;          // next node in queue
        // 元素内容
        volatile Object item;         // CAS'ed to or from null
        // 被阻塞的线程
        volatile Thread waiter;       // to control park/unpark
        // 用于区分节点类型,false表示为取元素,true为添加元素
        final boolean isData;

        // 通过CAS修改下一个节点(多线程安全)
        boolean casNext(QNode cmp, QNode val) {
            return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // QNode属性的偏移量
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                // 根据Unsafe类计算属性在QNode类中的偏移量
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = QNode.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }   
    }

    // 头节点
    transient volatile QNode head;
    // 尾节点
    transient volatile QNode tail;

    // 初始化时就创建一个元素为null,数据类型为false的节点,头尾节点都指向该该节点
    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }

    // 计算头尾节点的偏移量,通过CAS直接修改(保证线程安全)
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long cleanMeOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
            cleanMeOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("cleanMe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

上面介绍了TransferQueue大致的内部构造,下面重点看transfer()方法实现,

E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        // 自旋等待初始化完成
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin
		// 为空或者当前节点
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 防止其他线程修改,这里再次判断
            if (t != tail)                  // inconsistent read
                continue;
            // 如果当前尾节点后面还有节点,则通过CAS把后面的节点修改为尾节点
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            // 如果需要超时阻塞,但超时时间小于0(不能阻塞),直接返回null
            // put或take方法中中断该线程并抛出中断异常
            if (timed && nanos <= 0)        // can't wait
                return null;
            // 创建一个节点,通过CAS添加到尾节点后面,这个节点可以是取元素的节点,也可以是添加元素的节点
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;
			// 新的节点添加完成之后,通过CAS将其修改为尾节点
            advanceTail(t, s);              // swing tail and wait
            // 自旋阻塞线程 下面重点介绍
            Object x = awaitFulfill(s, e, timed, nanos);
            
            // 如果返回的节点为当前节点,表示该节点被取消了,直接清除掉
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            // 队列不为空,且新的节点类型与队列里面的节点类型不一致(说明可以唤醒线程了)
            QNode m = h.next;               // node to fulfill
            
            // 为了保证线程安全,再次判断自旋
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            // 如果m节点已经被别的线程处理了,这里就修改头节点自旋
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
			// 当前线程去修改头节点,阻塞线程节点出队
            advanceHead(h, m);              // successfully fulfilled
            // 唤起m节点被阻塞的线程
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

transfer()中有一个重要的方法awaitFulfill,它会去进行自旋阻塞

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 根据处理器的核数计算自旋次数
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果当前线程被中断了,就修改S节点的item属性为当前节点
        //然后在判断节点是否被取消时就直接判断其item值是否为当前节点即可
        if (w.isInterrupted())
            s.tryCancel(e);
        // 当节点取消就返回
        Object x = s.item;
        if (x != e)
            return x;
        // 过了超时时间就取消
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        // 自旋,达到一定次数之后,填充S节点的waiter属性为当前线程,然后就阻塞
        // 至此一个节点的内容就完整了
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

2.5 PriorityBlockingQueue

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,虽然它是无界的,但在初始化的时候,它是可以指定数组初始化容量的,它的无界是基于它可以进行动态扩容而言的。

如果没有指定初始化容量,它默认的容量为11,最大容量为Integer.MAX_VALUE - 8

private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

同时,PriorityBlockingQueue是一个优先级队列,它每次出队都会返回优先级最高或最低的元素,它的构造方法中提供了自定义Comparator比较器,默认情况下使用自然顺序升序排序。

通过下面的构造方法也可以看出,该队列线程安全是由ReentrantLock来保证的,同时需要注意的是PriorityBlockingQueue不能保证同等优先级元素的顺序

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

那么PriorityBlockingQueue如果只是简单的使用数组操作来对插入元素移除进行排序,其性能将是非常低的,而它采用的是最大最小堆的方式来插入或移除数据,大小堆只是逻辑上的一种操作方式而已,其储存结构依然是数组

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序

二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。

最大最小堆满足以下特性:

  • 最大堆:根结点的键值是所有堆结点键值中最大者
  • 最小堆:根结点的键值是所有堆结点键值中最小者

下图展示了最小二叉堆的情况:

最大最小堆按照从上到下,从左到右来一次表示索引位置,上图中右下角的数字表示该元素在数组中的索引下标

在最大最小二叉堆中,插入或移除元素时,都可能涉及到元素位置调整,而在二叉堆中,利用元素的下标索引,可以很简单的计算其父节点以及左右节点的下标(以索引下标为t的元素为例):

父节点:P(t) = (t-1) >>> 1 <=> (t-1)/2

左节点:L(t) = t <<< 1 +1 <=> t*2 +1

右节点:R(t) = t <<< 1 + 2 <=> t*2 +2

下面结合源码分析它是如何添加和移除元素的

由于PriorityBlockingQueue是无界队列,所以添加元素时线程不需要阻塞,容量不够进行扩容就可以了

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 加锁保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 如果已经达到当前容量就进行扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        
        if (cmp == null)
            // 需要注意如果没有执行比较器,元素类必须实现Comparable接口
            siftUpComparable(n, e, array);
        else
            // 指定了比较器,就是用自定义的来做比较
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 添加元素后,直接唤醒被阻塞的获取元素的线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

扩容的代码如下,tryGrow()方法在offer()方法的while循环体内部,就实现了CAS+自旋的方式来实现线程安全的扩容

private void tryGrow(Object[] array, int oldCap) {
    // 释放锁,下面通过CAS来进行扩容
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 如果原来的容量小于64,则容量就扩大一倍再+2,否则容量直接扩大为原来的三倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 容量不能超过最大值
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    // 其他线程在扩容时,当前线程就让出CPU
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

核心的方法在于siftUpComparable()siftUpUsingComparator()这两个方法,这两个方法才是二叉堆入队的核心方法,以siftUpUsingComparator()为例来分析

这里面是一个while循环,进行元素的上浮操作,每次都是获取当前节点的父节点,然后与插入的元素进行比对,如果比较的结果满足最大最小堆的结构,就直接退出循环,否则就换位,继续进行比较,知道满足条件

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

获取元素的代码如下,因为获取元素的线程会被阻塞,所以这个方法会抛出中断异常

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            // 如果没有元素就进行阻塞
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

private E dequeue() {
    // 最后一个元素的索引下标
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        // 取出最后一个元素
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            // 第一次进来时,取得是第二层的两个节点进行比较
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            // 如果左节点与右节点比较,满足比较条件,就把右节点的值作为与最后一个节点比较的值
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}

2.6 DelayQueue

DelayQueue是一个支持延时获取元素的阻塞队列,内部采用PriorityQueue存储元素,同时元素必须实现Delayed接口,接口的getDelay()方法可以返回延时时间延时的时间,方法参数为时间工具类TimeUnit

在获取元素是,只有延迟时间到了才能从队列中提取元素。

延迟队列的特点:并不是先进先出,而是按照延迟时间的长短进行排序,下一个被执行的任务排在队列的最前面

由于队列元素必须实现Delayed接口,而该接口又继承自Comparable接口,所以,元素类还要去实现compareTo()方法,这样在创建队列时就不需要在额外创建Comparator对象了,元素本身就具有了排序的能力。

下面定义了一个元素类

class DelayObject implements Delayed {
    private String name;
    private long time;   //延时时间

    public DelayObject(String name, long delayTime) {
        this.name = name;
        this.time = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = time - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed obj) {
        if (this.time < ((DelayObject) obj).time) {
            return -1;
        }
        if (this.time > ((DelayObject) obj).time) {
            return 1;
        }
        return 0;
    }
}

使用Demo:

//实例化一个DelayQueue
BlockingQueue<DelayObject> blockingQueue = new DelayQueue<>();

//向DelayQueue添加2个元素对象,注意延时时间不同
blockingQueue.put(new DelayObject("lizhi", 1000 * 10));  //延时10秒
blockingQueue.put(new DelayObject("linan", 1000 * 30));  //延时30秒

//  取出lizhi
DelayObject lizhi = blockingQueue.take();
// 取出linan
DelayObject linan = blockingQueue.take();

下面看一下DelayQueue的构造,使用ReentrantLock来保证线程安全,取元素需要进行阻塞,底层使用PriorityQeue进行存储,这是一个优先级队列,与上面PriorityBlockingQueue是一样的,只是没有阻塞功能

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();

下面看一下具体的put()take()

public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁保证线程安全
    lock.lock();
    try {
        // 调用PriorityQueue添加元素
        // 与PriorityBlockingQueue的逻辑基本一致
        q.offer(e);
        // 如果当前队列只有这一个元素,就去唤醒阻塞的线程
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

下面是take()方法,要比put()方法复杂一些

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁保证线程安全,由于线程可能会被阻塞,所以这里可中断
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 取出队列第一个元素,如果没有,直接让当前线程阻塞
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                // 如果延迟时间已到,直接取出该元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 如果有线程已经在阻塞了,就让当前线程直接去阻塞
                if (leader != null)
                    available.await();
                else {
                    // 没有线程阻塞,则记录当前线程,然后让当前线程阻塞,阻塞的时间等于最近元素的延迟时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // 当前线程被唤醒后,重置leader,然后自旋
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 出队成功后,如果leader为空,并且当前对了还有元素,就去唤醒下一个被阻塞的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

延迟队列的应用场景**:**

  • 订单超时关闭:下单后在规定时间内没有付款就取消订单
  • 异步短信通知:外卖下单成功60S之后给用户发送短信
  • 关闭空闲连接:连接池中,有一些非核心的连接在空闲一段时间后就关闭

三、选择合适的阻塞队列

我们接触的比较多的就是线程中使用阻塞队列,线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。

  • FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
  • CachedThreadPool 选取的是 SynchronousQueue
  • ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列

注:ScheduledThreadPool中使用的阻塞队列并不是DelayQueue,而是自定义实现的DelayedWorkQueue

一般从以下几个维度来选择合适的阻塞队列

  • 功能

    比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。

  • 容量

    是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。

  • 能够扩容

    因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。

  • 内存结构

    我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。

  • 性能

    从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。

标签:分析,队列,元素,阻塞,源码,线程,null,final,BlockingQueue
来源: https://blog.csdn.net/sermonlizhi/article/details/123237387

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有