ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ArrayBlockingQueue

2021-08-22 22:33:44  阅读:183  来源: 互联网

标签:putIndex 队列 lock public items ArrayBlockingQueue final


目录
阻塞队列可以用于线程池的等待队列,生产者消费者的通信通道,本文讲解ArrayBlockingQueue。参考Collection之BlockingQueue)

根据类名,可以知道这个数据结构是队列,因此数据的进出顺序是FIFO;阻塞的含义为,当需要生产/消费时,队列没有空间/数据,则对应的操作会阻塞住,直到其他操作解除这个阻塞状态

ArrayBlockingQueue作为阻塞队列,特点是内部使用Object数组(使用成了环形),并且创建时需要指定大小,有两个指针分别对应生产和消费操作

里面的迭代器弄个专题一起看吧

成员变量:

/** 队列里元素数量 */
int count;
/** 存储结构 */
final Object[] items;

/** 为下一次执行 take, poll, peek or remove 操作提供的index */
int takeIndex;

/** 为下一次执行 put, offer, or add 操作提供的index */
int putIndex;

/** 进行并发控制,以及线程间通信的锁(通信主要靠condition实现) */
final ReentrantLock lock;

/** 当队列为空时获取等待 */
private final Condition notEmpty;

/** 当队列满时插入等待 */
private final Condition notFull;

主要方法:

// 在不超过队列容量的情况下立即插入指定的元素,成功后返回true,如果队列已满则抛出IllegalStateException。
public boolean add(E e)

// 在不超过队列容量的情况下立即在队列末尾插入指定的元素,如果成功则返回true,如果队列已满则返回false。此方法通常比add(E)方法更好,后者插入元素失败只能抛出异常。
public boolean offer(E e)

// 将指定的元素插入到此队列的末尾,如果队列已满则等待直到有可用的空间。
public void put(E e) throws InterruptedException

// 将指定的元素插入到此队列的末尾,如果队列已满,则在指定的超时时间之内等待空间可用,超时返回false。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

// 检索并删除此队列的头,如果此队列为空,则返回null。
public E poll()

// 检索并删除此队列的头,如有必要则等待,直到某个元素可用为止。
public E take() throws InterruptedException

// 检索并删除此队列的头,如果有必要则在指定的等待时间之内等待元素可用,超时返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException

// 检索但不删除此队列的头,或在此队列为空时返回null。
public E peek()

// 返回此队列中的元素数量。
public int size()

// 返回此队列在理想情况下(在没有内存或资源约束的情况下)可以不阻塞地接受的新元素的数量。它总是等于这个队列的初始容量减去这个队列的当前大小。
public int remainingCapacity()

// 如果指定元素存在,则从此队列中移除该元素的单个实例。更正式地说,如果队列中包含一个或多个这样的元素,则只删除匹配到的第一个元素
public boolean remove(Object o)

// 如果此队列包含至少一个指定的元素,则返回true。
public boolean contains(Object o)

// 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列。返回的数组将是“安全的”,因为此队列不维护对它的引用。
public Object[] toArray()

// 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列;返回数组的运行时类型是指定数组的运行时类型。
public <T> T[] toArray(T[] a)

// 返回此集合的字符串表示形式。
public String toString()

// 删除此队列中的所有元素。此调用返回后,队列将为空。
public void clear()

// 从此队列中删除所有可用元素并将它们添加到给定集合中。此操作可能比重复轮询此队列更有效。在试图将元素添加到集合c时遇到失败抛出相关异常时可能会导致:元素不在原集合或者集合c中,或者两个集合中都没有。
public int drainTo(Collection<? super E> c)

// 从该队列中最多删除给定数量的可用元素,并将它们添加到给定集合中。异常情况同上
public int drainTo(Collection<? super E> c, int maxElements)

// 按适当的顺序返回此队列中元素的迭代器。元素将按从第一个(head)到最后一个(tail)的顺序返回。返回的迭代器是弱一致的。
public Iterator<E> iterator()

// 返回该队列中元素的Spliterator。返回的spliterator是弱一致的。
public Spliterator<E> spliterator()

方法中能和阻塞联系起来的,是put和take,同时offer和poll,也提供了对应的超时控制,重点关注这四个方法

checkNotNull(Object v)

    /**
     * Throws NullPointerException if argument is null.
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

enqueue(E x)

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        //获取到Object[]的引用
        final Object[] items = this.items;
        //将putIndex位置上位置为x
        items[putIndex] = x;
        //把数组当成了一个环形的去使用
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //通知说现在队列有数据了
        notEmpty.signal();
    }

put(E e)

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        //对e进行非空判断
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //核心是这个while循环,阻塞在notFull上,等待notFull.signal
            //关于conditon,还需要再搞搞AQS的源码,不然不懂为啥这里是while,不能用if
            while (count == items.length)
                notFull.await();
            //将e入队
            //这里为什么不需要对迭代器进行操作呢?
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

offer(E e, long timeout, TimeUnit unit)

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                //这个方法也是在AQS里面实现的,后面再专门弄下吧
                //我猜测是等待nanos时间,到点了就返回一个《=0的值
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

dequeue()

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //保存takeIndex对应的数据
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        //当作一个环形数组使用
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //这里对我来说也是很不熟练的地方,关于迭代器,后面也得专门弄一弄,比如ArrayList,Map之类的
        if (itrs != null)
            itrs.elementDequeued();
        //发送notFull的信号
        notFull.signal();
        //返回保存的中间结果
        return x;
    }

take()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //当前队列没有数据,则等待直到notEmpty发来信号
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

poll(long timeout, TimeUnit unit)

和前面offer一样,都只是加上了一个等待时间的机制

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

removeAt(final int removeIndex)

    /**
     * Deletes item at array index removeIndex.
     * Utility for remove(Object) and iterator.remove.
     * Call only when holding lock.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        //如果要删除的下标刚好是takeIndex,当作一次普通的出队即可
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            //对迭代器进行操作
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            //把removeIndex后面的数据,都向前挪动一位
            for (int i = removeIndex;;) {
                int next = i + 1;
                //根据环形队列,来获取i的next下标
                if (next == items.length)
                    next = 0;
                //如果next不是putIndex,说明不为null,向前挪动
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    //此时 i == putIndex,将i处的数据置为null(上一步已经向前挪动了)
                    //将putIndex更新为i
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            //更新迭代器
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        //发送notFull的信号
        notFull.signal();
    }

remove(Object o)

特定删除,服用了removeAt()方法

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    //先找到o对应的下标i
                    if (o.equals(items[i])) {
                        //针对具体的下标,进行删除
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex) //如果i遍历到了putIndex,说明队列中没有o;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

标签:putIndex,队列,lock,public,items,ArrayBlockingQueue,final
来源: https://www.cnblogs.com/zjytrhy/p/15173889.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有