标签:Netty websocket String void ctx private new 脚手架 public
1. 引入netty依赖
<properties>
<netty.version>4.1.48.Final</netty.version>
</properties>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
2. NettyConfig 配置netty 端口 及心跳
/**
 * Spring configuration for the embedded Netty WebSocket server.
 *
 * <p>Binds the {@code netty.*} configuration properties (port and heartbeat timeouts)
 * and declares the Netty infrastructure beans (bootstrap, event loop groups,
 * channel initializer and the server itself).
 */
@Configuration
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {

    /** Port the server binds to, read from {@code netty.port}. */
    @Value("${netty.port}")
    private int port;

    /**
     * Heartbeat (idle timeout) settings.
     *
     * <p>Initialised with an empty instance so that {@link #getHeartBeat()} never
     * returns {@code null} when the {@code netty.heart-beat} section is absent;
     * all timeouts then default to {@code 0}.
     */
    @NestedConfigurationProperty // helps the YAML binding resolve the nested configuration parameters
    private HeartBeat heartBeat = new HeartBeat();

    /**
     * Idle timeouts used for heartbeat detection. The channel initializer in this
     * project passes these values to {@code IdleStateHandler} in seconds.
     */
    public static class HeartBeat {

        /** Read idle timeout. */
        private int readTimeOut;

        /** Write idle timeout. */
        private int writeTimeOut;

        /** Combined read/write idle timeout. */
        private int readWriteTimeOut;

        public int getReadTimeOut() {
            return readTimeOut;
        }

        public void setReadTimeOut(int readTimeOut) {
            this.readTimeOut = readTimeOut;
        }

        public int getWriteTimeOut() {
            return writeTimeOut;
        }

        public void setWriteTimeOut(int writeTimeOut) {
            this.writeTimeOut = writeTimeOut;
        }

        public int getReadWriteTimeOut() {
            return readWriteTimeOut;
        }

        public void setReadWriteTimeOut(int readWriteTimeOut) {
            this.readWriteTimeOut = readWriteTimeOut;
        }
    }

    public HeartBeat getHeartBeat() {
        return heartBeat;
    }

    public void setHeartBeat(HeartBeat heartBeat) {
        this.heartBeat = heartBeat;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /** @return a fresh, unconfigured server bootstrap; it is configured by the server bean */
    @Bean
    public ServerBootstrap serverBootstrap() {
        return new ServerBootstrap();
    }

    /**
     * Event loop group that accepts incoming connections.
     *
     * <p>Only one port is bound, so a single acceptor thread is sufficient; the
     * default constructor would size the group from the CPU count.
     */
    @Bean
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(1);
    }

    /** Event loop group that performs the I/O of accepted connections. */
    @Bean
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup();
    }

    /**
     * Declares a channel initializer bean.
     *
     * <p>NOTE(review): {@code WebSocketInitializerChannelHandler} also appears as a
     * {@code @Component} elsewhere in this project, which would give the context two
     * beans of that type - confirm whether this factory method is still needed.
     */
    @Bean
    public WebSocketInitializerChannelHandler childChannelHandler() {
        return new WebSocketInitializerChannelHandler();
    }

    /**
     * Creates the server bean.
     *
     * @param webSocketInitializerChannelHandler initializer installed on every accepted channel
     */
    @Bean
    public NettyWebSocketServer nettyWebSocketServer(WebSocketInitializerChannelHandler webSocketInitializerChannelHandler) {
        return new NettyWebSocketServer(webSocketInitializerChannelHandler);
    }
}
3.yml 文件中设置参数
netty:
  port: 4444
  heart-beat:
    read-time-out: 10
    write-time-out: 10
    read-write-time-out: 60
4.AcceptorIdleStateTrigger 心跳检测
/**
 * Reacts to idle-state user events on a channel.
 *
 * <p>When an {@link IdleStateEvent} with state {@link IdleState#ALL_IDLE} arrives,
 * the channel is handed to {@code ChattingService.remove}. Other idle states are
 * ignored, and any non-idle event is delegated to the superclass.
 */
