ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

netty源码分析之AbstractNioByteChannel.NioByteUnsafe.read()

2022-08-19 19:02:50  阅读:142  来源: 互联网

标签:netty 读取 read ctx 源码 ByteBuf cumulation public out


 1      @Override
 2         public final void read() {
 3             final ChannelConfig config = config();
 4             if (shouldBreakReadReady(config)) {
 5                 clearReadPending();
 6                 return;
 7             }
 8             final ChannelPipeline pipeline = pipeline();
 9             final ByteBufAllocator allocator = config.getAllocator();
10             //获取自适应缓冲区分配器对象(第一次进来才会创建)
11             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
12             //重置分配器对象    
13             allocHandle.reset(config);
14 
15             ByteBuf byteBuf = null;
16             boolean close = false;
17             try {
18                 do {
19                     //通过分配器分配默认大小为1024的ByteBuf(其大小会自适应调整,具体变化规则下面解析)
20                     byteBuf = allocHandle.allocate(allocator);
21                     //1.doReadBytes(byteBuf):先根据byteBuf大小来设置attemptedBytesRead属性(在while判断中会用到此属性),然后byteBuf尝试读取最大为attemptedBytesRead的数据
22                     //2.allocHandle.lastBytesRead():记录lastBytesRead和totalBytesRead,这里可能会触发一次自适应调整
23                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
24                     if (allocHandle.lastBytesRead() <= 0) {
25                         // nothing was read. release the buffer.
26                         byteBuf.release();
27                         byteBuf = null;
28                         close = allocHandle.lastBytesRead() < 0;
29                         if (close) {
30                             // There is nothing left to read as we received an EOF.
31                             readPending = false;
32                         }
33                         break;
34                     }
35 
36                     //增加已经读取消息的次数
37                     allocHandle.incMessagesRead(1);
38                     readPending = false;
39                     //将已经读取到的数据抛给处理器链的channelRead处理(正常业务逻辑在这里处理)
40                     pipeline.fireChannelRead(byteBuf);
41                     byteBuf = null;
42                 } while (allocHandle.continueReading()); //判断是否需要继续读取数据
43 
44                 //触发一次自适应调整
45                 allocHandle.readComplete();
46                 //读取完成后触发处理器链的channelReadComplete
47                 pipeline.fireChannelReadComplete();
48 
49                 if (close) {
50                     closeOnRead(pipeline);
51                 }
52             } catch (Throwable t) {
53                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
54             } finally {
55                 // Check if there is a readPending which was not processed yet.
56                 // This could be for two reasons:
57                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
58                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
59                 //
60                 // See https://github.com/netty/netty/issues/2254
61                 if (!readPending && !config.isAutoRead()) {
62                     removeReadOp();
63                 }
64             }
65         }    

 

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            //为空创建
            if (recvHandle == null) {
                //这里创建的是AdaptiveRecvByteBufAllocator.HandleImpl实例
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }

 

