ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MQTT接收HEX(2/2)

2022-04-25 10:32:58  阅读:383  来源: 互联网

标签:fusesource HEX mqtt MQTT static org import 接收


一些通讯硬件默认发送和接收的是UTF-8字符和ASCII的消息,但也是有一些通讯硬件的是通过16进制消息进行交互的。

一、配置pom的maven依赖

<dependency>
    <groupId>org.fusesource.hawtbuf</groupId>
    <artifactId>hawtbuf</artifactId>
    <version>1.11</version>
</dependency>
<dependency>
    <groupId>org.fusesource.hawtdispatch</groupId>
    <artifactId>hawtdispatch</artifactId>
    <version>1.22</version>
</dependency>
<dependency>
    <groupId>org.fusesource.hawtdispatch</groupId>
    <artifactId>hawtdispatch-transport</artifactId>
    <version>1.22</version>
</dependency>
<dependency>
    <groupId>org.fusesource.mqtt-client</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>1.16</version>
</dependency>

 

为了代码的可阅读性,我将以下代码按照功能封装到不同的类当中。

 

二、MQTT数据接收类

package com.xxxx.worker.controller.Server.MQTT;

import com.xxxx.common.utils.PropertyUtils;
import com.xxxx.worker.controller.Server.Execute.MessageExecute;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/**
 * @title: MQTT消息订阅(接收十六进制)
 * @author: hunttown
 * @date: 2021年01月06日 16:32
 * @description: 当前类用bean注入,不要使用注解注入,因为下面要将其配置到worker里。
 */
public class MQTTSubHex {

    //服务器地址
    private final static String serverUrl = "你的服务器IP:端口";

    //客户端唯一标识
    private final static String clientid = "随便起个名字";

    //订阅主题
    private final static String subtopic = "你的订阅主题";

    //用户名
    private final static String username = "用户名";

    //密码
    private final static String password = "密码";

    //传输质量:0至多一次;1至少一次;2确保只有一次。
    private final static int qos = 0;

    void start() {
        try {
            //创建MQTT对象
            MQTT mqtt = new MQTT();

            // 设置MQTT Broker的ip和端口
            mqtt.setHost(serverUrl);

            // 连接前清空会话信息
            mqtt.setCleanSession(true);

            // 设置重新连接的次数
            mqtt.setReconnectAttemptsMax(10);

            // 设置重连的间隔时间(毫秒)
            mqtt.setReconnectDelay(2000);

            // 设置心跳时间(秒)
            mqtt.setKeepAlive((short) 30);

            // 设置缓冲的大小
            mqtt.setSendBufferSize(2 * 1024 * 1024);

            // 设置客户端ID
            mqtt.setClientId(clientid);

            // 设置用户名和字码
            mqtt.setUserName(username);
            mqtt.setPassword(password);

            final FutureConnection connection = mqtt.futureConnection();
            connection.connect();

            Topic[] topics = {new Topic(subtopic, QoS.AT_LEAST_ONCE)};
            connection.subscribe(topics);

            System.out.println("MQTT订阅设置初始化完毕!");

            MessageExecute hexExecute = new MessageExecute();

            while (true) {
                //接收信息
                Future<Message> futrueMessage = connection.receive();
                Message message = futrueMessage.await();

                String msg = String.valueOf(message.getPayloadBuffer());
                System.out.println("接收到信息:" + msg);

                //这里开始处理你的业务
                //1、如果数据量较大,可使用中间件暂存信息,如:MQ
                //2、如果数据量较小,可以使用异步处理
                //TODO
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("MQTT订阅设置初始化失败!");
        }
    }
}

 

三、线程类

package com.xxxx.worker.controller.Server.MQTT;

/**
 * @title: 启动一个线程
 * @author: hunttown
 * @date: 2021年03月23日 15:55
 * @description:
 */
public class MQTTHexThread extends Thread {

    private static MQTTSubHex mqttSubHex;

    public void run() {
        mqttSubHex.start();
    }

    public static MQTTSubHex getMqttSubHex() {
        return mqttSubHex;
    }

    public static void setMqttSubHex(MQTTSubHex mqttSubHex) {
        MQTTHexThread.mqttSubHex = mqttSubHex;
    }
}

 

四、监听类

package com.xxxx.worker.controller.TaskListener;

import com.xxxx.worker.controller.Server.MQTT.MQTTHexThread;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.http.HttpServlet;

/**
 * @title: MQTT监听类
 * @author: hunttown
 * @date: 2020年10月19日 18:06
 * @description: MQTT类
 */
public class MQTTHexListener extends HttpServlet implements ServletContextListener {
    public void contextInitialized(ServletContextEvent arg0) {

        System.out.println("----------------- MQTT:Hex线程已启动 --------------------------");

        MQTTHexThread thread = new MQTTHexThread();
        thread.setDaemon(true); //设置线程为后台线程
        thread.start();
    }

    public void contextDestroyed(ServletContextEvent arg0) {
        // TODO
    }
}

 

五、将监听类配置到web.xml中

<listener>
    <!-- MQTT:接收Hex数据 -->
    <listener-class>com.hunttown.analysis.worker.controller.TaskListener.MQTTHexListener</listener-class>
</listener>

 

完毕!

 

标签:fusesource,HEX,mqtt,MQTT,static,org,import,接收
来源: https://www.cnblogs.com/hunttown/p/16188905.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有