ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ArrayBlockingQueue官方文档解读

2020-03-04 14:04:08  阅读:240  来源: 互联网

标签:capacity 队列 items throws 解读 文档 lock ArrayBlockingQueue final


public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

/**
*由数组组成的有界阻塞队列。此队列对元素FIFO(先进先出)进行排序。队列的头是队列中出现时间最长的元素。队列的尾部是在队列中出现的时*间最短的元素。在队列的尾部插入新元素,队列检索操作在队列的头部获取元素。

*这是一个典型的“有界缓冲区”,其中固定大小的数组保存生产者插入的元素和消费者提取的元素。一旦创建,就不能更改容量。将元素放入完整队
*列的尝试将导致操作阻塞;从空队列获取元素的尝试将同样阻塞。

*这个类支持一个可选的公平性策略,用于排序等待的生产者和使用者线程。默认情况下,不保证此顺序。然而,公平性设置为true的队列按FIFO
*顺序授予线程访问权。公平性通常会降低吞吐量,但会减少可变性并避免饥饿。

*这个类及其迭代器实现了集合和迭代器接口的所有可选方法。
*这个类是Java集合框架的成员。
 * @param <E> the type of elements held in this queue
 * @author Doug Lea
 * @since 1.5
 */

ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作。
以下是重要属性:

  • final Object[] items,一个定长数组,维护ArrayBlockingQueue的元素

  • int takeIndex,int,为ArrayBlockingQueue对首位置

  • int putIndex,int,ArrayBlockingQueue对尾位置

  • int count,元素个数

  • final ReentrantLock lock,锁,ArrayBlockingQueue出列入列都必须获取该锁,两个步骤公用一个锁 ,而且全局就一个锁

  • private final Condition notEmpty,出列条件

  • private final Condition notFull,入列条件

ArrayBlockingQueue共有三个构造方法,其中最重要的是:

/**
     * 创造一个给定大小和公平机制的队列
     *
     * @param capacity the capacity of this queue
     * @param fair     if {@code true} then queue accesses for threads blocked
     *                 on insertion or removal, are processed in FIFO order;
     *                 if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException 如果指定容量小于1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
            //队列底层是用一个数组表示
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        //出列条件
        notEmpty = lock.newCondition();
        //入列条件
        notFull = lock.newCondition();
    }
   

因为其他两个构造方法默认调用此构造方法,在这构造方法中对入队出队条件初始化
看看其他两个构造方法:

 /**
     * 创造一个指定大小的对了,默认采取非公平的策略
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException 如果指定容量小于1
     */
    public ArrayBlockingQueue(int capacity) {
    //默认调用带两个参数的构造
        this(capacity, false);
    }

  /**
     * 创造一个给定大小和公平机制的队列,并将指定集合按集合的遍历顺序添加到ArrayBlockingQueue
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
         //默认调用带两个参数的构造                      
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c)
               // 遍历集合并判断集合元素是否为空,然后添加到数组中
                    items[i++] = Objects.requireNonNull(e);
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

重要方法:

 /**
     *将元素插入队尾,如果有可用容量,插入成功返回true,队列满了抛出 IllegalStateException异常
     *建议别用此方法,因为容易抛出异常
     * @throws IllegalStateException 如果队列满了
     * @throws NullPointerException 如果插入的元素为空
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     *将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量
     *在成功时返回 true,如果此队列已满,则返回 false
     *
     * @throws NullPointerException 如果插入的元素为空
     */
    public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
        //判断队列元素个数和数组的长度,如果相同队列满了,返回false
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
        //释放锁
            lock.unlock();
        }
    }

    /**
     * 将指定的元素加入队列,如果队列满了发生阻塞
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
            //当队列满了就阻塞
                notFull.await();
                //有可用空间就插入元素
            enqueue(e);
        } finally {
        //释放锁
            lock.unlock();
        }
    }
//看下 **enqueue(e)方法**
 
 /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
       //将元素插入数组中
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        //此时就释放出列信号,调用了notEmpty.await()的线程可以继续在队列检索元素
        notEmpty.signal();
    }
   
//在看下dequeue()方法
  /**
     * 取出并返回元素,释放入列信号
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
           // 释放入列信号,调用了 notFull.await()的线程可以继续执行并插入元素
        notFull.signal();
        return x;
    }
    
    /**
     * 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间
     * 时间易过放弃任务
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        Objects.requireNonNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0L)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

接下来是出列方法


    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        //如果队列为空返回null,否则调用dequeue()方法返回并移除队首元素
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
            //如果队列为空就阻塞,直到队列不为空 notEmpty.signal()被调用;
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0L)
                //指定时间有误直接返回null
                    return null;
                    //阻塞指定时间
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


   /**
     * 取出队首元素但不删除,如果为队列为空,直接返回null
     *
     * @return the number of elements in this queue
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

/**
     * 从此队列中移除指定元素的单个实例(如果存在)
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * <p>Removal of interior elements in circular array based queues
     * is an intrinsically slow and disruptive operation, so should
     * be undertaken only in exceptional circumstances, ideally
     * only when the queue is known not to be accessible by other
     * threads.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final Object[] items = this.items;
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length) i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

还有些方法是重写方法比如:


 // 重写父类的方法,返回队列大小
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    

方法过多不一一说明如果需要查看请查阅官方文档https://docs.oracle.com/javase/8/docs/api/或者看源码

标签:capacity,队列,items,throws,解读,文档,lock,ArrayBlockingQueue,final
来源: https://blog.csdn.net/qq_34800986/article/details/104647317

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有