public void reset(ChannelConfig config) {
            this.config = config;
            //默认16
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

 

public ByteBuf allocate(ByteBufAllocator alloc) {
            //创建一个由AdaptiveRecvByteBufAllocator.HandleImpl推测的容量大小的ByteBuf
            return alloc.ioBuffer(guess());
        }

public int guess() {
            //这里返回的是AdaptiveRecvByteBufAllocator.HandleImpl里推测容量,自适应调整变化的就是此值的大小
            return nextReceiveBufferSize;
        }

public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            //分配直接内存(堆外内存)
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

 

 1     protected int doReadBytes(ByteBuf byteBuf) throws Exception {
 2         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 3         //设置attemptedBytesRead属性,大小为byteBuf的可写大小
 4         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
 5         //尝试读取最大为attemptedBytesRead的数据
 6         return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
 7     }
 8 
 9       AdaptiveRecvByteBufAllocator.HandleImpl
10       public void lastBytesRead(int bytes) {
11             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
12             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
13             // the selector to check for more data. Going back to the selector can add significant latency for large
14             // data transfers.
15             //判断当前读取到的数据与推测大小是否一致,一致的话进行一次扩容处理
16             if (bytes == attemptedBytesRead()) {
17                 record(bytes);
18             }
19             //调用父类的lastBytesRead方法
20             super.lastBytesRead(bytes);
21         }
22 
23         private void record(int actualReadBytes) {
24             //SIZE_TABLE里保存着有序的递增的16-1073741824(到512前,每次递增16;512后每次*2)
25             //INDEX_DECREMENT 默认 1, INDEX_INCREMENT 默认 4
26             //真实读取到数据的大小小于等于SIZE_TABLE索引前两位的值大小时,第一次触发不会缩容,第二次触发会缩容为SIZE_TABLE索引前一位的值(若index - 1 < minIndex,index值为minIndex)
27             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
28                 if (decreaseNow) {
29                     index = max(index - INDEX_DECREMENT, minIndex);
30                     nextReceiveBufferSize = SIZE_TABLE[index];
31                     decreaseNow = false;
32                 } else {
33                     //第一次触发不会缩容
34                     decreaseNow = true;
35                 }
36             } else if (actualReadBytes >= nextReceiveBufferSize) { //真实读取到数据大于等于推测大小时,直接扩容为SIZE_TABLE索引后四位的值(若index + 4 > maxIndex,index值为maxIndex)
37                 index = min(index + INDEX_INCREMENT, maxIndex);
38                 nextReceiveBufferSize = SIZE_TABLE[index];
39                 decreaseNow = false;
40             }
41         }
42 
43        DefaultMaxMessagesRecvByteBufAllocator
44        public void lastBytesRead(int bytes) {
45             //将读取到的数据赋值给lastBytesRead
46             lastBytesRead = bytes;
47             if (bytes > 0) {
48                 //将读取到的数据叠加到totalBytesRead 
49                 totalBytesRead += bytes;
50             }
51         }

 

        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

    DefaultChannelConfig
    public boolean isAutoRead() {
        //autoRead默认为1,因此该判断为true
        return autoRead == 1;
    }

            DefaultMaxMessagesRecvByteBufAllocator
            public boolean get() {
                //当本次读操作读取到的字节数与AdaptiveRecvByteBufAllocator推测出的ByteBuf容量大小不一样时,就会返回false;否则返回true。如果本次读操作可读取的字节大于了attemptedBytesRead的话,一次读操作也只会先读取attemptedBytesRead的字节数
                return attemptedBytesRead == lastBytesRead;
            }

    //totalMessages < maxMessagePerRead:根据上面的流程我们可以知道,maxMessagePerRead为16,totalMessages为读循环已经执行的读操作次数(即,循环次数)。

    //totalBytesRead > 0:当本次读操作有读取到字节数时,或者以读取到的字节数小于Integer.MAX_VALUE,那么该判断都会大于0,即,为true;否则为false。

 

        public void readComplete() {
            //读结束后,触发一次自适应调整
            record(totalBytesRead());
        }

        protected final int totalBytesRead() {
            //本次已读取数据总和
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }

 

扩展:ByteToMessageDecoder.channelRead()

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                //判断累加区是否为空
                first = cumulation == null;
                if (first) {
                    //为空,直接将字节容器的指针指向新读取的数据
                    cumulation = data;
                } else {
                    //不为空,调用累加器累加数据。可能会触发一次扩容
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //数据传递给业务
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                //没有累加区中有数据可读时,清空
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                //如果连续16次(discardAfterReads的默认值),累加区中仍然有未被业务拆包器读取的数据,那就做一次压缩,有效数据段整体移到容器首部(粘包后拆包时会存在此情况)
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    //压缩,即丢弃已读数据
                    discardSomeReadBytes();
                }

                //传递业务数据包给业务解码器处理(拆包时,在callDecode()里就会交给业务解码器处理了,这边通常是处理的是最后一次拆包,即数据只有一个完整包时,直接走这边)
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

 

private Cumulator cumulator = MERGE_CUMULATOR;

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            try {
                final ByteBuf buffer;
                //容器不够本次追加
                if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                    // Expand cumulation (by replace it) when either there is not more room in the buffer
                    // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                    // duplicate().retain() or if its read-only.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    //追加扩容
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
                } else {
                    buffer = cumulation;
                }
                //将新数据累加到字节容器中
                buffer.writeBytes(in);
                return buffer;
            } finally {
                // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
                // for whatever release (for example because of OutOfMemoryError)
                in.release();
            }
        }
    };

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        //扩容也是一个内存拷贝操作,新增的大小即是新读取数据的大小
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }

 

/**
     * Called once data should be decoded from the given {@link ByteBuf}. This method will call
     * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     */
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                //若有已经拆出来的数据包,交给后面的处理器链处理
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                //记录字节容器中有多少字节
                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    //拆包器未读取任何数据
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                //拆包器未读取任何数据,已经解到了数据包
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            //待实现的业务处理逻辑
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

 

标签:netty,读取,read,ctx,源码,ByteBuf,cumulation,public,out
来源: https://www.cnblogs.com/gumanlou/p/16603043.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有