ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

并发包中的ArrayBlockingQueue和LinkedBlockingQueu源码阅读

2021-08-05 16:02:38  阅读:203  来源: 互联网

标签:count 线程 队列 lock 源码 发包 items ArrayBlockingQueue final


ArrayBlockingQueue

  • 底层基于数组实现,在对象创建时需要指定数组大小。在构建对象时,已经创建了数组。所以使用 Array 需要特别注意设定合适的队列大小,如果设置过大会造成内存浪费。如果设置内存太小,就会影响并发的性能。
  • 功能上,其内部维护了两个索引指针 putIndex 和 takeIndex。putIndex 表示下次调用 offer 时存放元素的位置,takeIndex 表示的时下次调用 take 时获取的元素。

初始化

有三个构造函数,必须设定 队列的大小, 公平和非公平可选。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。对于元素而言是FIFO的原则。

构造函数1

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);  // 默认非公平
}

构造函数2

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 存取用同一把锁
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

构造函数3

设定从集合中初始化队列,

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);  // 初始化

    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);  // 保证加入的元素不为空
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

添加元素

Offer 和 Add

add方法调用offer,如果添加失败,则抛出异常。

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();  // 加锁
    try {
        if (count == items.length)  // 如果队列满了,添加失败
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

其中 enqueue 方法的如下:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;   // 在putIndex位置存放元素
    if (++putIndex == items.length)  // 更新putindex位置
        putIndex = 0;
    count++;
    notEmpty.signal();  // 通知挂载在notEmpty上的线程,去消费。
}

Put方法

put()方法添加如果不成功则会阻塞。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加可中断的锁
    try { 
        while (count == items.length)  // 如果队列满
            notFull.await();  // 释放锁,挂载到notFull条件的等待队列上
        enqueue(e);  // 入队列
    } finally {
        lock.unlock();
    }
}

还有另外一个offer方法,等待特定时间

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);  // 等待特定的纳秒直到超时,则唤醒该线程
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

取出元素

poll()方法

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();  // 退出队列
    } finally {
        lock.unlock();
    }
}

其中dequeue方法具体如下:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;  // 指定取出的位置元素为null
    if (++takeIndex == items.length)  // 取index更新
        takeIndex = 0;
    count--;
    if (itrs != null)  // 将所有迭代器中的该元素删除
        itrs.elementDequeued();
    notFull.signal();  // 通知挂在notFull上的等待线程取获取锁。
    return x;
}

take()方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)  // 如果数量为空,则将线程挂载到notEmpty等待队列中
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

总结

对于阻塞队列通常提供的方法实现的语义:

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time, unit)
检擦方法 element() peek() 不可用 不可用

标签:count,线程,队列,lock,源码,发包,items,ArrayBlockingQueue,final
来源: https://www.cnblogs.com/hnxbp/p/15103722.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有