@Component
@Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Autowired
    private ChattingService chattingService;

    /**
     * Handles a user event fired on the pipeline.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the event object
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Anything that is not an idle event is handled by the default implementation.
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleStateEvent idleEvent = (IdleStateEvent) evt;
        if (IdleState.ALL_IDLE == idleEvent.state()) {
            // Read/write timeout: remove the information kept for this channel.
            this.chattingService.remove(ctx);
        }
    }
}
5. HttpRequestHandler 处理 WebSocket 握手请求,并从 URL 中获取连接参数
/**
 * Handles the initial HTTP request of a connection: validates it, performs the
 * WebSocket handshake and records the connecting user. WebSocket frames that
 * arrive afterwards are forwarded to the next handler in the pipeline.
 */
@Component
@Sharable
public class HttpRequestHandler extends SimpleChannelInboundHandler<Object> {

    @Autowired
    private ChattingService chattingService;

    /**
     * Dispatches an inbound message by type. Messages that are neither a
     * {@link FullHttpRequest} nor a {@link WebSocketFrame} are ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(channelHandlerContext, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // SimpleChannelInboundHandler releases the message once this method returns,
            // so it is retained before being handed to the next handler.
            channelHandlerContext.fireChannelRead(((WebSocketFrame) msg).retain());
        }
    }

    /**
     * Performs the WebSocket handshake for an HTTP request and, on success,
     * stores the user described by the {@code userId} and {@code source}
     * query parameters.
     *
     * @param ctx channel context of the connecting client
     * @param req the aggregated HTTP request
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        String uri = req.uri();
        String userId = getParm(uri, "userId");
        String source = getParm(uri, "source");

        // "ws://" - the original "ws:/" produced a malformed WebSocket location.
        String webSocketUrl = "ws://" + req.headers().get(HttpHeaders.Names.HOST) + "/websocket";
        // NOTE(review): Integer.MAX_VALUE removes any limit on the frame payload size.
        WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory(webSocketUrl, null, false, Integer.MAX_VALUE);
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // Unsupported protocol version: reject and do NOT register the user,
            // otherwise a user with a null handshaker would be stored.
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            return;
        }
        handshaker.handshake(ctx.channel(), req);

        ChattingUser chattingUser = new ChattingUser();
        chattingUser.setUserId(userId);
        chattingUser.setChannelHandlerContext(ctx);
        chattingUser.setType("app".equals(source) ? ChattingUser.Type.app : ChattingUser.Type.pc);
        chattingUser.setHandshaker(handshaker);

        // Save the information of the logged-in user.
        this.chattingService.saveLoginUserInfo(chattingUser);
    }

    /**
     * Writes an HTTP response to the client. For non-200 responses the status
     * line is used as the body. The connection is closed afterwards unless the
     * request asked for keep-alive.
     *
     * @param ctx channel context to write to
     * @param req the request being answered (consulted for keep-alive)
     * @param res the response to send
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // Without a Content-Length a keep-alive client cannot tell where the body ends.
        HttpUtil.setContentLength(res, res.content().readableBytes());

        // Close the connection when it is not keep-alive.
        boolean keepAlive = HttpUtil.isKeepAlive(req);
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!keepAlive) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Extracts a query parameter from a request URI.
     *
     * <p>Everything after the first {@code '?'} is treated as the query string,
     * independent of the request path. Pairs without a key or without {@code '='}
     * are skipped instead of raising an exception. When a key occurs several
     * times the last value wins. Values are returned as-is (no URL decoding).
     *
     * @param uri      the request URI, may be {@code null}
     * @param parmName name of the parameter to look up
     * @return the parameter value (possibly empty), or {@code null} if absent
     */
    private String getParm(String uri, String parmName) {
        if (uri == null) {
            return null;
        }
        int queryStart = uri.indexOf('?');
        if (queryStart < 0) {
            return null;
        }

        String value = null;
        for (String pair : uri.substring(queryStart + 1).split("&")) {
            int separator = pair.indexOf('=');
            if (separator <= 0) {
                continue; // malformed pair: no key or no '='
            }
            if (pair.substring(0, separator).equals(parmName)) {
                value = pair.substring(separator + 1);
            }
        }
        return value;
    }
}
6. NettyApplicationContext 以单例方式在独立线程中启动 Netty 服务,并在应用关闭时释放资源
/**
 * Starts the Netty WebSocket server on its own thread when the Spring context
 * is initialised and releases it again when the context is destroyed.
 */
