ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

【并发编程】基于优先级队列实现的无界阻塞队列DelayQueue

2022-02-04 22:03:48  阅读:176  来源: 互联网

标签:队列 lock 元素 阻塞 无界 入队 DelayQueue


DelayQueue是什么

  • DelayQueue 是一个支持延时获取元素的阻塞队列。
  • 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;
  • 在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

DelayQueue的使用场景

  • 商城订单超时关闭。
  • 异步短信通知功能。
  • 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭。
  • 缓存过期清除。缓存中的对象,超过了存活时间,需要从缓存中移出。
  • 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

DelayQueue的特点

  • 不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
  • 使用的锁:ReentrantLock。
  • 使用的数据结构:PriorityQueue。与PriorityBlockingQueue类似,不过没有阻塞功能。
  • 阻塞的对象:Condition available

DelayQueue的入队出队逻辑

  • 入队:不阻塞,无界队列,与优先级队列入队相同,available。
  • 出队:为空时阻塞。不为空时检查堆顶元素过期时间,小于等于0则出队,大于0,说明没过期,则阻塞。

DelayQueue的数据结构源码分析

// 用于保证队列操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存储元素,用于保证延迟低的优先执行
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时):leader指向的是第一个从队列获取元素阻塞的线程
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素:当新元素到达,或新线程可能需要成为leader时被通知
private final Condition available = lock.newCondition();

DelayQueue的构造方法源码分析

/**
 * 无参构造,什么也不做
 */
public DelayQueue() {}

/**
 * 有数据需要初始化的构造方法。
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

/**
 * 添加全部元素
 */
public boolean addAll(Collection<? extends E> c) {
    // 添加的元素为空,抛异常
    if (c == null)
        throw new NullPointerException();
    // 传入的是当前对象,抛异常
    if (c == this)
        throw new IllegalArgumentException();
    // 定义修改的标志
    boolean modified = false;
    for (E e : c)
        // 添加元素
        if (add(e))
            // 添加成功,修改标志
            modified = true;
    // 返回修改的标志
    return modified;
}

DelayQueue的入队方法源码分析

/**
 * DelayQueue的入队方法
 */
public void put(E e) {
    offer(e);
}

/**
 * 有返回值的入队方法
 */
public boolean offer(E e) {
    // 得到当前队列的锁
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 调用优先级队列的offer操作
        q.offer(e);
        // 调用优先级队列的窥探操作:入队的元素位于队列头部,说明当前元素延迟最小
        if (q.peek() == e) {
            // 将 leader 置空
            leader = null;
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        }
        // 返回入队成功
        return true;
    } finally {
        // 解锁,真正唤醒阻塞的线程
        lock.unlock();
    }
}

DelayQueue的出队方法源码分析

/**
 * DelayQueue的出队方法
 */
public E take() throws InterruptedException {
    // 得到当前队列的锁
    final ReentrantLock lock = this.lock;
    // 获取锁操作:优先响应中断
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 取出堆顶元素( 最早过期的元素,但是不弹出对象)   
            E first = q.peek();
            // 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
            if (first == null)
                available.await();
            else {
                // 得到堆顶元素的到期时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                    return q.poll();

                // 下面的逻辑是delay大于0 ,要阻塞了
                // 将first置为空方便gc
                first = null; // don't retain ref while waiting
                // 如果有线程争抢的Leader线程,则进行无限期等待。
                if (leader != null)
                    available.await();
                else {
                    // 如果leader为null,把当前线程赋值给它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待剩余等待时间
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
        if (leader == null && q.peek() != null)
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        // 解锁,真正唤醒阻塞的线程
        lock.unlock();
    }
}

结束语

  • 获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!
  • 关注公众号,可以让你对MySQL有非常深入的了解
  • 关注公众号,每天持续高效的了解并发编程!
  • 关注公众号,后续持续高效的了解spring源码!
  • 这个公众号,无广告!!!每日更新!!!
    作者公众号.jpg

标签:队列,lock,元素,阻塞,无界,入队,DelayQueue
来源: https://www.cnblogs.com/zfcq/p/15863477.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有