ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

基于ActiveMq服务器Paho Java客户端的MQTT消息订阅与发送

2020-01-21 11:06:16  阅读:296  来源: 互联网

标签:订阅 Java String topic MQTT client TopicConst Paho 客户端


Java基于ActiveMq 客户端的MQTT实现

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,MQTT消息的发送和订阅都是依赖MQTT服务器的,没有MQTT服务器,你的客户端是无法订阅和发送消息的。所以在最开始的时候,可以选择性的在你的电脑上面安装一个MQTT服务器。

这里需要了解一些概念:

(1)MQTT协议实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者、代理、订阅者,其中消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者也可以是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

主题:可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)
负载:可以理解为消息内容,是指的订阅者具体收到的数据。
(2)MQTT客户端
发布消息给其它相关的客户端。
订阅以请求接受相关的消息。
取消订阅以移除接受消息的请求。
从服务端断开连接
(3)服务器
接受来自客户端的网络连接。
接受客户端发布的应用消息。
处理客户端的订阅和取消订阅请求。
转发应用消息给符合条件的已订阅客户端。

MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 (我们这里使用的ActiveMQ)

1 安装apache-activemq

(1) http://activemq.apache.org 进行下载需要的版本,这里是安装在Windows上,下载好的压缩包,进行解压。
在这里插入图片描述(2)进入到指定目录,运行/bin/win32或win64 activemq.bat
在这里插入图片描述(3)这样我们的ActiveMq 就运行,然后在浏览器中输入http://localhost:8161 刚开始需要登录,初始账号h和密码:admin ,这样服务器就部署好了。
在这里插入图片描述

2 业务需求

具体的实现方式,根据业务不同,代码逻辑不同,我这里定义了设备信息采集的客户端,和接收所有设备信息的服务端,
(1)客户端设备信息能实时上传
(2)客户端可以实时获取最新的设备参数等数据
(3)服务器能实时更新设备参数和加载最新数据
(4)其他需求

3 具体实现

(1)了解Paho
Paho Java客户端是一个用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(如Android)上运行的应用程序。
Paho Java客户端提供了两个API:MqttAsyncClient提供了一个完全异步的API,通过已注册的回调通知完成活动。 MqttClient是MqttAsyncClient的一个同步包装,其中函数与应用程序同步。
(2)添加依赖

  <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.0</version>
    </dependency>