@Component
@Scope("singleton")
public class NettyApplicationContext {

    /** Upper bound, in milliseconds, for waiting on the startup thread during shutdown. */
    private static final long SHUTDOWN_JOIN_TIMEOUT_MILLIS = 5000L;

    private final Logger logger = LoggerFactory.getLogger(NettyApplicationContext.class);

    @Autowired
    private NettyWebSocketServer nettyWebSocketServer;

    private Thread nettyThread;

    /** Launches the server's {@code run()} method on a dedicated thread. */
    @PostConstruct
    public void init() {
        nettyThread = new Thread(nettyWebSocketServer, "netty-websocket-server");
        logger.info("开启独立线程,启动Netty WebSocket服务器...");
        nettyThread.start();
    }

    /**
     * Releases the Netty WebSocket resources before the hosting server shuts
     * down; according to the original author, skipping this causes a memory leak.
     *
     * <ol>
     *   <li>Closes the Netty WebSocket server and its connections.</li>
     *   <li>Winds down the startup thread. The thread is interrupted and joined
     *       with a timeout instead of calling {@code Thread.stop()}, which is
     *       deprecated, unsafe, and throws {@code UnsupportedOperationException}
     *       on recent JDKs.</li>
     * </ol>
     */
    @PreDestroy
    public void close() {
        logger.info("正在释放Netty Websocket相关连接...");
        nettyWebSocketServer.close();
        logger.info("正在关闭Netty Websocket服务器线程...");
        if (nettyThread != null && nettyThread.isAlive()) {
            nettyThread.interrupt();
            try {
                nettyThread.join(SHUTDOWN_JOIN_TIMEOUT_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever is driving the shutdown.
                Thread.currentThread().interrupt();
            }
        }
        logger.info("系统成功关闭!");
    }
}
7. NettyChannelMap 静态连接存放类,存放用户与连接的关系
/**
 * Static registry that associates users with their channel contexts.
 *
 * <p>All methods are {@code synchronized} on the class: the registry is a plain
 * {@link ArrayList} and it is called from sharable channel handlers, which may
 * run on different event loop threads.
 */
public class NettyChannelMap {

    /** Registered users. */
    private static final List<NettyUser> userInfoList = new ArrayList<>();

    /**
     * Registers a user. A new {@link NettyUser} entry holding a single
     * {@link NettyUserInfo} is appended on every call.
     *
     * @param chattingUser the user (and its channel context) to store
     */
    public static synchronized void saveUserInfo(ChattingUser chattingUser) {
        NettyUserInfo nettyUserInfo = new NettyUserInfo();
        nettyUserInfo.setChattingUser(chattingUser);
        nettyUserInfo.setCtx(chattingUser.getChannelHandlerContext());
        nettyUserInfo.setType(chattingUser.getType());

        List<NettyUserInfo> nettyUserInfos = new ArrayList<>();
        nettyUserInfos.add(nettyUserInfo);

        NettyUser nettyUser = new NettyUser();
        nettyUser.setUserKey(chattingUser.getUserId());
        nettyUser.setList(nettyUserInfos);
        userInfoList.add(nettyUser);
    }

