ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

在核心线程数已满的情况下如何实现阻塞队列?

2021-06-26 20:52:23  阅读:165  来源: 互联网

标签:数已 count elements 队列 元素 阻塞 线程


e0448291abf9c0e3bef0523c171ed45c.jpeg

前言

Java的线程池中,在核心线程数已满的情况下,任务会存储在阻塞队列中,那么什么是阻塞队列呢?

阻塞队列首先是个队列,在队列的基础上,支持另外两个附加操在队列为空时,获取元素的线程会等待队列变为非空

  • 在队列满时,添加元素的线程会等待队列可用

75fac189ed454b54f9b92f74e4edf155.png

那么阻塞队列是如何实现阻塞的?

自己实现一个阻塞队列

Synchronized、wait、notifyAll实现的阻塞队列

public class BlockingQueue {
    // 放置元素索引
    private int inputIndex;
    // 取出元素索引
    private int takeIndex;
    // 元素数组
    private String[] elements;
    // 数组中元素数量
    private int count;


    public BlockingQueue(int capacity) {
        elements = new String[capacity];
    }


    public Object take() throws InterruptedException {
        synchronized(this) {
            // 这里使用while的原因是线程被唤醒之后需要再判断一次数组是否已经有元素
            while (count == 0) {
                // 数组没有元素了,等待
                this.wait();
            }
            Object e = dequeue();
            this.notify();
            System.out.println("take method: " + Arrays.toString(elements));
            return e;
        }
    }


    public void put(String str) throws InterruptedException {
        synchronized (this) {
            // 这里使用while的原因是线程被唤醒之后需要再判断一次数组元素是否有空闲位置
            while (count == elements.length) {
                // 数组元素满了,等待
                this.wait();
            }
            enqueue(str);
            System.out.println("put method: " + Arrays.toString(elements));
            this.notify();
        }
    }
		
    /**
     * 入队方法
     * @param e 元素
     */
    private void enqueue(String e) {
        elements[inputIndex] = e;

        // 如果数组已满,input返回开头
        if (++inputIndex == elements.length) {
            inputIndex = 0;
        }

        count ++;
    }

