ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java并发54:并发集合系列-基于CAS算法的非阻塞无数据缓冲队列SynchronousQueue

2021-10-23 15:33:21  阅读:281  来源: 互联网

标签:队列 Java CAS 元素 并发 线程 SynchronousQueue sc new


原文地址:https://blog.csdn.net/Dax1n/article/details/69813682

介绍

Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue(队列只能存储一个元素)。

生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样,消费者移除数据操作必须等待生产者的插入。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间。

你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。

队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。

数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。

可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。


实现原理

不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。


应用场景

SynchronousQueue的一个使用场景是在线程池里。

Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

// 创建newCachedThreadPool线程池使用的消息队列是:SynchronousQueue  
ExecutorService es1 = Executors.newCachedThreadPool();  

java.util.concurrent.Executors.newCachedThreadPool实现:

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());  
}  

SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。

使用方式

接下来研究一下SynchronousQueue的使用:

SynchronousQueue创建:

// 如果为 true,则等待线程以 FIFO 的顺序竞争访问;否则顺序是未指定的。
// SynchronousQueue<Integer> sc =new SynchronousQueue<>(true);//fair -
SynchronousQueue<Integer> sc = new SynchronousQueue<>(); // 默认不指定的话是false,不公平的

由于SynchronousQueue是没有缓冲区的,所以如下方法不可用:

sc.peek();// Always returns null
sc.clear();
sc.contains(1);
sc.containsAll(new ArrayList<Integer>());
sc.isEmpty();
sc.size();
sc.toArray();
Integer [] in=new Integer[]{new Integer(2)};
sc.toArray(in);
sc.removeAll(new ArrayList<Integer>());
sc.retainAll(new ArrayList<Integer>());
sc.remove("a");
sc.peek();

由于SynchronousQueue 队列中最多只有一个元素,所以这些方法是没有意义的,所以在对方法的实现体中阉割掉了。


SynchronousQueue 获取元素:

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
        SynchronousQueue<Integer> sc = new SynchronousQueue<>(); // 默认不指定的话是false,不公平的  
//      sc.take();// 没有元素阻塞在此处,等待其他线程向sc添加元素才会获取元素向下执行  
        sc.poll();//没有元素不阻塞在此处直接返回null向下执行  
        sc.poll(5,TimeUnit.SECONDS);//没有元素阻塞在此处等待指定时间,如果还是没有元素直接返回null向下执行  
    }  
}  

SynchronousQueue 存入元素:

public class Main {  
    public static void main(String[] args) throws InterruptedException {  
        SynchronousQueue<Integer> sc = new SynchronousQueue<>(); // 默认不指定的话是false,不公平的  
        // sc.put(2);//没有线程等待获取元素的话,阻塞在此处等待一直有线程获取元素时候放到队列继续向下运行  
        sc.offer(2);// 没有线程等待获取元素的话,不阻塞在此处,如果该元素已添加到此队列,则返回 true;否则返回 false  
        sc.offer(2, 5, TimeUnit.SECONDS);// 没有线程等待获取元素的话,阻塞在此处等待指定时间,如果该元素已添加到此队列,则返回true;否则返回 false  
    }  
}  

总结:

  • take和put是阻塞的获取和存储元素的方法,
  • poll和offer是不阻塞的获取元素和存储元素的方法,并且poll和offer可以指定超时时间。

以上是基础Api的讲解,但是并不是实际的用法。接下来看一下使用的Demo:

public class SynchronousQueueMain {  
    public static void main(String[] args) throws Exception {  
        // 如果为 true,则等待线程以 FIFO 的顺序竞争访问;否则顺序是未指定的。  
        // SynchronousQueue<Integer> sc =new SynchronousQueue<>(true);//fair  
        SynchronousQueue<Integer> sc = new SynchronousQueue<>(); // 默认不指定的话是false,不公平的  
        new Thread(() -> { //生产者线程,使用的是lambda写法,需要使用JDK1.8  
            while (true) {  
                try {  
                    sc.put(new Random().nextInt(50));  
                    //将指定元素添加到此队列,如有必要则等待另一个线程接收它。  
                    // System.out.println("sc.offer(new Random().nextInt(50)): "+sc.offer(new Random().nextInt(50)));   
                    // 如果另一个线程正在等待以便接收指定元素,则将指定元素插入到此队列。如果没有等待接受数据的线程则直接返回false  
                    // System.out.println("sc.offer(2,5,TimeUnit.SECONDS):  
                    // "+sc.offer(2,5,TimeUnit.SECONDS));//如果没有等待的线程,则等待指定的时间。在等待时间还没有接受数据的线程的话,直接返回false  
                    System.out.println("添加操作运行完毕...");//是操作完毕,并不是添加或获取元素成功!  
                    Thread.sleep(1000);  
                } catch (Exception e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
        new Thread(() -> {//消费者线程。使用的是lambda创建的线程写法需要使用jdk1.8  
            while (true) {  
                try {  
                    System.out.println("-----------------> sc.take: " + sc.take());  
                    System.out.println("-----------------> 获取操作运行完毕...");//是操作完毕,并不是添加或获取元素成功!  
                    Thread.sleep(1000);  
                } catch (Exception e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
    }  
}  

take 、poll和put、offer可以组合使用,可以根据实际业务需求选择!

标签:队列,Java,CAS,元素,并发,线程,SynchronousQueue,sc,new
来源: https://www.cnblogs.com/yaochunhui/p/15446046.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有