    /**
     * Removes every entry that belongs to the given channel.
     *
     * <p>Both lists are walked backwards by index so that elements can be removed
     * while iterating; removing inside a for-each loop, as the previous version
     * did, raises {@code ConcurrentModificationException}. Users left without any
     * connection are dropped so the registry does not grow without bound.
     *
     * @param ctx context of the channel being removed
     */
    public static synchronized void removeUserInfo(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
        for (int u = userInfoList.size() - 1; u >= 0; u--) {
            List<NettyUserInfo> infos = userInfoList.get(u).getList();
            if (infos != null) {
                for (int i = infos.size() - 1; i >= 0; i--) {
                    if (belongsToChannel(infos.get(i), channelId)) {
                        infos.remove(i);
                    }
                }
            }
            if (infos == null || infos.isEmpty()) {
                userInfoList.remove(u);
            }
        }
    }

    /**
     * Looks up the handshaker stored for the given channel.
     *
     * <p>If the channel was registered more than once, the most recently
     * registered entry is used (same result as the previous implementation,
     * which kept the last match).
     *
     * @param ctx context of the channel to look up
     * @return the stored handshaker, or {@code null} if the channel is unknown
     */
    public static synchronized WebSocketServerHandshaker get_handshaker(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
        for (int u = userInfoList.size() - 1; u >= 0; u--) {
            List<NettyUserInfo> infos = userInfoList.get(u).getList();
            if (infos == null) {
                continue;
            }
            for (int i = infos.size() - 1; i >= 0; i--) {
                NettyUserInfo info = infos.get(i);
                if (belongsToChannel(info, channelId)) {
                    ChattingUser chattingUser = info.getChattingUser();
                    return chattingUser == null ? null : chattingUser.getHandshaker();
                }
            }
        }
        return null;
    }

    /** Tells whether the entry was registered for the channel with the given long id. */
    private static boolean belongsToChannel(NettyUserInfo info, String channelId) {
        return info != null
                && info.getCtx() != null
                && channelId.equals(info.getCtx().channel().id().asLongText());
    }
}
8.NettyWebSocketServer netty服务端设置
/**
 * The Netty WebSocket server: configures the bootstrap, binds the port and
 * shuts the event loop groups down again. Implements {@link Runnable} so the
 * startup can be executed on a dedicated thread.
 */
public class NettyWebSocketServer implements Runnable {

    private final Logger logger = LoggerFactory.getLogger(NettyWebSocketServer.class);

    @Value("#{nettyConfig.port}")
    private int port;

    @Autowired
    private EventLoopGroup bossGroup;

    @Autowired
    private EventLoopGroup workerGroup;

    @Autowired
    private ServerBootstrap serverBootstrap;

    private ChannelHandler webSocketInitializerChannelHandler;

    /** Result of the bind; stays {@code null} until {@link #build()} has bound the port. */
    private ChannelFuture serverChannelFuture;

    /**
     * @param webSocketInitializerChannelHandler handler installed as the child handler
     *                                           of every accepted connection
     */
    public NettyWebSocketServer(ChannelHandler webSocketInitializerChannelHandler) {
        this.webSocketInitializerChannelHandler = webSocketInitializerChannelHandler;
    }

    @Override
    public void run() {
        build();
    }

