ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

boot netty mqtt

2022-05-12 01:33:02  阅读:143  来源: 互联网

标签:netty mqttMessage 报文 boot mqtt new import channel


package com.luban.netty2;

/**
* @author zhoulei
* @description: BootNettyApplication
* @date : 2022/5/12 0:16
* QQ:20971053
*/

import com.luban.netty2.core.BootNettyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context first and then starts the
 * Netty MQTT server on the calling thread.
 */
@SpringBootApplication
public class BootNettyApplication {

    public static void main(String[] args) {
        // Bring up the Spring Boot application context.
        SpringApplication.run(BootNettyApplication.class, args);

        // Start the MQTT listener (port 1883).
        BootNettyServer mqttServer = new BootNettyServer();
        mqttServer.startup();
    }
}


package com.luban.netty2.core;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
* @auther:zhoulei
* @description: BootNettyServer
* @date : 2022/5/12 0:17
* QQ:20971053
*/
public class BootNettyServer {

private int port = 1883;

private NioEventLoopGroup bossGroup;

private NioEventLoopGroup workGroup;

/**
* 启动服务
* @throws InterruptedException
*/
public void startup() {

try {
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup);
bootstrap.channel(NioServerSocketChannel.class);

bootstrap.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_RCVBUF, 10485760);

bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
// 设置读写空闲超时时间
channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast(new BootChannelInboundHandler());
}
});
ChannelFuture f = bootstrap.bind(port).sync();
f.channel().closeFuture().sync();

} catch (Exception e) {
System.out.println("start exception"+e.toString());
}

}

/**
* 关闭服务
*/
public void shutdown() throws InterruptedException {
if (workGroup != null && bossGroup != null) {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
System.out.println("shutdown success");
}
}

}

package com.luban.netty2.core;


import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


/**
 * Static helpers that build the server-side MQTT reply for an inbound control
 * packet and write it to the given channel.
 *
 * <p>All replies are created with DUP = false, QoS 0 and RETAIN = false,
 * because those fixed-header bits are reserved (must be zero) for the packet
 * types sent here.
 */
public class BootMqttMsgBack {

    private static final Logger log = LoggerFactory.getLogger(BootMqttMsgBack.class);

    /**
     * Builds a fixed header for a reply packet with all flag bits cleared.
     *
     * @param type            packet type of the reply
     * @param remainingLength remaining-length value to put in the header
     * @return the fixed header
     */
    private static MqttFixedHeader replyHeader(MqttMessageType type, int remainingLength) {
        return new MqttFixedHeader(type, false, MqttQoS.AT_MOST_ONCE, false, remainingLength);
    }

    /**
     * Acknowledges a CONNECT request with CONNACK (connection accepted).
     *
     * @param channel     channel to write the CONNACK to
     * @param mqttMessage the inbound CONNECT message (not inspected)
     */
    public static void connack(Channel channel, MqttMessage mqttMessage) {
        // Variable header: this class keeps no session state, so "session present"
        // is always false. (Echoing the client's clean-session flag, as before,
        // reported a stored session exactly when the client asked for a clean one.)
        MqttConnAckVariableHeader variableHeader =
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
        // Fixed header.
        MqttFixedHeader fixedHeader = replyHeader(MqttMessageType.CONNACK, 0x02);
        // CONNACK message.
        MqttConnAckMessage connAck = new MqttConnAckMessage(fixedHeader, variableHeader);
        log.info("back--{}", connAck);
        channel.writeAndFlush(connAck);
    }

    /**
     * Logs the payload of a PUBLISH and acknowledges it according to its QoS:
     * nothing for QoS 0, PUBACK for QoS 1, PUBREC for QoS 2.
     *
     * @param channel     channel to write the acknowledgement to
     * @param mqttMessage the inbound PUBLISH message
     */
    public static void puback(Channel channel, MqttMessage mqttMessage) {
        MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
        MqttQoS qos = publish.fixedHeader().qosLevel();
        // Decode with an explicit charset and without moving the reader index,
        // so the payload can still be read by other code afterwards.
        String data = publish.payload().toString(java.nio.charset.StandardCharsets.UTF_8);
        log.info("publish data--{}", data);

        switch (qos) {
            case AT_MOST_ONCE: // at most once: no acknowledgement
                break;
            case AT_LEAST_ONCE: { // at least once: reply with PUBACK
                MqttMessageIdVariableHeader variableHeader =
                        MqttMessageIdVariableHeader.from(publish.variableHeader().packetId());
                MqttPubAckMessage pubAck =
                        new MqttPubAckMessage(replyHeader(MqttMessageType.PUBACK, 0x02), variableHeader);
                log.info("back--{}", pubAck);
                channel.writeAndFlush(pubAck);
                break;
            }
            case EXACTLY_ONCE: { // exactly once: reply with PUBREC
                MqttMessageIdVariableHeader variableHeader =
                        MqttMessageIdVariableHeader.from(publish.variableHeader().packetId());
                MqttMessage pubRec =
                        new MqttMessage(replyHeader(MqttMessageType.PUBREC, 0x02), variableHeader);
                log.info("back--{}", pubRec);
                channel.writeAndFlush(pubRec);
                break;
            }
            default:
                break;
        }
    }

