ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

[JUC] 阻塞队列 BlockingQueue

2021-06-11 14:03:57  阅读:163  来源: 互联网

标签:JUC Thread 队列 queue value Ele try BlockingQueue


[JUC] 阻塞队列 BlockingQueue

@TOC[目录]

BlockingQueue用法

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:
在这里插入图片描述
一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。

一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。

负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o)
移除 remove(o) poll(o) take(o)
检查 element(o) peek(o)

四组不同的行为方式解释:

  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

ArrayBlockingQueue

数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。 ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。

ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

UML

在这里插入图片描述

Ex.1


public class ArrayBlockingQueueEx {

    /**
     * 主线程
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new ArrayBlockingQueue(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }


    /**
     * 消费者
     */
    static class Consumer implements Runnable {

        protected BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 生产者
     */
    static class Producer implements Runnable {

        protected BlockingQueue queue;

        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                queue.put("1");
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }
}

Ex.1.out

1
2
3

DelayQueue

延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口。 DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现
java.util.concurrent.Delayed 接口。

DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。
传递给 getDelay 方法的 getDelay 实例是一个枚举类型,它表明了将要延迟的时间段。
TimeUnit 枚举将会取以下值:

枚举值 含义
DAYS
HOURS 小时
MINUTES 分钟
SECONDS
MILLISECONDS 毫秒
MICROSECONDS 微秒
NANOSECONDS 纳秒

正如你所看到的,Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

UML

在这里插入图片描述
Ex.2

public class DelayQueueEx {