    /**
     * Configures the bootstrap and binds the configured port. The call returns
     * once the bind has completed; it does not wait for the server channel to
     * close. On failure both event loop groups are shut down and the error is
     * logged.
     */
    public void build() {
        try {
            long begin = System.currentTimeMillis();
            // The boss group accepts TCP connections; the worker group does the reads and writes.
            serverBootstrap.group(bossGroup, workerGroup)
                    // Channel implementation used for the listening socket.
                    .channel(NioServerSocketChannel.class)
                    // Length of the queue of pending connections.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // TCP_NODELAY disables Nagle's algorithm so small messages go out immediately.
                    // It is a per-connection option and therefore must be a childOption; the
                    // listening socket does not support it.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // TCP keep-alive probing for established connections (the original author
                    // notes probing starts after about two hours of silence).
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Fixed-size receive buffer allocator.
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))
                    // Handler that sets up the pipeline of every accepted channel.
                    .childHandler(webSocketInitializerChannelHandler);
            serverChannelFuture = serverBootstrap.bind(port).sync();
            // Measured after the bind so the reported time covers the whole startup.
            long end = System.currentTimeMillis();
            logger.info("Netty Websocket服务器启动完成,耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接");
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the owner of this thread.
                Thread.currentThread().interrupt();
            }
            logger.error("Netty Websocket服务器启动失败,端口 " + port, e);
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Closes the server channel (if the bind ever succeeded) and waits for both
     * event loop groups to shut down.
     */
    public void close() {
        // The future is null when build() was never run or the bind failed.
        if (serverChannelFuture != null) {
            serverChannelFuture.channel().close();
        }
        Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupFuture = workerGroup.shutdownGracefully();
        try {
            bossGroupFuture.await();
            workerGroupFuture.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("等待Netty线程组关闭时被中断", e);
        }
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}
9.NettyWebSocketServerHandler socket处理
/**
 * Handles WebSocket frames after the handshake: close, ping, binary and text
 * frames, and keeps track of the connected channels.
 */
@Component
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final Logger logger = LoggerFactory.getLogger(NettyWebSocketServerHandler.class);

    @Autowired
    private WebSocketMessageHandlerUtil webSocketMessageHandlerUtil;

    /** Holds all client connections. */
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {
        handlerWebSocketFrame(channelHandlerContext, webSocketFrame);
    }

    /**
     * Processes a single WebSocket frame.
     *
     * <ul>
     *   <li>Close frame: completes the closing handshake with the handshaker
     *       stored for the channel, or reports an error if none is stored.</li>
     *   <li>Ping frame: answers with a pong carrying the same payload.</li>
     *   <li>Binary frame: echoes a copy back to the sender.</li>
     *   <li>Text frame: passes the text to the message handler.</li>
     *   <li>Any other frame: reports that only text is supported.</li>
     * </ul>
     *
     * <p>Every branch returns after handling its frame. Previously binary and
     * other non-text frames fell through to the text branch and failed with a
     * {@code ClassCastException}.
     *
     * @param channelHandlerContext context of the channel the frame arrived on
     * @param frame                 the received frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame) throws Exception {
        // Close request.
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker = NettyChannelMap.get_handshaker(channelHandlerContext);
            if (handshaker == null) {
                this.webSocketMessageHandlerUtil.sendErrorMessage(channelHandlerContext, "不存在的客户端连接!");
            } else {
                handshaker.close(channelHandlerContext.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }
        // Ping request. writeAndFlush: a bare write() would leave the pong queued
        // until something else flushes the channel.
        if (frame instanceof PingWebSocketFrame) {
            channelHandlerContext.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Binary data is supported by echoing it back.
        if (frame instanceof BinaryWebSocketFrame) {
            channelHandlerContext.channel().writeAndFlush(frame.copy());
            return;
        }
        // Everything else must be a text frame.
        if (!(frame instanceof TextWebSocketFrame)) {
            this.webSocketMessageHandlerUtil.sendErrorMessage(channelHandlerContext, "仅支持文本(Text)格式,不支持二进制消息");
            return;
        }
        // Message sent by the client.
        String text = ((TextWebSocketFrame) frame).text();
        this.webSocketMessageHandlerUtil.messageHandler(text, channelHandlerContext);
    }

    /**
     * Channel registered with its event loop.
     *
     * @param ctx context of the registered channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        logger.info("通道注册:" + ctx.channel().id().asLongText());
        // Keep the event travelling so handlers later in the pipeline see it too.
        ctx.fireChannelRegistered();
    }

    /**
     * Called when this handler is added to the pipeline of a new client connection.
     *
     * @param ctx context of the new channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        logger.info("有新的客户端连接加入:" + ctx.channel().id().asLongText());
        // Track the new channel.
        clients.add(ctx.channel());
    }

    /**
     * Closes the channel after an error and removes everything stored for it.
     *
     * @param ctx   context of the failing channel
     * @param cause the error that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the cause as well; without it the reason for the disconnect is lost.
        logger.warn("客户端异常断开:" + ctx.channel().id().asLongText(), cause);
        clients.remove(ctx.channel());
        NettyChannelMap.removeUserInfo(ctx);
        ctx.channel().close();
    }

    /**
     * Called when this handler is removed from the pipeline; cleans up the
     * bookkeeping for the channel.
     *
     * @param ctx context of the channel being removed
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        clients.remove(ctx.channel());
        NettyChannelMap.removeUserInfo(ctx);
        logger.info("netty服务器通道关闭");
    }
}
10.WebSocketInitializerChannelHandler 初始化通道
/**
 * Builds the handler pipeline of every accepted channel: idle detection, HTTP
 * codec and aggregation, the handshake handler, the WebSocket frame handler and
 * the idle-event trigger.
 */
@Component
public class WebSocketInitializerChannelHandler extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NettyConfig nettyConfig;

