ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java实现springboot集成exmq(mqtt协议)

2022-07-12 09:37:54  阅读:210  来源: 互联网

标签:java springboot exmq topic mqtt client import public log


1、application.yml配置

spring:
mqtt:
username: test
password: qwerty123
host-url: tcp://172.18.42.34:32016
client-id: /dataProcessingTopicwf
subscribe-id: /dataProcessing
timeout: 100000
keep-alive-interval: 100
defaultTopic: $queue/+/dataProcessing

2、mqttclient

package com.catl.mqttutil.mqtt.client;

import com.catl.mqttutil.mqtt.callback.PushCallback;
import com.catl.mqttutil.mqtt.model.BytesModel;
import com.catl.mqttutil.mqtt.properties.MqttProperties;
import com.catl.mqttutil.mqtt.utils.MqttUtils;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MqttCustomClient {

@Autowired
private PushCallback pushCallback;

@Autowired
private MqttProperties mqttProperties;

private static MqttClient client;

public static MqttClient getClient() {
return client;
}

private static void setClient(MqttClient client) {
MqttCustomClient.client = client;
}

// public static String pubTopic;

@PostConstruct
public void init() {
// ReqHeaderUtil.platformKey = stationProperties.getPlatformkey();
// pubTopic = stationProperties.getPlatformkey() + mqttProperties.getClientId();
this.connect();
}

public String getPubTopic(String platformkey) {
return platformkey + mqttProperties.getClientId();
}

public void connect() {
MqttClient client;
try {
log.info("开始连接mqtt服务端,mqttProperties={}", mqttProperties);
client = new MqttClient(mqttProperties.getHostUrl(), "111111111111100", new MemoryPersistence());
MqttCustomClient.setClient(client);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
options.setAutomaticReconnect(true);
client.setCallback(pushCallback);
client.connect(options);
if (client.isConnected()) {
client.subscribe(mqttProperties.getDefaultTopic(), 1);
} else {
IMqttToken token = client.connectWithResult(options);
token.waitForCompletion();
client.subscribe(mqttProperties.getDefaultTopic(), 1);
}
log.info("mqtt连接信息【subTopic=>{}】", mqttProperties.getDefaultTopic());
} catch (Exception e) {
if (e.getMessage().contains("已连接") || e.getMessage().contains("connected")) {
log.info("mqtt客户端已连接", e);
} else {
log.error("mqtt客户端连接异常", e);
}
}
}

/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void publish(int qos, boolean retained, String topic, ByteBuf pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
byte[] array = pushMessage.array();
message.setPayload(array);
MqttTopic mTopic = MqttCustomClient.client.getTopic(topic);
if (null == mTopic) {
log.error("订阅topic[{}]不存在", topic);
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
log.debug("mqtt服务器接收消息(publish) - 成功");
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
log.debug("EMQX服务器接收消息失败!");
}
});
log.error("发布topic[{}]成功", topic);
} catch (MqttException e) {
log.error("发布topic[{}]异常", topic);
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
log.info("【sub-开始订阅消息】,topic:【{}】,qos:【{}】", topic, qos);
try {
MqttCustomClient.client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}


/**
* @param commandSign 命令标识
* @param vin 车辆vin码
* @param dataBytes 数据
* @description 发布主题消息
* @date 2021/7/19 下午5:03
* @author junliu
**/
public void publish(short commandSign, String vin, byte[] dataBytes, String pubTopic) {
vin = StringUtil.isNullOrEmpty(vin) ? vin : "00000000000000000000";
BytesModel model = new BytesModel(commandSign, vin, dataBytes);
System.out.println("model" + model);
ByteBuf byteBuf = MqttUtils.toByteBuf(model);
System.out.println("byteBuf" + byteBuf);
publish(1, false, pubTopic, byteBuf);
}

/**
* @param commandSign 命令标识
* @param dataBytes 数据
* @description 发布主题消息
* @date 2021/7/19 下午5:03
* @author junliu
**/
public void publish(short commandSign, byte[] dataBytes, String pubTopic) {
this.publish(commandSign, "00000000000000000", dataBytes, pubTopic);
}

}

3、pushcallback

package com.catl.mqttutil.mqtt.callback;


import com.catl.mqttutil.mqtt.client.MqttCustomClient;
import com.catl.mqttutil.mqtt.model.BaseModel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;




/**
* 消费监听类
*
* @author yanxinghua
* @since 2021/7/19 9:13
*/
@Component
@Slf4j
public class PushCallback implements MqttCallback {

@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt连接断开,错误信息msg-{}", throwable.getMessage());
if (MqttCustomClient.getClient() == null || !MqttCustomClient.getClient().isConnected()) {
log.info("mqtt重连中");
}
}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
try {
log.info(">>>>>>>>>接收消息主题 : {},接收消息Qos :{}", topic, mqttMessage.getQos());
byte[] payload = mqttMessage.getPayload();
ByteBuf byteBuf = Unpooled.wrappedBuffer(payload);
// 获取控制类型
byte[] msgIdBytes = new byte[1];
byteBuf.getBytes(2, msgIdBytes, 0, 1);
// short msgId = Short.valueOf(ByteBufUtil.hexDump(reverse(msgIdBytes)), 16);
// log.warn(">>>>>>>>>【{}消息报文】:{}", msgId, ByteBufUtil.hexDump(payload));
BaseModel model = new BaseModel(byteBuf);
model.parseBody();
// SpringUtils.applicationContext.publishEvent(model);
} catch (Exception e) {
log.error("接受消息解析失败!msg-{}", e);
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}

}

4、pom.xml

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

标签:java,springboot,exmq,topic,mqtt,client,import,public,log
来源: https://www.cnblogs.com/chenxqNo01/p/16468784.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有