不管是客户端还是服务端都要加,对于paho不存在客户端和服务端,消息的订阅与发送都是可以,只是通过代码的业务逻辑实现,有些消息只有客户端能订阅和发送,有些消息只有服务端能订阅和发送。
(3)连接 MqttClient
在程序加载的时候启动线程,连接mqtt服务器,完成一些数据初始化操作
1 连接mqtt

 	private MqttClient client;
    private JedisClient jedisClient;
    private String scanSrc;
    private IWorkshopAlarmDao alarmDao;
    private String mqttHost;
    private String clientId;
    private String userName;
    private String passWord;
    private MqttConnectOptions options;
    private int qos = 2;
     public void run(){
        try {
            System.out.println("连接MQTT服务。。。。。");
            connectMqtt();
            startServer();
            System.out.println("MQTT 订阅服务。。。。。");

        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public void connectMqtt(){
        try {
//            client = new MqttAsyncClient(mqttHost, clientId, new MemoryPersistence());
            client = new MqttClient(mqttHost, clientId, new MemoryPersistence());
            options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            // 设置超时时间
            options.setConnectionTimeout(10);
            // 设置会话心跳时间
            options.setKeepAliveInterval(90);
            client.setCallback(new PublishMessage());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2 设置断开重连

public void startServer() {
        try {
            while (true) {
                try {
                    //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
                    if (!client.isConnected()) {
                        System.out.println("*****尝试连接mqtt *****");
                        client.connect(options);
                    }
                    if (client.isConnected()) {//连接成功,跳出连接
                        System.out.println("*****已连接mqtt success *****");
                        break;
                    }
                    sleep(2000);
                } catch (MqttException e1) {
                    System.out.println("连接MQTT服务失败。。。。。。。。。。。。");
                }
            }
            //订阅消息
            subTopic();
            initMessage();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

3 在初始化的时候需要订阅消息topic

 public void subTopic(){
        //订阅消息
        int[] Qos  = {qos};
//        String[] reg_workshop = {TopicConst.reg_workshop};
//        String[] reg_aiservice = {TopicConst.reg_aiservice};
//        String[] reg_camera = {TopicConst.reg_camera};
//        String[] reg_gate = {TopicConst.reg_gate};
        String[] init_reg = {TopicConst.init_reg};
        String[] get_worker = {TopicConst.get_worker};
        String[] update_workshop = {TopicConst.update_workshop};
        String[] update_aiservice = {TopicConst.update_aiservice};
        String[] update_camera = {TopicConst.update_camera};
        String[] update_gate = {TopicConst.update_gate};
        String[] workshop_record = {TopicConst.workshop_record};
        String[] delete_gate = {TopicConst.delete_gate};
        String[] delete_camera = {TopicConst.delete_camera};
        String[] workshop_reset = {TopicConst.workshop_reset};
        try {
//            client.subscribe(reg_workshop, Qos);
//            client.subscribe(reg_aiservice, Qos);
//            client.subscribe(reg_camera, Qos);
//            client.subscribe(reg_gate, Qos);
            client.subscribe(get_worker, Qos);
            client.subscribe(update_workshop, Qos);
            client.subscribe(update_aiservice, Qos);
            client.subscribe(update_camera, Qos);
            client.subscribe(update_gate, Qos);
            client.subscribe(workshop_record, Qos);
            client.subscribe(delete_gate, Qos);
            client.subscribe(delete_camera, Qos);
            client.subscribe(init_reg, Qos);
            client.subscribe(workshop_reset, Qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

4 消息接收,业务逻辑处理

class PublishMessage implements MqttCallback {
        public void connectionLost(Throwable cause) {
            // 连接丢失后,一般在这里面进行重连
            System.out.println("连接断开,可以做重连");
//            connectMqtt();
            StopService();
            startServer();
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("deliveryComplete---------" + token.isComplete());
        }

        public void messageArrived(String topic, MqttMessage message) {
            // subscribe后得到的消息会执行到这里面
            byte[] messByte = message.getPayload();
            System.out.println("接收消息主题 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            try {
//                System.out.println("接收消息内容 : " + new String(messByte).getBytes("utf8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (topic == null) {
                return;
            }
            try {
                String data = new String(messByte, "utf-8");
                if (topic.equals(TopicConst.init_reg)) {
                    initReg(data);
                } else if (topic.equals(TopicConst.update_workshop)) {
                    updateWorkShop(data);
                } else if (topic.equals(TopicConst.update_aiservice)) {
                    updateAiservier(data);
                } else if (topic.equals(TopicConst.update_camera)) {
                    updateCamera(data);
                } else if (topic.equals(TopicConst.update_gate)) {
                    updateGate(data);
                } else if (topic.equals(TopicConst.get_worker)) {
                    if (data.equals("1")) {
                        sendWorker();
                    }
                } else if (topic.equals(TopicConst.workshop_record)) {
                    reportRecords(data);
                } else if (topic.equals(TopicConst.delete_gate)) {
                    deleteGate(data);
                } else if (topic.equals(TopicConst.delete_camera)) {
                    deleteCamera(data);
                }else if (topic.equals(TopicConst.workshop_reset)) {
                    resetWorkShop(data);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

5 发送消息

public void publish(MqttTopic topic , MqttMessage message) throws  MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! " + token.isComplete());
    }

6 服务断开做相关处理

public void StopService() {
        try {
            if(client !=null){
                // 断开连接
                client.disconnect();
                // 关闭客户端
                client.close();
            }

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

这里没有做相关Mqtt 服务,而是把消息订阅与分布和具体的业务代码融合。

使用org.eclipse.paho 工具进行消息的订阅和发送

在这里插入图片描述工具的使用很简单,不说明。

cyadyx 发布了9 篇原创文章 · 获赞 4 · 访问量 1万+ 私信 关注

标签:订阅,Java,String,topic,MQTT,client,TopicConst,Paho,客户端
来源: https://blog.csdn.net/cyadyx/article/details/104058946

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有