    @Autowired
    private HttpRequestHandler httpRequestHandler;

    @Autowired
    private NettyWebSocketServerHandler nettyWebSocketServerHandler;

    @Autowired
    private AcceptorIdleStateTrigger idleStateTrigger;

    /**
     * Populates the pipeline of a newly accepted channel.
     *
     * @param socketChannel the accepted channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        NettyConfig.HeartBeat heartBeat = this.nettyConfig.getHeartBeat();

        // Server-side heartbeat detection. The idle handler goes FIRST: it records read
        // activity from the inbound messages that pass through it, and the handlers
        // below consume every inbound message, so placed after them it would never see
        // a read and would report the connection idle even while the client is sending.
        // Skipped when no heartbeat settings are configured.
        if (heartBeat != null) {
            pipeline.addLast(new IdleStateHandler(
                    heartBeat.getReadTimeOut(),
                    heartBeat.getWriteTimeOut(),
                    heartBeat.getReadWriteTimeOut(),
                    TimeUnit.SECONDS));
        }
        // HTTP encoder/decoder.
        pipeline.addLast("http-codec", new HttpServerCodec());
        // Assembles HTTP head and body into one full HTTP request.
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        // Eases large transfers, although in practice the payloads are short texts.
        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
        pipeline.addLast("http-handler", httpRequestHandler);
        pipeline.addLast("websocket-handler", nettyWebSocketServerHandler);
        // Receives the idle events; user events are passed along by the handlers above.
        pipeline.addLast(idleStateTrigger);
    }
}
11.ChattingMessage 消息类封装
/**
 * A chat message exchanged between users.
 * Accessors are presumably generated by Lombok's {@code @Data}.
 */
@Data
public class ChattingMessage implements Serializable {

    /** Kind of payload carried by a message, each with a description. */
    public enum MessageType {
        Text_and_Emoticons("文字与表情"),
        Images("图片"),
        Audio("音频"),
        Video("视频");

        /** Description of the message type. */
        private String description;

        MessageType(String description) {
            this.description = description;
        }

        public String getDescription() {
            return this.description;
        }

        public void setDescription(String description) {
            this.description = description;
        }
    }

    /** Sender. */
    private Long fromId;

    /** Receiver. */
    private Long toId;

    /** Chat content. */
    private String content;

    /** Message type. */
    private MessageType messageType;

    /** Chatting type. */
    private ChattingType chattingType;

    /** File information. */
    private Long fileInfoId;
}
12.ChattingType 类型封装
/**
 * Kinds of chat operations a client message can request.
 */
public enum ChattingType {

    /** Registration. */
    REGISTER,

    /** Heartbeat / reconnect after a dropped connection. */
    Heartbeat_Mechanism,

    /** One-to-one chat. */
    SINGLE_SENDING,

    /** Group chat. */
    GROUP_SENDING,

    /** File sent to a single recipient. */
    FILE_MSG_SINGLE_SENDING,

    /** File sent to a group. */
    FILE_MSG_GROUP_SENDING,

    /** Transfer (hand-over). */
    TRANSFORM_SENDING,

    /** Automatic reply. */
    AUTO_RESPONSE,

