ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java 多线程 发布订阅模式:发布者java.util.concurrent.SubmissionPublisher;订阅者java.util.concurrent.Flow.Subscriber

2021-04-28 18:04:07  阅读:189  来源: 互联网

标签:订阅 java lock Flow util concurrent 发布者


1,什么是发布订阅模式?

在软件架构中,发布订阅是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。 Java9开始新增了一个发布-订阅框架,框架是基于异步响应流。发布,订阅框架可以非常方便地处理异步线程之间的流数据交换( 比如两个线程之间需要交换数据) 而且这个发布、订阅框架不需要使用数据中心来缓冲数据,同时具有非常高效的性能。

 

2,发布订阅模式的4个角色

  1. Flow.Publisher: 代表数据发布者,生产者
  2. Flow.Subscriber: 表数据订阅者、消费者
  3. Flow.Subscription: 表发布者和订阅者之间的链接纽带。订阅者既可通过调用该对象的request()方法来获取数据项,也可通过调用对象的cancel()方法来取消订阅。
  4. Flow.Processor: 数据处理器,它可同时作为发布者和订阅者使用

测试用例:发布者每秒钟发布一条消息,订阅者每秒钟订阅一条消息。

注意:订阅者处理消息,依赖当前线程的存活状态,如果发布消息后当前程序代码运行完毕会立即退出,订阅者来不及执行任何程序。

此例 用锁保持当前线程存活

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName PublisherFlowSubscriber
 * @projectName: object1
 * @author: Zhangmingda
 * @description: XXX
 * date: 2021/4/28.
 */
public class PublisherFlowSubscriber {
    /**
     * 定义用来保持线程不退出的锁
     */
    private static Lock lock = new ReentrantLock(true);
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        /**
         * 定义一个发布者,需要设定要发送消息的泛型数据类型
         */
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        /**
         * 定义一个订阅者
         */
        MySubscirber<String> subscirber = new MySubscirber<>("订阅者1");
        MySubscirber<String> subscirber2 = new MySubscirber<>("订阅者2");
        /**
         * 通过发布者配置订阅者 会触发订阅者的onSubscribe方法,他们之间的链接纽带会通过参数传递给onSubscribe方法,如果注册失败会触发onError方法
         */
        publisher.subscribe(subscirber);publisher.subscribe(subscirber2);

        /**
         * 测试发布消息
         */
        List<String> list =  List.of("张三", "李四", "王五", "赵六");
        list.forEach(string -> publisher.submit(string)); //向订阅者发布数据,需要保持前台的线程存活,否则当前线程执行结束,发布者和订阅者都被销毁了。
        /**
         * 关闭消息发布
         */
        publisher.close(); //关闭后,如果当前线程未退出,待订阅者所有消息都处理完毕才会运行订阅者的onComplete方法
        lock.lock();
        //抛出锁
        condition.await();
        lock.lock();
    }

    /**
     * 定义订阅者类,需要注意实现接口Flow.Subscriber 实现其泛型传递
     */
    private static class MySubscirber<T> implements Flow.Subscriber<T>{
        /**
         * 订阅者自定义的属性,名字,关联的订阅平台
         */
        private String name;
        private Flow.Subscription subscription;

        public MySubscirber(String name) {
            this.name = name;
        }

        /**
         * 订阅的时候触发的方法
         * @param subscription 订阅者被关联的订阅平台
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println(name + "开启订阅" + subscription);
            /**
             * 从订阅平台获取一条消息
             */
            subscription.request(1);
            /**
             * 将平台实例保存,便于复用
             */
            this.subscription = subscription;
        }

        /**
         * 获取一条数据后触发的方法
         * @param
         */
        @Override
        public void onNext(T t) {
            System.out.println(name + "获取到了一条数据:" +t);
            //再次获取一条数据...自循环触发自己循环调用,一直将所有数据获取完毕
            subscription.request(1);
            /**
             * 模拟处理耗时
             */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * 订阅出错时运行的方法
         * @param throwable 错误对象
         */
        @Override
        public void one rror(Throwable throwable) {
            throwable.printStackTrace();
        }

        /**
         * 发布者停止发布,且订阅者处理完接收数据后,触发该方法
         */
        @Override
        public void onComplete() {
            System.out.println(name + "发布者关闭了发布");
            lock.lock();
            condition.signalAll();
            lock.unlock();
        }
    }
}

 

 

标签:订阅,java,lock,Flow,util,concurrent,发布者
来源: https://www.cnblogs.com/zhangmingda/p/14715139.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有