ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Reactor模型概述 && Netty

2021-07-16 13:02:09  阅读:160  来源: 互联网

标签:&& Netty io netty System ByteBuf byteBuf import Reactor


目录

Reactor模型概述

 单线程模型

多线程模型

主从多线程模型

 Netty模型(主要是主从多线程模型)

第一个Netty服务

服务端

客户端

四 Netty核心组件

4.1 Channel

4.2 EventLoopGroup、EventLoop

4.3 ChannelHandler

4.4 ChannelPipeline

4.5 Bootstrap

4.6 Future

4.7 组件小结

五 缓存区-ByteBuf

5.1 ByteBuf概述

5.2 ByteBuf基本使用

5.3 ByteBuf 使⽤模式

5.3.1 根据存放缓冲区,分为三类

5.3.2 缓存区选择 

5.3.3 ByteBuf的分配

5.5 ByteBuf的释放  

5.5.1 ByteBuf的手动释放

5.5.1 ByteBuf的自动释放

5.6 ByteBuf小结


Reactor模型概述

1. 常见的Reactor模型

是一种并发编程模型,是一种思想,具有指导意义。

2. 常见的Reactor模型

单线程模型、多线程模型、主从多线程模型。netty非常友好支持前面三种模型,一般采用主从架构方式。

3. Reactor模型三种角色

a Reactor 监听和分配事件
b Acceptor 处理客户端新连接,并分配请求到处理链中。
c Handler 将自身和事件绑定,执行读写操作(完成channel的读入,执行业务逻辑并将结果写道channel)。

 单线程模型

上图说明

a 一个线程完成业务处理。

b Reactor相当于一个多路复用器,用于监听事件,并把发生的事件传递给Handler或者Acceptor。

c 如果是建立连接事件,Reactor传递给Acceptor。如果是读写事件,Reactor传给Handler。

1. 优点:

a 优点  结构简单、单线程完成,没有进程通信问题。

对一些业务场景简单,对性能要求不高的应用场景。

2. 缺点:

a 发挥不了服务器多核优势

 b 客户端连接过多导致客户端连接多,Reactor线程负载过重。导致客户端连接超时,   最终导致大量信息积压。性能低。    

c 单点故障后,导致系统通信故障。

多线程模型

 上图说明:

相对于单线程而言,不同点在于,Handler只负责用户响应和事件分发。真正业务逻辑在work线程池中处理。

 1. 存在问题

a 多线程数据共享比较复杂。如子线程完成业务处理后,把结果传递给主线程Reactor,
  就会涉及数据的互斥和保护机制。
  
b Reactor承担所有的监听和响应。如果百万客户端连接,获取服务端进行客户端握手安
  全认证,认证本身就很消耗性能。

主从多线程模型

 上图说明

a Reactor分成两个部分,MainReactor负责监听server socket,用来处理网络io建立,
将建立的socketChannel指定注册给SubReactor。
b SubReactor建立和socket数据交互和事件业务处理。

优点:

a 响应快 不必为单个同步事件所阻塞,虽然Reactor本身是同步的
b 可扩展性强 通过扩展SubReactor充分利用CPU资源
c 可复用性高 该模型和具体事件处理逻辑无关,具有很高复用性。

 Netty模型(主要是主从多线程模型)

 上图解释:

a 在netty模型中,负责处理新连接的是BossGroup,负责其他事件的是WorkderGroup。(Group代表线程池的概念)

b NioEventLoop表示一个不断循环处理任务的线程,用于监听绑定在上面的读写事件。

c PipLine里面放着一个个ChannelHandler,ChannelHandler用于业务处理。

第一个Netty服务

服务端

1. 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.haopt.iot</groupId>
    <artifactId>first-netty</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 2. 服务端-MyRPCServer