    /**
     * 出队方法
     * @return
     */
    private Object dequeue() {

        Object e = elements[takeIndex];
        elements[takeIndex] = null;
        // 如果takeIndex已到数组终点,返回开头
        if (++takeIndex == elements.length) {
            takeIndex = 0;
        }
        count --;
        return e;
    }
}
public static void main(String[] args) {

        BlockingQueue queue = new BlockingQueue(10);
        // 10个线程不断放置元素
        IntStream.range(0, 10).forEach(i -> {
          Thread a = new Thread(() -> {
            try {
              queue.put("element");
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          });
          a.start();
        });

        // 10个线程取出元素
        IntStream.range(0, 10).forEach(i -> {
          Thread b = new Thread(() -> {
            try {
              queue.take();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          });
          b.start();
        });
}

fe3fdc6d3043e00a2a761099eadc4fdd.png

condition、await、singal实现的阻塞队列

public class BlockingQueueWithCondition {

    // 放置元素索引
    private int inputIndex;
    // 取出元素索引
    private int takeIndex;
    // 元素数组
    private String[] elements;
    // 数组中元素数量
    private int count;

    ReentrantLock lock = new ReentrantLock();
    Condition notEmpty = lock.newCondition();
    Condition notFull = lock.newCondition();


    public BlockingQueueWithCondition(int capacity) {
        elements = new String[capacity];
    }


    public String take() throws InterruptedException {
        lock.lock();

        try {
            // 数组没有元素了,等待
            while (count == 0) {
                notEmpty.await();
            }
            String str = elements[takeIndex];
            elements[takeIndex] = null;
	    // 如果takeIndex已到数组终点,返回开头
            if (++takeIndex == elements.length) {
                takeIndex = 0;
            }
            notFull.signal();
            System.out.println("take method: " + Arrays.toString(elements));
            count--;
            return str;
        } finally {
            lock.unlock();
        }

    }


    public void put(String str) throws InterruptedException {
        lock.lock();

        try {
            // 数组元素满了,等待
            while (count == elements.length) {
                notFull.await();
            }
            elements[inputIndex] = str;
	    // 如果inputIndex已到数组终点,返回开头
            if (++inputIndex == elements.length) {
                inputIndex = 0;
            }
            notEmpty.signal();
            System.out.println("put method: " + Arrays.toString(elements));
            count++;
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {

        BlockingQueueWithCondition queue = new BlockingQueueWithCondition(10);
        // 10个线程不断放置元素
        IntStream.range(0, 10).forEach(i -> {
            Thread a = new Thread(() -> {
                try {
                    queue.put("element");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            a.start();
        });

        // 10个线程取出元素
        IntStream.range(0, 10).forEach(i -> {
            Thread b = new Thread(() -> {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            b.start();
        });
}

0117d7895db9c4af7504e2fecef4a92b.png

ReentrantLock、 Condition(await与signal)与synchronized、wait、notify非常相似,那么两者有什么差别呢?

  1. 调用wait时,首先需要确保调用了wait方法的线程已经持有了对象的锁,调用wait后,该线程会释放掉这个对象的锁,进入等待队列(wait set)
  2. 当调用notify时,系统会随机唤醒该对象等待队列中任意一个线程,当这个线程被唤醒后,它就会与其它线程一同竞争对象的锁
  3. synchronized获取锁和释放锁都是通过JVM底层来操作,无需开发者关注
  4. ReentrantLock获取锁和释放锁可以由开发者操作,更加灵活,调用await方法的线程会进入对象的等待队列中,调用singal方法时可以指定唤醒某个对象等待队列中的阻塞任务
JDK中的阻塞队列

JDK中提供了非常多的阻塞队列,这里只解析LinkedBlockingQueue,如果理解了上面阻塞队列的写法,可以很快理解JDK阻塞队列的源码

一些重要的参数

// 阻塞队列中的元素会包装成一个节点,有链表必有节点
static class Node<E> {
    E item;
  
    Node<E> next;

    Node(E x) { item = x; }
}

// 阻塞队列容量,
private final int capacity;

// 阻塞队列当前元素个数
private final AtomicInteger count = new AtomicInteger();

// 阻塞队列头节点
transient Node<E> head;

// 阻塞队列尾节点
private transient Node<E> last;

// LinkedBlockingQueue使用了两把锁,存取互不排斥
// take锁
private final ReentrantLock takeLock = new ReentrantLock();

// 当队列中无元素时,take锁会阻塞,直到被其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();

// put锁
private final ReentrantLock putLock = new ReentrantLock();

// 当队列中元素已满时,put锁会阻塞,直到被其它线程唤醒
private final Condition notFull = putLock.newCondition();

put方法

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
  	// put锁进行加锁
        putLock.lockInterruptibly();
        try {
            // 如果队列元素已满,阻塞在notFull条件上
            while (count.get() == capacity) {
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 注意:这里是先获取出队前队列长度,再加一
            c = count.getAndIncrement();
            // 如果当前队列元素加一还未达队列元素上线,则再唤醒一个线程,因为可能有很多线程阻塞在notFull
            // 条件上
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // put锁解锁
            putLock.unlock();
        }
  	// 加了一个元素后,唤醒阻塞在notEmpty条件上的线程来取元素
        if (c == 0)
            signalNotEmpty();
}

take方法

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
  	// takeLock进行加锁
        takeLock.lockInterruptibly();
        try {
            // 如果链表元素为空,阻塞在notEmpty上
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 元素出队
            x = dequeue();
            // 注意:这里是先获取出队前队列长度,再减一
            c = count.getAndDecrement();
            // 如果链表中元素> 1,唤醒指定对象的等待队列中的阻塞任务
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 释放锁
            takeLock.unlock();
        }
  	// 如果出队前队列长度已满,现在减了一个元素后,唤醒阻塞在notFull条件上的线程
        if (c == capacity)
            signalNotFull();
        return x;
}

最后

最近我整理了整套《JAVA核心知识点总结》,说实话 ,作为一名Java程序员,不论你需不需要面试都应该好好看下这份资料。拿到手总是不亏的~我的不少粉丝也因此拿到腾讯字节快手等公司的Offer

Java进阶之路群,找管理员获取哦-!

e74d1a6f177fa4303a32dffe1cc135af.png

ddae41cc1277561e6fdd6913dbb11018.png

标签:数已,count,elements,队列,元素,阻塞,线程
来源: https://blog.51cto.com/u_15220153/2949891

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有