标签:netty return void ctx private 源码 msg 解析 final
文章目录
DefaultChannelPipeline
implements ChannelPipeline
- 构造方法,维护头尾节点,头尾节点组成双向链表。ChannelHandler封装成ChannelHandlerContext,再有ChannelHandlerContext组成链表的元素。
protected DefaultChannelPipeline(Channel channel) {
// ...
// 头尾节点
tail = new TailContext(this);
head = new HeadContext(this);
// 双向链表
head.next = tail;
tail.prev = head;
}
- 往尾部添加handler,头尾节点固定,新增的handler的下一个节点为尾节点
private void addLast0(AbstractChannelHandlerContext newCtx) {
// 在链表添加一个handler,即在head和tail之间构建双向链表
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
- 写操作,很明显,从尾节点开始往前面的节点写数据
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
AbstractChannelHandlerContext
implements ChannelHandlerContext
- 向通道写数据
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 如果是同一个线程,那么执行下一个节点的invokeChannelRead方法
next.invokeChannelRead(m);
} else {
// 如果不是同一线程,那么投放到队列去执行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 找到下一个InboundHandler(从head到tail的顺序找)
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
- 找到下一个OutboundHandler(从tail到head的顺序)
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
HeadContext
extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler,它是一个context,同时是一个出入站handler,说明不管出入站,都会流经。
- HeadContext是入站的第一个handler,负责传递入站消息到下一个ChannelInboundHandler
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
- HeadContext是出站的最后一个handler,直接往网络发送
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
TailContext
extends AbstractChannelHandlerContext implements ChannelInboundHandler,它是一个context,同时是一个入站handler,也是最后一个入站handler
- TailContext可用来做资源的释放,假如前面的handler不释放资源,那么它必须把释放资源的操作交给后面的handler,最后由TailContext处理
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
ByteToMessageDecoder
extends ChannelInboundHandlerAdapter
- 读操作
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 如果消息是一个ByteBuf,那么执行解码操作
// 创建一个存放解码数据的list集合
CodecOutputList out = CodecOutputList.newInstance();
try {
// cumulation也是一个ByteBuf,表示累计的消息
first = cumulation == null;
// 如果cumulation为空,则无需累加,初始化为一个空的buffer。否则,说明cumulation有数据,需要累加数据
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
callDecode(ctx, cumulation, out); // 解码操作
}
// ...
} else {
// 如果消息不是ByteBuf,则传递给下一个handler
ctx.fireChannelRead(msg);
}
}
- 解码逻辑
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
// 读取ByteBuf直到没数据
int outSize = out.size();
if (outSize > 0) {
// 如果有解码结果(List<Object>不为空),那么传递给下一个Handler
fireChannelRead(ctx, out, outSize);
// 清空List<Object>
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 调用该方法进行解码
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
- 解码逻辑
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out); // 调用decode方法,这个方法需要我们实现
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
MessageToByteEncoder
extends ChannelOutboundHandlerAdapter
- 写操作
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
// 如果类型正确,强转
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf); // 调用encode方法,需要我们实现
} finally {
// 结束,需要释放,为什么需要释放?因为Message转到ByteBuf了,说明已经是出站了,msg没用了,所以需要释放msg了。
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
}
// ...
}
- 判断是否是我们想要的类型
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
ChannelFuture
extends Future
状态:完成(可能成功、可能失败、也可能被取消),未完成
- 添加监听器
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
- 监听器
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
一般不推荐直接使用future.get,建议使用监听器,完成后通知执行operationComplete(F future)方法,在这个方法里面去获取值
DefaultPromise
extends AbstractFuture implements Promise
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
// 使用原子类保障安全
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
// 执行成功后,唤醒监听器
notifyListeners();
}
return true;
}
return false;
}
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
- 调用监听器的完成方法
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
AbstractNioChannel
extends AbstractChannel
- 封装了nio的一些东东
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey; // nio的selectionkey
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
- 构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp; // 设置通道关注事件类型为读
try {
ch.configureBlocking(false); // 设置为非阻塞
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
- 注册逻辑
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 0表示所有事件都不感兴趣
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true; // 注册成功
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
- 开始读
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 大爷表示我对读事件很感兴趣 AbstractNioByteChannel()构造方法可以看下,会调用本类的构造方法,把read事件传递过去
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 如果当前不是读事件,那么设置成读事件,位运算
selectionKey.interestOps(interestOps | readInterestOp);
}
}
AbstractNioByteChannel
- 写方法,为什么需要对写的循环次数做限制,因为担心写线程假死
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount(); // 循环写的次数,缺省16次
do {
Object msg = in.current();
if (msg == null) {
// 为空,说明写完了,那么本大爷就对写事件不感兴趣了
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
writeSpinCount -= doWriteInternal(in, msg); // 每次写后,次数-1
} while (writeSpinCount > 0); // 最多循环16次
incompleteWrite(writeSpinCount < 0); // 超过16次没有写完的数据,再来写
}
- 取消写事件
protected final void clearOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}
- 写逻辑
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 如果是bytebuf类型
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0; // 返回0,说明不扣减,
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
// 如果是文件传输类型
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
标签:netty,return,void,ctx,private,源码,msg,解析,final 来源: https://blog.csdn.net/cblstc/article/details/110386431
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。