标签:Netty pipeline 群聊 void 系统 new public channel 客户端
要求:
1、实现客户端与服务器端之间的数据通讯(非阻塞)
2、多人群聊
3、服务器端监测客户端用户上线、离线、并实现消息转发
4、客户端可以通过channel无阻塞发送消息给其他用户,同时可以接收到其他用户发来的消息(由服务器进行转发)
服务器端
public class GroupChatServer {
private int port; //监听端口
public GroupChatServer(int port) {
this.port = port;
}
//处理客户端请求
public void run() throws Exception {
//创建线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //默认大小为cpu核数 * 2
try{
//创建服务器引导启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true) //活跃状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//往pipeline里面添加处理器
//1、向pipeline添加一个解码器
pipeline.addLast("decoder", new StringDecoder());
//2、向pipeline添加一个编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("Netty服务器启动...");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听关闭事件
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(9999).run();
}
}
服务器Handler
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//定义一个channel组,管理所有channel
//GlobalEventExecutor.INSTANCE 是一个全局的事件执行器(单例)
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//用于输出时间
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示链接建立,一旦链接,第一个被执行的方法
//将当前channel加入channelGroup中进行管理
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将该客户加入聊天室的信息推送给其他在线的客户端
/**
* channelGroup中的writeAndFlush 方法会交channelGroup中所有管理的channel进行遍历并发送消息
* 无需自己再遍历
*/
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天室 "+sdf.format(new Date()) +"\n");
channelGroup.add(channel);
}
//表示channel处于活动状态、提示 xx上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress()+"上线了...");
}
//表示channel处于非活动状态触发
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离开聊天室 "+sdf.format(new Date()) +"\n");
System.out.println("当前聊天室人数(ChannelGroup Size):"+channelGroup.size());
}
//读取数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获取当前channel
Channel channel = ctx.channel();
//遍历channelGroup,根据不同情况 回送不同消息
channelGroup.forEach(ch -> {
if (channel != ch) { //直接转发消息到其他channel
ch.writeAndFlush("[客户端]" + channel.remoteAddress() + "发送了消息 :" + msg + " "+sdf.format(new Date()) +"\n");
} else { //回显自己发送的消息
ch.writeAndFlush("[自己]发送了消息" + msg + " "+sdf.format(new Date()) + "\n");
}
});
}
//发生异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭当前通道
ctx.close();
}
}
客户端
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host=host;
this.port = port;
}
public void run() throws Exception{
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder())
.addLast("encoder", new StringEncoder())
.addLast(new GroupChatClientHandler());//加入自定义handler
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("--------"+channel.localAddress()+"---------");
//客户端需要输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通过channel发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 9999).run();
}
}
客户端Handler
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
效果图:
服务器端检测上线:
客户端发送消息:
其余客户端接收消息:
标签:Netty,pipeline,群聊,void,系统,new,public,channel,客户端 来源: https://blog.csdn.net/zhangxuchuan111/article/details/109998668
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。