    /** Exit. */
    REMOVE
}
13.ChattingUser 关系存放封装
/**
 * A connected user together with the channel context and handshaker of its
 * connection. Accessors are presumably generated by Lombok's {@code @Data}.
 */
@Data
public class ChattingUser {

    /** Kind of client the user connected with. */
    public enum Type {
        app,
        pc
    }

    /** Identifier of the user. */
    private String userId;

    /** Context of the channel the user is connected through. */
    private ChannelHandlerContext channelHandlerContext;

    /** Client kind of this connection. */
    private Type type;

    /** Handshaker that performed the WebSocket handshake for this connection. */
    private WebSocketServerHandshaker handshaker;
}
14.NettyUser
/**
 * Registry entry for one user: a key plus the list of that user's connections.
 * Accessors are presumably generated by Lombok's {@code @Data}.
 */
@Data
public class NettyUser implements Serializable {

    /** Key identifying the user. */
    private String userKey;

    /** Connection entries that belong to the user. */
    private List<NettyUserInfo> list;
}
15.NettyUserInfo
/**
 * One connection of a user: the channel context, the user object and the
 * client type. Accessors are presumably generated by Lombok's {@code @Data}.
 */
@Data
public class NettyUserInfo implements Serializable {

    /** Context of the channel of this connection. */
    private ChannelHandlerContext ctx;

    /** The user that owns this connection. */
    private ChattingUser chattingUser;

    /** Client type of this connection. */
    private ChattingUser.Type type;
}
16.WebSocketMessageHandlerUtil socket消息处理工具类
@Component
public class WebSocketMessageHandlerUtil {
@Autowired
private ChattingService chattingService;
public void sendMessage(ChannelHandlerContext ctx, String msg) {
Result result = new Result(true, StatusCode.OK, msg);
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(result)));
}
public void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) {
String result = new Result(false, StatusCode.ERROR, errorMsg).toString();
ctx.channel().writeAndFlush(new TextWebSocketFrame(result));
}
public void messageHandler(String text, ChannelHandlerContext channelHandlerContext) {
JSONObject param = null;
try {
param = JSONObject.parseObject(text);
} catch (Exception e) {
sendErrorMessage(channelHandlerContext, "JSON字符串转换出错!");
e.printStackTrace();
}
if (param == null) {
sendErrorMessage(channelHandlerContext, "参数为空!");
return;
}
String chattingType = (String) param.get("ChattingType");
getChattingType(param, chattingType,text,channelHandlerContext);
}
private void getChattingType(JSONObject param, String chattingType,String text, ChannelHandlerContext channelHandlerContext) {
ChattingType chatting_type = null;
switch (chattingType) {
case "REGISTER":
chatting_type = ChattingType.REGISTER;
break;
case "SINGLE_SENDING":
chatting_type = ChattingType.SINGLE_SENDING;
this.chattingService.singleSending(param,text);
break;
case "GROUP_SENDING":
chatting_type = ChattingType.GROUP_SENDING;
break;
case "FILE_MSG_SINGLE_SENDING":
chatting_type = ChattingType.FILE_MSG_SINGLE_SENDING;
break;
case "FILE_MSG_GROUP_SENDING":
chatting_type = ChattingType.FILE_MSG_GROUP_SENDING;
break;
case "TRANSFORM_SENDING":
chatting_type = ChattingType.TRANSFORM_SENDING;
break;
case "Heartbeat_Mechanism":
chatting_type = ChattingType.Heartbeat_Mechanism;
this.chattingService.heartbeatMechanism(param);
break;
case "AUTO_RESPONSE":
chatting_type = ChattingType.AUTO_RESPONSE;
break;
case "REMOVE":
chatting_type = ChattingType.REMOVE;
this.chattingService.remove(channelHandlerContext);
break;
}
}
}
标签:Netty,websocket,String,void,ctx,private,new,脚手架,public 来源: https://blog.csdn.net/weixin_45637293/article/details/117585622
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。