package com.haopt.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyRPCServer {
    public void start(int port) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // 工作线程,线程数默认是:cpu核数*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker) //设置线程组
                    .channel(NioServerSocketChannel.class)  //配置server通道
                    .childHandler(new MyChannelInitializer()); //worker线程的处理器
            //ByteBuf 的分配要设置为非池化,否则不能切换到堆缓冲区模式
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动完成,端口为:" + port);
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

3. 服务端-ChannelHandler

package com.haopt.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class MyChannelHandler extends ChannelInboundHandlerAdapter {
    /**
    * 获取客户端发来的数据
    * @param ctx
    * @param msg
    * @throws Exception
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println("客户端发来数据:" + msgStr);
        //向客户端发送数据
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
    
    /**
    * 异常处理
    * @param ctx
    * @param cause
    * @throws Exception
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

4. 测试用例

package com.haopt.netty.myrpc;
import com.haopt.netty.server.MyRPCServer;
import org.junit.Test;
public class TestServer {
    @Test
    public void testServer() throws Exception{
        MyRPCServer myRPCServer = new MyRPCServer();
        myRPCServer.start(5566);
    }
}

客户端

1. 客户端-client

package com.haopt.netty.client;
import com.haopt.netty.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyRPCClient {
    public void start(String host, int port) throws Exception {
        //定义⼯作线程组
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //注意:client使⽤的是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
            .channel(NioSocketChannel.class) //注意:client使⽤的是NioSocketChannel
            .handler(new MyClientHandler());
            //连接到远程服务
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
             worker.shutdownGracefully();
        }
    }
}

2. 客户端-client

package com.haopt.netty.client;
import com.haopt.netty.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyRPCClient {
    public void start(String host, int port) throws Exception {
        //定义⼯作线程组
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //注意:client使⽤的是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
            .channel(NioSocketChannel.class) //注意:client使⽤的是NioSocketChannel
            .handler(new MyClientHandler());
            //连接到远程服务
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
             worker.shutdownGracefully();
        }
    }
}

2. 客户端-(ClientHandler)

package com.haopt.netty.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" +
        msg.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据
        String msg = "hello";
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

四 Netty核心组件

4.1 Channel

1. 初识Channel

a 可以理解为socket连接,客户端和服务端连接的时候会创建一个channel。
负责基本的IO操作,例如:bind()、connect()、read()、write()。
b Netty的Channel接口所提供的API,大大减少了Socket类复杂性

2. 常见Channel(不同的协议和阻塞类型的连接会有不同的Channel类型与之对应)

a NioSocketChannel,NIO的客户端 TCP Socket 连接。

b NioServerSocketChannel,NIO的服务器端 TCP Socket 连接。

c NioDatagramChannel, UDP 连接。

d NioSctpChannel,客户端 Sctp 连接。

e NioSctpServerChannel,Sctp 服务器端连接,这些通道涵盖了UDP和TCP⽹络IO以及⽂件IO。

4.2 EventLoopGroup、EventLoop

1. 概述

有了Channel连接服务,连接之间消息流动。服务器发出消息称为出站,服务器接受消息称为入站。
那么消息出站和入站就产生了事件例如:连接已激活;数据读取;用户事件;异常事件;打开连接;
关闭连接等等。有了事件,有了事件就需要机制来监控和协调事件,这个机制就是EventLoop。

2. 初识EventLoopGroup、EventLoop

 对上图解释(Event Loop类似于Selector,轮询Channel)

a 一个EventLoopGroup包含一个或者多个EventLoop
b 一个EventLoop在生命周期内之和一个Thread绑定
c EventLoop上所有的IO事件在它专有的Thread上被处理。
d Channel在它生命周期只注册于一个Event Loop
e 一个Event Loop可能被分配给一个或者多个Channel

3. 代码实现

// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();

4.3 ChannelHandler

1. 初识ChannelHandler

对于数据的出站和入栈的业务逻辑都是在ChannelHandler中。

 2. 对于出站和入栈对应的ChannelHandler

ChannelInboundHandler ⼊站事件处理器
ChannelOutBoundHandler 出站事件处理器

3. 开发中常用的ChannelHandler

a 在服务端编写ChannelHandler时继承的是ChannelInboundHandlerAdapter
b 在客户端编写ChannelHandler时继承的是SimpleChannelInboundHandler

注意: 两者的区别在于,前者不会释放消息数据的引⽤,⽽后者会释放消息数据的引⽤。

4.4 ChannelPipeline

1. 初识ChannelPipeline

将ChannelHandler串起来。一个Channel包含一个ChannelPipeline,而ChannelPipeline维护者一个ChannelHandler列表。
ChannelHandler与Channel和ChannelPipeline之间的映射关系,由ChannelHandlerContext进⾏维护。

 如上图解释

ChannelHandler按照加⼊的顺序会组成⼀个双向链表,⼊站事件从链表的head往后传递到最后⼀个ChannelHandler。
出站事件从链表的tail向前传递,直到最后⼀个ChannelHandler,两种类型的ChannelHandler相互不会影响。

4.5 Bootstrap

1. 初识BootStrap

是引导作用,配置整个netty程序,将各个组件串起来,最后绑定接口,启动服务。

2. Bootstrap两种类型(Bootstrap、ServerBootstrap)

客户端只需要一个EventLoopGroup,服务端需要两个EventLoopGroup。

 上图解释

与ServerChannel相关联的EventLoopGroup 将分配⼀个负责为传⼊连接请求创建 Channel 的EventLoop。
⼀旦连接被接受,第⼆个 EventLoopGroup 就会给它的 Channel 分配⼀个 EventLoop。

4.6 Future

1. 初识

操作完成时通知应用程序的方式。这个对象可以看做异步操作执行结果占位符,它在将来某个时刻完成,并提供对其结果的访问。

2. ChannelFuture的由来

JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,
只允许⼿动检查对应的操作是否已经完成,或者⼀直阻塞直到它完成。这是⾮常
繁琐的,所以 Netty 提供了它⾃⼰的实现--ChannelFuture,⽤于在执⾏异步
操作的时候使⽤。

3. Netty为什么完全是异步?

a ChannelFuture提供了⼏种额外的⽅法,这些⽅法使得我们能够注册⼀个或者多个  ChannelFutureListener实例。

b 监听器的回调⽅法operationComplete(),将会在对应的操作完成时被调⽤。
 然后监听器可以判断该操作是成功地完成了还是出错了。
  
c 每个 Netty 的出站 I/O 操作都将返回⼀个 ChannelFuture,也就是说,
 它们都不会阻塞。所以说,Netty完全是异步和事件驱动的。

4.7 组件小结

 上图解释

将组件串起来

五 缓存区-ByteBuf

5.1 ByteBuf概述

1. 初识ByteBuf

JavaNIO提供了缓存容器(ByteBuffer),但是使用复杂。因此netty引入缓存ButeBuf,
一串字节数组构成。

2. ByteBuf两个索引(readerIndex,writerIndex)

a readerIndex 将会根据读取的字节数递增
b writerIndex 也会根据写⼊的字节数进⾏递增

注意:如果readerIndex超过了writerIndex的时候,Netty会抛出IndexOutOf-BoundsException异常。

5.2 ByteBuf基本使用

1. 读取

package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf01 {
    public static void main(String[] args) {
        //构造
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",
        CharsetUtil.UTF_8);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        while (byteBuf.isReadable()){ //⽅法⼀:内部通过移动readerIndex进⾏读取
            System.out.println((char)byteBuf.readByte());
        }
        //⽅法⼆:通过下标直接读取
        for (int i = 0; i < byteBuf.readableBytes(); i++) {
            System.out.println((char)byteBuf.getByte(i));
        }
        //⽅法三:转化为byte[]进⾏读取
        byte[] bytes = byteBuf.array();
        for (byte b : bytes) {
        System.out.println((char)b);
        }
    }
}

2. 写入

package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf02 {
    public static void main(String[] args) {
        //构造空的字节缓冲区,初始⼤⼩为10,最⼤为20
        ByteBuf byteBuf = Unpooled.buffer(10,20);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        for (int i = 0; i < 5; i++) {
            byteBuf.writeInt(i); //写⼊int类型,⼀个int占4个字节
        }
        System.out.println("ok");
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        while (byteBuf.isReadable()){
            System.out.println(byteBuf.readInt());
        }
    }
}

3. 丢弃已读字节

package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf03 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        while (byteBuf.isReadable()){
            System.out.println((char)byteBuf.readByte());
        }
        byteBuf.discardReadBytes(); //丢弃已读的字节空间
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
    }
}

 4. clear()

package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf04 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        byteBuf.clear(); //重置readerIndex 、 writerIndex 为0
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
    }
}

5.3 ByteBuf 使⽤模式

5.3.1 根据存放缓冲区,分为三类

1. 堆缓存区

内存的分配和回收速度⽐较快,可以被JVM⾃动回收,缺点是,如果进⾏socket的IO读写,需要额外做⼀次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有⼀定程度的下降。
由于在堆上被 JVM 管理,在不被使⽤时可以快速释放。可以通过 ByteBuf.array() 来获取 byte[] 数
据。

2. 直接缓存区 

⾮堆内存,它在对外进⾏内存分配,相⽐堆内存,它的分配和回收速度会慢⼀些,但是
将它写⼊或从Socket Channel中读取时,由于减少了⼀次内存拷⻉,速度⽐堆内存块。

3. 复合缓存区

顾名思义就是将上述两类缓冲区聚合在⼀起。Netty 提供了⼀个 CompsiteByteBuf,
可以将堆缓冲区和直接缓冲区的数据放在⼀起,让使⽤更加⽅便。

5.3.2 缓存区选择 

//默认使⽤的是DirectByteBuf,如果需要使⽤HeapByteBuf模式,则需要进⾏系统参数的设置.

//netty中IO操作都是基于Unsafe完成的
System.setProperty("io.netty.noUnsafe", "true"); 
//ByteBuf的分配要设置为⾮池化,否则不能切换到堆缓冲器模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);

5.3.3 ByteBuf的分配

1. Netty 提供了两种 ByteBufAllocator的实现

PooledByteBufAllocator,实现了ByteBuf的对象的池化,提⾼性能减少并最⼤限度地减少内存碎⽚。
UnpooledByteBufAllocator,没有实现对象的池化,每次会⽣成新的对象实例。

2. 代码实现

//通过ChannelHandlerContext获取ByteBufAllocator实例
ctx.alloc();
//通过channel也可以获取
channel.alloc();

//Netty默认使⽤了PooledByteBufAllocator

//可以在引导类中设置⾮池化模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);
//或通过系统参数设置
System.setProperty("io.netty.allocator.type", "pooled");
System.setProperty("io.netty.allocator.type", "unpooled");

5.5 ByteBuf的释放  

ByteBuf如果采⽤的是堆缓冲区模式的话,可以由GC回收,但是如果采⽤的是直接缓冲区,就不受GC的
管理,就得⼿动释放,否则会发⽣内存泄露。

5.5.1 ByteBuf的手动释放

1. 实现逻辑

⼿动释放,就是在使⽤完成后,调⽤ReferenceCountUtil.release(byteBuf); 进⾏释放。
通过release⽅法减去 byteBuf 的使⽤计数,Netty 会⾃动回收 byteBuf。

代码:

/**
* 获取客户端发来的数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
    System.out.println("客户端发来数据:" + msgStr);
    //释放资源
    ReferenceCountUtil.release(byteBuf);
}

注意:⼿动释放可以达到⽬的,但是这种⽅式会⽐较繁琐,如果⼀旦忘记释放就可能会造成内存泄露。

5.5.1 ByteBuf的自动释放

⾃动释放有三种⽅式,分别是:⼊站的TailHandler、继承SimpleChannelInboundHandler、
HeadHandler的出站释放。

5.6 ByteBuf小结

a ⼊站处理流程中,如果对原消息不做处理,调⽤ ctx.fireChannelRead(msg) 把
  原消息往下传,由流⽔线最后⼀棒 TailHandler 完成⾃动释放。
  
b 如果截断了⼊站处理流⽔线,则可以继承 SimpleChannelInboundHandler ,
  完成⼊站ByteBuf ⾃动释放。
  
c 出站处理过程中,申请分配到的 ByteBuf,通过 HeadHandler 完成⾃动释放。

d ⼊站处理中,如果将原消息转化为新的消息并调⽤ ctx.fireChannelRead(newMsg)
  往下传,那必须把原消息release掉。
  
e ⼊站处理中,如果已经不再调⽤ ctx.fireChannelRead(msg) 传递任何消息,
  也没有继承SimpleChannelInboundHandler 完成⾃动释放,那更要把原消息
  release掉。

标签:&&,Netty,io,netty,System,ByteBuf,byteBuf,import,Reactor
来源: https://blog.csdn.net/Vermont_/article/details/118786494

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有