ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java 使用Mqtt 重连机制 订阅者 +服务端回复

2021-10-26 09:32:55  阅读:258  来源: 互联网

标签:Java String private Mqtt client new import 重连 public


package com.jeecg.tab.mymqtt;

import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import com.jeecg.tab.mqtt.ClientMQTT;

public class MyClient {
//设备属性读取 ----请求
String qone="PG01001/001/properties/get";
//设备修改 ----请求
String qupdate ="PG01001/001/properties/set";
//设备功能请求
String qinvoke="PG01001/001/function/invoke";
//设备消息透传
String qtransparent="PG01001/001//transparent/request";

//监控状态设定

//4.1.6.设备状态读取
//String qtransparent="PG01001/001/properties/get";

//3.3设备属性修改 回复
String hone="PG01001/001/properties/get/reply";
//设备属性 回复
String hupdate ="PG01001/001/properties/set/reply";
//设备属性上报
String uploadtext="PG01001/001/properties/report";
//设备功能上报
String hinvoke="PG01001/001/function/invoke/reply";
//设备事件上报
String uevent="PG01001/001/event/report";
//设备消息透传
String htransparent="PG01001/001//transparent/reply";



private MqttClient client;
private MqttConnectOptions options;
private static String[] myTopics = { "rpm/sfloc", "geng" ,"wang"};
private static int[] myQos = { 1, 1,1 };
private static CopyOnWriteArrayList<Map<String,String>> slistKZ = new CopyOnWriteArrayList<>();



public static CopyOnWriteArrayList<Map<String, String>> getSlistKZ() {
return slistKZ;
}

 

 

public static void setSlistKZ(CopyOnWriteArrayList<Map<String, String>> slistKZ) {
MyClient.slistKZ = slistKZ;
}

 

 

public static void main(String[] args) {
System.out.println("client start...");
MyMqtt myMqtt = new MyMqtt("12345678");
myMqtt.subscribe(myTopics, myQos);
}
}

 

 

package com.jeecg.tab.mymqtt;

import java.util.HashMap;
import java.util.Map;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqtt {
private String host = "tcp://192.168.124.250:1883";
private String userName = "admin";
private String passWord = "admin";
private MqttClient client;
private String id;
private static MyMqtt instance; // = new MyMqtt();
private MqttTopic mqttTopic;
private String myTopic = "wang";
private MqttMessage message;
public MyMqtt(String id) {
// super();
this( id, null, false);
}
//断线重连
// public void reConnect() throws Exception {
// MqttConnectOptions options = new MqttConnectOptions();
// if(null != client) {
// client.connect(option);
// }
// }
public MyMqtt(String id, MqttCallback callback, boolean cleanSession){
try {
//id应该保持唯一性
client = new MqttClient(host, id, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
if (callback == null) {
client.setCallback(new MqttCallback() {

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO Auto-generated method stub
Map<String, String> map=new HashMap<String, String>();
map.put("topic", topic);
map.put("qos", message.getQos()+"");
map.put("message", new String(message.getPayload()));
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
MyClient.getSlistKZ().add(map);

}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub

}

@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
System.out.println("连接断开正在重连");
while (true){
try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
Thread.sleep(1000);

//client.connect(options);
client.reconnect();
// System.out.println("连接成功");
break;
}catch (Exception e){
continue;
}
}

}
});
} else {
client.setCallback(callback);
}
client.connect(options);
//遗嘱
// options.setWill(topic, "close".getBytes(), 1, true);
// client.connect(options);
} catch (MqttException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
private static String[] myTopics = { "/PG01001/+/properties/set", "/PG01001/+/function/invoke" ,"/PG01001/+/properties/get"};
private static int[] myQos = { 0, 0, 0 };
public void sendMessage(String msg) {
sendMessage(myTopic, msg);
}

public void sendMessage(String topic, String msg){
try {
// client = new MqttClient(host, id, new MemoryPersistence());
message = new MqttMessage();
message.setQos(0);
message.setRetained(true);
message.setPayload(msg.getBytes());
mqttTopic = client.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(message);//发布主题
token.waitForCompletion();
} catch (MqttPersistenceException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
} catch (MqttException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}

public void subscribe(String[] topicFilters, int[] qos) {
try {
client.subscribe(topicFilters, qos);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}// 订阅主题

}

}

 

标签:Java,String,private,Mqtt,client,new,import,重连,public
来源: https://www.cnblogs.com/Javawang/p/15464291.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有