    /**
     * 测试Delay属性
     *
     * @param args
     */
    public static void main(String[] args) {
        DelayQueue<DelayElement> queue = new DelayQueue<>();
        for (int i = 0; i < 10; i++) {
            long delay = Double.valueOf(Math.random() * 10000).longValue();
            queue.put(new DelayElement(delay, "task:" + i));
        }
        DelayElement ele = null;
        while (true) {
            try {
                if ((ele = queue.take()) != null) {
                    System.out.println(ele.toString());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 延迟元素
     */
    @Getter
    @ToString
    static class DelayElement implements Delayed {

        /**
         * 延迟时间
         */
        long delay;
        /**
         * 到期时间
         */
        long expire;
        /**
         * 任务名称
         */
        String name;

        public DelayElement(long delay, String name) {
            this.delay = delay;
            this.name = name;
            this.expire = System.currentTimeMillis() + delay;
        }

        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         * 剩余时间 = 到期时间 - 当前时间
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Compares this object with the specified object for order.  Returns a
         * negative integer, zero, or a positive integer as this object is less
         * than, equal to, or greater than the specified object.
         *
         * <p>The implementor must ensure <tt>sgn(x.compareTo(y)) ==
         * -sgn(y.compareTo(x))</tt> for all <tt>x</tt> and <tt>y</tt>.  (This
         * implies that <tt>x.compareTo(y)</tt> must throw an exception iff
         * <tt>y.compareTo(x)</tt> throws an exception.)
         *
         * <p>The implementor must also ensure that the relation is transitive:
         * <tt>(x.compareTo(y)&gt;0 &amp;&amp; y.compareTo(z)&gt;0)</tt> implies
         * <tt>x.compareTo(z)&gt;0</tt>.
         *
         * <p>Finally, the implementor must ensure that <tt>x.compareTo(y)==0</tt>
         * implies that <tt>sgn(x.compareTo(z)) == sgn(y.compareTo(z))</tt>, for
         * all <tt>z</tt>.
         *
         * <p>It is strongly recommended, but <i>not</i> strictly required that
         * <tt>(x.compareTo(y)==0) == (x.equals(y))</tt>.  Generally speaking, any
         * class that implements the <tt>Comparable</tt> interface and violates
         * this condition should clearly indicate this fact.  The recommended
         * language is "Note: this class has a natural ordering that is
         * inconsistent with equals."
         *
         * <p>In the foregoing description, the notation
         * <tt>sgn(</tt><i>expression</i><tt>)</tt> designates the mathematical
         * <i>signum</i> function, which is defined to return one of <tt>-1</tt>,
         * <tt>0</tt>, or <tt>1</tt> according to whether the value of
         * <i>expression</i> is negative, zero or positive.
         *
         * @param o the object to be compared.
         * @return a negative integer, zero, or a positive integer as this object
         * is less than, equal to, or greater than the specified object.
         * @throws NullPointerException if the specified object is null
         * @throws ClassCastException   if the specified object's type prevents it
         *                              from being compared to this object.
         */
        @Override
        public int compareTo(Delayed o) {
            return Long.valueOf(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
        }
    }
}

Ex.2.out

DelayQueueEx.DelayElement(delay=1167, expire=1566299775344, name=task:6)
DelayQueueEx.DelayElement(delay=1701, expire=1566299775877, name=task:1)
DelayQueueEx.DelayElement(delay=3548, expire=1566299777725, name=task:7)
DelayQueueEx.DelayElement(delay=6072, expire=1566299780249, name=task:9)
DelayQueueEx.DelayElement(delay=6701, expire=1566299780878, name=task:5)
DelayQueueEx.DelayElement(delay=7510, expire=1566299781686, name=task:0)
DelayQueueEx.DelayElement(delay=7652, expire=1566299781829, name=task:8)
DelayQueueEx.DelayElement(delay=8476, expire=1566299782653, name=task:3)
DelayQueueEx.DelayElement(delay=8771, expire=1566299782948, name=task:2)
DelayQueueEx.DelayElement(delay=8976, expire=1566299783153, name=task:4)

LinkedBlockingQueue

链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类实现了 BlockingQueue 接口。

LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

UML

在这里插入图片描述
Ex.3

public class LinkedBlockingQueueEx {

    /**
     * 链阻塞队列
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new LinkedBlockingQueue();
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
}

Ex.3.out

1
2
3

PriorityBlockingQueue

具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个***的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入
null 值。

所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的
Comparable 实现。

注意

  • PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。
  • 如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。
    在这里插入图片描述
public class PriorityBlockingQueueEx {

    /**
     * 测试优先级队列 PriorityBlockingQueue
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Ele> queue = new PriorityBlockingQueue<>();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                int value = Double.valueOf(Math.random() * 10).intValue();
                try {
                    queue.put(new Ele(value));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Thread.sleep(3000);

        new Thread(() -> {
            while (true) {
                try {
                    System.out.println(queue.take().toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }


    @Getter
    @AllArgsConstructor
    @ToString
    static class Ele implements Comparable<Ele> {

        int value;

        @Override
        public int compareTo(Ele o) {
            return this.getValue() - o.getValue();
        }
    }
}
PriorityBlockingQueueEx.Ele(value=1)
PriorityBlockingQueueEx.Ele(value=2)
PriorityBlockingQueueEx.Ele(value=2)
PriorityBlockingQueueEx.Ele(value=3)
PriorityBlockingQueueEx.Ele(value=3)
PriorityBlockingQueueEx.Ele(value=6)
PriorityBlockingQueueEx.Ele(value=6)
PriorityBlockingQueueEx.Ele(value=7)
PriorityBlockingQueueEx.Ele(value=8)
PriorityBlockingQueueEx.Ele(value=9)

SynchronousQueue

同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

public class SynchronousQueueEx {

    /**
     * 同步队列测试
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Ele> queue = new SynchronousQueue<>();
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                int value = Double.valueOf(Math.random() * 10).intValue();
                try {
                    Ele ele = new Ele(value);
                    queue.put(ele);
                    System.out.println(String.format("[%s] try put: %s", Thread.currentThread().getName(), ele.toString()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread1.start();
        Thread.sleep(3000);
        Thread thread2 = new Thread(() -> {
            while (true) {
                try {
                    Ele take = queue.take();
                    System.out.println(String.format("[%s] try take: %s", Thread.currentThread().getName(), take.toString()));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread2.start();
    }

    @Getter
    @AllArgsConstructor
    @ToString
    static class Ele {
        int value;
    }
}
[Thread-0] try put: SynchronousQueueEx.Ele(value=9)
[Thread-1] try take: SynchronousQueueEx.Ele(value=9)
[Thread-1] try take: SynchronousQueueEx.Ele(value=5)
[Thread-0] try put: SynchronousQueueEx.Ele(value=5)
[Thread-0] try put: SynchronousQueueEx.Ele(value=2)
[Thread-1] try take: SynchronousQueueEx.Ele(value=2)
[Thread-0] try put: SynchronousQueueEx.Ele(value=4)
[Thread-1] try take: SynchronousQueueEx.Ele(value=4)
[Thread-0] try put: SynchronousQueueEx.Ele(value=6)
[Thread-1] try take: SynchronousQueueEx.Ele(value=6)
[Thread-1] try take: SynchronousQueueEx.Ele(value=9)
[Thread-0] try put: SynchronousQueueEx.Ele(value=9)
[Thread-1] try take: SynchronousQueueEx.Ele(value=3)
[Thread-0] try put: SynchronousQueueEx.Ele(value=3)
[Thread-1] try take: SynchronousQueueEx.Ele(value=8)
[Thread-0] try put: SynchronousQueueEx.Ele(value=8)
[Thread-1] try take: SynchronousQueueEx.Ele(value=7)
[Thread-0] try put: SynchronousQueueEx.Ele(value=7)
[Thread-0] try put: SynchronousQueueEx.Ele(value=7)
[Thread-1] try take: SynchronousQueueEx.Ele(value=7)

从日志输出可以看出同步队列中的元素每次取完后才会再次put进去,如果没被取走的情况下,put会被阻塞。

标签:JUC,Thread,队列,queue,value,Ele,try,BlockingQueue
来源: https://blog.51cto.com/u_15263565/2896382

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有