ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MQTT SpringBoot入门跑起来

2021-09-23 10:57:52  阅读:316  来源: 互联网

标签:String 入门 mqtt MQTT connectOptions new public SpringBoot


一、简单介绍
1.MQTT-即时通讯协议
在这里插入图片描述
mqtt broker即服务端
mqtt client即客户端

2.主要特点

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
  • 对负载内容屏蔽的消息传输
  • 使用 TCP/IP 提供网络连接
  • 有三种消息发布服务质量:
    “至多一次”:适用消息频繁发且丢失一两条不影响的场景,如:环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次”:确保消息到达,但消息重复可能会发生
    “只有一次”:确保消息到达一次。优点是确保消息送达且有且仅有一次,缺点是系统开销大.
  • 小型传输,开销很小
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制

3.下载安装(省略)

二、MQTT+Spring Boot
1.MQTT链接相关信息配置

server:
  mqtt:
    url: tcp://xxx.xx.xx:1883 
    topics: scrm_user/#
    clientId: xxxx
    maxInflight: 20
    username: '***'
    password: '***'

2.MQTT Bean Configuration

@Configuration
public class MqttConfig {

    @Value("${mding.mqtt.url}")
    private String mqttUrl;

    @Value("${mding.mqtt.topics}")
    private String mqttTopics;

    @Value("${mding.mqtt.username}")
    private String mqttUserName;

    @Value("${mding.mqtt.password}")
    private String mqttPassword;

    @Value("${mding.mqtt.maxInflight}")
    private Integer mqttMaxInflight;

    @Value("${mding.mqtt.clientId}")
    private String mqttClientId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(mqttUrl.split(","));
        connectOptions.setConnectionTimeout(5000);
        if (StringUtils.isNotEmpty(mqttUserName)) {
            connectOptions.setPassword(mqttPassword.toCharArray());
            connectOptions.setUserName(mqttUserName);
        }
        connectOptions.setAutomaticReconnect(true);
        // 客户端心跳消息的最大并发数
        connectOptions.setMaxInflight(mqttMaxInflight);
        connectOptions.setKeepAliveInterval(120);
        factory.setConnectionOptions(connectOptions);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        String clientId = mqttClientId.concat("_").concat(String.valueOf(System.currentTimeMillis()));
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttUrl, clientId);
        // 设置连接超时时长
        adapter.setCompletionTimeout(30000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        // 设置服务质量
        // 0 最多一次,数据可能丢失;
        // 1 至少一次,数据可能重复;
        // 2 只有一次,有且只有一次;最耗性能
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInputChannel());
        adapter.addTopic(mqttTopics.split(","));
        return adapter;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MdingMqttInBoundHandler();
    }


    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        String clientId = mqttClientId.concat("_").concat(String.valueOf(System.currentTimeMillis()));
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        // 设置发送消息时不阻塞
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(0);
        return messageHandler;
    }

}

3.MQTT订阅消息入口

public class MdingMqttInBoundHandler implements MessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MdingMqttInBoundHandler.class);

    private static final String PRE_SUFFER = "scrm_user-";

    /**
     * 备注:低版本使用 mqtt_topic
     *
     * @param message 消息体
     * @throws MessagingException
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
        String payload = message.getPayload().toString();
        LOGGER.info("MdingMqttInBoundHandler.handleMessage->{} {}", topic, payload);
        // 正则匹配具体的topic
        Map<Pattern, String> map = TopicPattern.M_DING_PATTERN;
        Pattern pattern = map.keySet()
                .stream().filter(ptn -> ptn.matcher(topic).find()).findFirst().orElse(null);
        if (null != pattern) {
            String action = map.get(pattern);
            //TODO 执行业务分发逻辑
        }
    }
}

4.MQTT发布消息

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MdingMqttMsgGateWay {

    /**
     * 发送消息到MQTT
     *
     * @param data  Data
     * @param topic Topic
     */
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

}

三、客户端调试工具
可以实现用MQTTX客户端进行发布消息和订阅消息的调试。

在这里插入图片描述

最后,需要的可以直接拿过去干上。

标签:String,入门,mqtt,MQTT,connectOptions,new,public,SpringBoot
来源: https://blog.csdn.net/u013067015/article/details/120430248

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有