ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Netty-组件(一)EventLoop

2021-12-28 19:34:34  阅读:169  来源: 互联网

标签:Netty group EventLoop 线程 事件 组件 new channel


EventLoop

事件循环对象&事件循环组

EventLoop 本质上是一个单线程执行器(同时维护了一个selector),里面的run方法处理channel 上源源不断的IO事件,也就是集成NIO中负责分配任务和处理accept请求的boss线程

它的继承关系比较复杂:

  • 一条线是继承自j.u.c.ScheduledExecutorService 可以用于执行定时任务,同时也包含了线程池中的所有方法
  • 另一条线是继承自netty自己的OrderedEventExecutor,有序的执行任务
    • 提供了boolean isEventLoop(Thread thread)判断一个线程是否属于此EventLoop
    • 提供了parent 方法看看自己属于哪个EventLoopGroup

事件循环组:

EventLoopGroup 就是一组EventLoop,channel 一般会调用EventLoopGroup 的register 方法来绑定其中的一个EventLoop,后续这个Channel 上的IO事件都由此EventLoop 来处理(保证了IO事件处理时的线程安全)

处理普通事件

执行普通任务的意义在于:

  • 异步处理
  • 事件分发的时候可以通过这种方式,将接下来一段代码的执行从一个线程传递给另一个线程
@Slf4j
public class TestEventLoop {

    public static void main(String[] args) {

        /**
         * 1、创建事件循环组
         *  NioEventLoopGroup 可以用于处理IO事件,定时事件,普通任务
         *  DefaultEventLoopGroup 只能处理定时事件和普通任务,并不能处理IO事件
         *
         *  一个事件循环是一个线程来维护的
         *  这里传入的参数是创建线程的个数,如果没有传入参数则线程默认数为1
         */
        EventLoopGroup group = new NioEventLoopGroup(2);

        /*2、group.next
        * 循环获取下一个事件循环对象,这样就保证负载均衡
        * 依次将不同的channel 注册到不同eventLoop 上
        * 就类似于NIO对于两个worker 的处理
        * */
        group.next().submit(() -> {
            /**
             * 将事件提交给某个事件循环对象去处理(交给一个线程去处理)
             * 实现一个异步处理
             */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("ok");
        });

        log.debug("main");
    }
}

image-20211110152447645

  • group.next
    • 获取事件循环对象
  • submit 向事件循环对象提交一个事件,一个事件循环对象通过一个线程来专门进行维护
    • 也可以使用execute

处理定时任务

使用意义:

  • 实现keep-live 的时候可以作为连接的保护而使用
  • 通过另一条线程规律执行某个线程
        /**
         * 启动一个定时任务,并以一定的频率来执行
         * 1、参数一是一个Runnable 接口类型的任务对象
         * 2、参数二是初始延时事件
         * 3、参数三是间隔时间
         * 4、执行单位
         */
        group.next().scheduleAtFixedRate(()->{
            log.info("ok");
        },0,1, TimeUnit.SECONDS);

image-20211110153213782

处理IO事件

@Slf4j
public class EventLoopServer {

    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                /*初始化器*/
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    /**
                     * 这个方法会在connection 建立后调用
                     * @param nioSocketChannel 泛型对象
                     * @throws Exception
                     */
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {

                        /*向流水线中添加处理,这里添加一个自定义的处理器*/
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            /*这里表示关注的是读操作
                            没有StringDecoder的时候
                            Object msg 是ByteBuf类型的数据*/
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

image-20211111135551286

当服务器和客户端两个channel 建立连接之后,那么服务器会用同一个EventLoop 或者说同一个线程来处理这个客户端,建立一个绑定关系

image-20211111135947864

但是并不是说有多少个channel 就会建立多少了eventLoop,eventLoop的数量是既定的,所以会将channel 循环的分给eventLoop,以达到负载均衡的效果,其次就是上面说的每一个eventLoop 对一个channel 都会负责到底

分工细化一

  • 创建两个eventLoopGroup 分别管理Boss 和Worker
    • BossEventLoop 中只会有一个线程因为它唯一去处理ServerSocketChannel,而只会有一个ServerSocketChannel

将eventLoop 的职责划分可以划的更细一些,划分为boss 和worker

.channel 会将ServerSocketChannel 绑定到第一个EventLoop上

.childHandler 会将SocketChannel 绑定到参数二的EventLoopGroup 中的EventLoop 上,初始化器针对的是已经绑定后的SocketChannel

        new ServerBootstrap()
                /**
                 * 划分为boss 和 worker
                 * 第一个boss 只负责ServerSocketChannel accept 事件
                 * 第二个worker 只负责socketChannel 上的读写事件 线程数默认是CPU核心数*2
                 * 这里并不需要将第一个EventLoopGroup 的线程数设为一
                 * 因为这里的serverSocketChannel默认只会和第一个EventLoopGroup 中的一个进行绑定
                 * 而并没有多个ServerSocketChannel
                 * 也只会占用一个线程
                 */
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)

分工细化二

  • 再创建一个EventLoopGroup 去处理耗时较长的操作,而不是用NioEventLoop (worker Thread) 去处理耗时较长的操作
    • 避免繁杂逻辑操作影响io线程
    • 当需要将两个自定义的handler 建立连接时需要在第一个handler 中调用context.fireChannelRead(msg) 将数据传递给下一个handler,这样才能将两个handler 串起来
    • 这里在第一个handler 可以实现条件判断看看是不是要走第二个handler,调用了那句代码才会走到对应的handler 中
    • 用指定的group 处理特殊情况的操作,两个用的线程是不同的,因为eventLoop 中线程的数量是有限的,而worker 线程要服务很多channel ,所以如果一个线程做比较消耗资源的操作就会降低其他channel 对资源的利用率,所以这里才会创建一个新的group 其实就是新的线程去处理对应的特殊操作
    • eventLoop 就是一个线程且带着一个selector
  public static void main(String[] args) {
        /*创建处理特殊事件的Group*/
        EventLoopGroup group = new DefaultEventLoop();
        new ServerBootstrap()
                .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(StandardCharsets.UTF_8));
                                /*将消息传递给下一个handler*/
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(group,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .bind(8080);
    }

切换线程

在流水线执行过程中,handler 执行是如何切换EventLoop的,例如:如何从NioEventLoop 切换到DefaultEventLoop 上执行的(换人)

image-20211111144854012

如果两个handler 绑定的是同一个线程(EventLoop)就可以直接调用,如果绑定的是不同线程,而是会将要调用的代码封装为一个Runnable 对象中然后传递给下一个handler的线程去处理

标签:Netty,group,EventLoop,线程,事件,组件,new,channel
来源: https://blog.csdn.net/weixin_46248230/article/details/122200657

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有