    /**
     * Completes a QoS 2 exchange by answering a PUBREL with PUBCOMP.
     *
     * @param channel     channel to write the PUBCOMP to
     * @param mqttMessage the inbound PUBREL message
     */
    public static void pubcomp(Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader inboundHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(inboundHeader.messageId());
        MqttMessage pubComp = new MqttMessage(replyHeader(MqttMessageType.PUBCOMP, 0x02), variableHeader);
        log.info("back--{}", pubComp);
        channel.writeAndFlush(pubComp);
    }

    /**
     * Acknowledges a SUBSCRIBE with SUBACK, granting every requested QoS as is.
     *
     * @param channel     channel to write the SUBACK to
     * @param mqttMessage the inbound SUBSCRIBE message
     */
    public static void suback(Channel channel, MqttMessage mqttMessage) {
        MqttSubscribeMessage subscribe = (MqttSubscribeMessage) mqttMessage;
        // Variable header: echo the packet identifier.
        MqttMessageIdVariableHeader variableHeader =
                MqttMessageIdVariableHeader.from(subscribe.variableHeader().messageId());
        // One return code per topic filter, in request order. Iterating the list
        // itself (instead of a de-duplicated set of names) keeps the count right
        // when the same filter appears more than once.
        List<MqttTopicSubscription> subscriptions = subscribe.payload().topicSubscriptions();
        List<Integer> grantedQoSLevels = new ArrayList<>(subscriptions.size());
        for (MqttTopicSubscription subscription : subscriptions) {
            grantedQoSLevels.add(subscription.qualityOfService().value());
        }
        // Payload.
        MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
        // Fixed header: 2 bytes of packet id plus one byte per return code.
        MqttFixedHeader fixedHeader = replyHeader(MqttMessageType.SUBACK, 2 + grantedQoSLevels.size());
        // SUBACK message.
        MqttSubAckMessage subAck = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
        log.info("back--{}", subAck);
        channel.writeAndFlush(subAck);
    }

    /**
     * Acknowledges an UNSUBSCRIBE with UNSUBACK.
     *
     * @param channel     channel to write the UNSUBACK to
     * @param mqttMessage the inbound UNSUBSCRIBE message
     */
    public static void unsuback(Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader inboundHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(inboundHeader.messageId());
        MqttUnsubAckMessage unSubAck =
                new MqttUnsubAckMessage(replyHeader(MqttMessageType.UNSUBACK, 2), variableHeader);
        log.info("back--{}", unSubAck);
        channel.writeAndFlush(unSubAck);
    }

    /**
     * Answers a PINGREQ heartbeat with PINGRESP.
     *
     * @param channel     channel to write the PINGRESP to
     * @param mqttMessage the inbound PINGREQ message (not inspected)
     */
    public static void pingresp(Channel channel, MqttMessage mqttMessage) {
        // PINGRESP consists of the fixed header only: 11010000 00000000.
        MqttMessage pingResp = new MqttMessage(replyHeader(MqttMessageType.PINGRESP, 0));
        log.info("back--{}", pingResp);
        channel.writeAndFlush(pingResp);
    }


}

package com.luban.netty2.core;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* @auther:zhoulei
* @description: BootChannelInboundHandler
* @date : 2022/5/12 0:18
* QQ:20971053
*/
@ChannelHandler.Sharable
public class BootChannelInboundHandler extends ChannelInboundHandlerAdapter {

private Logger log = LoggerFactory.getLogger(this.getClass());

/**
* 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}

/**
* 客户端与服务端 断连时执行 channelInactive方法之后执行
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}

/**
* 从客户端收到新的数据时,这个方法会在收到消息时被调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
if (null != msg) {
MqttMessage mqttMessage = (MqttMessage) msg;
log.info("info--" + mqttMessage.toString());
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
Channel channel = ctx.channel();

if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {
// 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
// to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
BootMqttMsgBack.connack(channel, mqttMessage);
}

switch (mqttFixedHeader.messageType()) {
case PUBLISH: // 客户端发布消息
// PUBACK报文是对QoS 1等级的PUBLISH报文的响应
System.out.println("123");
BootMqttMsgBack.puback(channel, mqttMessage);
break;
case PUBREL: // 发布释放
// PUBREL报文是对PUBREC报文的响应
// to do
BootMqttMsgBack.pubcomp(channel, mqttMessage);
break;
case SUBSCRIBE: // 客户端订阅主题
// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
// 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
// SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
// to do
BootMqttMsgBack.suback(channel, mqttMessage);
break;
case UNSUBSCRIBE: // 客户端取消订阅
// 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
// to do
BootMqttMsgBack.unsuback(channel, mqttMessage);
break;
case PINGREQ: // 客户端发起心跳
// 客户端发送PINGREQ报文给服务端的
// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
BootMqttMsgBack.pingresp(channel, mqttMessage);
break;
case DISCONNECT: // 客户端主动断开连接
// DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
// to do
break;
default:
break;
}
}
}

/**
* 从客户端收到新的数据、读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
}

/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}

/**
* 客户端与服务端第一次建立连接时执行
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}

/**
* 客户端与服务端 断连时执行
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelInactive(ctx);
}

/**
* 服务端 当读超时时 会调用这个方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
super.userEventTriggered(ctx, evt);
ctx.close();
}


@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}

}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the mqtt_demo2 module; inherits from spring-boot-starter-parent 2.1.6.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>mqtt_demo2</artifactId>


<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>



<dependencies>

<!-- Starter for the web module -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Netty dependency; the version is supplied automatically by Spring Boot 2.x -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

</dependencies>
</project>

标签:netty,mqttMessage,报文,boot,mqtt,new,import,channel
来源: https://www.cnblogs.com/mfk11/p/16260672.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有