ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java并发:阻塞队列之DelayQueue

2021-08-15 21:01:34  阅读:187  来源: 互联网

标签:java 队列 lock queue DelayQueue null public


延时队列

DelayQueue是一个支持延时获取元素的使用优先级队列实现的无界的阻塞队列。

在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

类图如下:

DelayQueue的定义以及构造函数如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

解读:

DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步。

 

重点解释一下变量 leader:

其使用基于 Leader-Follower模式的变体,用于减少不必要的线程等待。

当一个线程调用队列的 take 方法变为 leader 线程后,它会调用 available. awaitNanos(delay) 等待 delay 时间;其他线程(follwer线程) 则调用 available. await()进行无限等待。

leader 线程延迟时间过期后会退出 take 方法,并通过调用 available.signal()方法唤醒一个 follwer线程,被唤醒的 follwer线程被选举为新的 leader线程。 

 

Note:

队列中的元素必须实现Delayed接口和Comparable接口,也就是说DelayQueue里面的元素必须有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在。

原因:由于每个元素都有一个过期时间,所以要实现获取当前元素还剩下多少时间就过期了的接口;由于DelayQueue底层使用优先级队列来实现,所以要实现元素之间相互比较的接口。

Delayed接口的定义如下:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

解读:

此接口继承自Comparable接口。

Comparable接口的定义如下:

public interface Comparable<T> {
    public int compareTo(T o);
}

 

添加元素

offer方法的代码如下:

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

解读:

该方法在获取独占锁之后调用优先级队列的offer方法实现入队

    /**
     * Inserts the specified element into this priority queue.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be
     *         compared with elements currently in this priority queue
     *         according to the priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        siftUp(i, e);
        size = i + 1;
        return true;
    }

解读:

如果待插入元素 e 为 null,则抛出 NullPointerException 异常。

由于DelayQueue是无界队列,所以方法一直返回 true。

Note:

前述offer中 q.peek()方法返回的并不一定是当前添加的元素。

如果 q.peek()方法返回的是 e,则说明当前元素 e将是最先过期的,于是重置 leader线程为 null,进而激活 avaliable变量对应的条件队列里的一个线程,告诉它队列里面有元素了。

 

put方法的代码如下:

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

解读:

此方法直接调用offer方法来实现。

获取元素

poll方法的代码如下:

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

解读:

如果队列里面没有过期元素,则返回null;否则返回队首元素。

 

take方法的代码如下:

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

解读:

如果队列里面没有过期元素,则等待。

 

示例

可以将延时队列DelayQueue运用在以下场景中:

  (1)缓存系统的设计:可以用DelayQueue保存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素,则表示缓存有效期到了。

  (2)定时任务调度:使用DelayQueue保存当天将要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行任务,比如TimerQueue就是使用DelayQueue实现的。

 

具体示例如下:

(1)Student对象作为DelayQueue的元素,其必须实现Delayed接口的两个方法

package com.test;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Student implements Delayed {//必须实现Delayed接口
    
    private String name;
    private long submitTime;// 交卷时间
    private long workTime;// 考试时间

    public Student(String name, long submitTime) {
        this.name = name;
        this.workTime = submitTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
        System.out.println(this.name + " 交卷,用时" + workTime);
    }

    public String getName() {
        return this.name + " 交卷,用时" + workTime;
    }
    
    //必须实现getDelay方法
    public long getDelay(TimeUnit unit) {
        //返回一个延迟时间
        return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
    }

    //必须实现compareTo方法
    public int compareTo(Delayed o) {
        Student that = (Student) o;
        return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
    }

}

 

(2)主线程程序

package com.test;

import java.util.concurrent.DelayQueue;

public class DelayQueueTest { public static void main(String[] args) throws Exception { // 新建一个等待队列 final DelayQueue<Student> bq = new DelayQueue<Student>(); for (int i = 0; i < 5; i++) { Student student = new Student("学生"+i,Math.round((Math.random()*10+i))); bq.put(student); // 将数据存到队列里! } //获取但不移除此队列的头部;如果此队列为空,则返回 null。 System.out.println(bq.peek().getName()); } }

 

小结:

 

标签:java,队列,lock,queue,DelayQueue,null,public
来源: https://www.cnblogs.com/studyLog-share/p/15140333.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有