ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

学习BlockingQueue的实现原理

2020-03-18 11:51:11  阅读:254  来源: 互联网

标签:队列 lock 阻塞 学习 线程 items 原理 对列 BlockingQueue


 

一:对列的基本概念

 

 

  1:对列   队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行 删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受 限制的线性表。

进行插入操作的端称为队尾,进行删除操作的端称为队头。 在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。 因为队列只允许在一端插入,在另一端删除,

所以只有最早进入队列的元素才能 最先从队列中删除,故队列又称为先进先出(FIFO—firstinfirstout)线性表

  

       2:阻塞对列

a)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程, 直到队列不满。

b)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队 列变为非空。 

 

二:常见的阻塞对列类型

·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

·DelayQueue:一个使用优先级队列实现的无界阻塞队列。

·SynchronousQueue:一个不存储元素的阻塞队列。

·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

以上的阻塞队列都实现了 BlockingQueue 接口,也都是线程安全的。

 

三:实现方式

1:ArrayBlockingQueue 

  ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对 元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指 阻塞的线程,

可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访 问队列的资格,

有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置。

示例代码:

public class AnalysisBlockingQueue {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(1000000);
        new Thread(new MyProducer(blockingQueue),"producer-thread").start();
        new Thread(new MyConsumer(blockingQueue),"consumer-thread").start();
    }
}

class MyProducer implements Runnable{

    private BlockingQueue blockingQueue;

    MyProducer(BlockingQueue blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override public void run() {
        for(int i=0;i<100;i++){
            System.out.println(Thread.currentThread()+"正在生产第 "+i+" 个数据");
            blockingQueue.offer(i);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class MyConsumer implements Runnable{

    private BlockingQueue blockingQueue;

    MyConsumer(BlockingQueue blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override public void run() {
        while(true){
            Object obj= blockingQueue.poll();
            System.out.println("正在消费数据 "+obj);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

 

执行结果:

 

 

来看一下ArrayBlockingQueue这个对列的源码

 

 

 

 这个构造函数,支持设置容量,对列对于线程并发来说默认是非公平的

当然支持配置,也可以配置为公平的,无论对于生产者还是消费者,多个生产者线程(无论是先阻塞的还是后来的线程,抢占

对列资源是不公平的,后来的线程也可以先把数据放到对列中)

 

 

还支持将集合中的元素提前放入对列中

 

 

 

ArrayBlockingQueue只使用了一把锁,生产者和消费者在同一时刻只能进行一个操作,

 

在构造的时候对这个锁进行初始化 ,公平和非公平也是通过这把锁控制的

 

 

 

对列常用的方法总结:

 

 

 

我们先来看一下用的比较多的offer和poll:

offer代码:

首先检查放进来的元素是否为null,如果为null,则抛异常,然后在操作的时候将这个对列上锁,这时其他的线程,包括

消费者线程和生产者线程都会阻塞,因为它是全部的锁,所以效率应该很低。

如果对列中元素的数量等于数组的容量,放不下了,返回false,否则调用enqueue向对列放入数据。

 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

  

接下来看一下enqueue入队方法:

将元素放入数组中,如果此时放入的元素是数组的最后一个元素,那么下一次要从第一个开始放,

因为消费者取数据的时候是从数组低下标开始取的。对列维持的总数量count++,然后通知阻塞的消费线程

可以消费数据。

 private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

  

再来看一下取数据的poll方法:

先上锁,进行操作,如果对列元素数量为0则返回null,否则调用dequeue取元素

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

  

在看一下dequeue方法:

从数组中取出对应下标的元素,然后将该下标指向null,如果取数据的下标到达数组的最后一个元素,则下次从

0下标开始取,对列数量count--,然后通知阻塞的生产者可以向对列放数据了。

 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

  

这里面有个peek方法,取出数组中的下一个元素,但是不从对列中移除

 public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

  

再来看一下 add remove  和element方法:

add直接调用父类的add方法,其实现依赖于offer方法,

上面我们分析offer放成功,返回true,失败返回false,add在offer基础上又包装了一下,成功返回true,失败抛异常。

 

 

 

 

remove方法:

 

标签:队列,lock,阻塞,学习,线程,items,原理,对列,BlockingQueue
来源: https://www.cnblogs.com/warrior4236/p/12516437.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有