ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

一文读懂I/O模型与Reactor模式

2022-01-04 11:06:07  阅读:200  来源: 互联网

标签:读取数据 阻塞 读懂 线程 内核 IO 一文 fd Reactor


IO模型与Reactor总结

OS层面的 UNIX五种IO模型

从TCP发送数据的流程说起

要深入的理解各种IO模型,那么必须先了解下产生各种IO的原因是什么,要知道这其中的本质问题那么我们就必须要知一条消息是如何从过一个人发送到另外一个人的;

以两个应用程序通讯为例,我们来了解一下当“A”向"B" 发送一条消息,简单来说会经过如下流程:

第一步:应用A把消息发送到 TCP发送缓冲区。

第二步: TCP发送缓冲区再把消息发送出去,经过网络传递后,消息会发送到B服务器的TCP接收缓冲区。

**第三步:**B再从TCP接收缓冲区去读取属于自己的数据。

img

根据上图我们基本上了解消息发送要经过 应用A、应用A对应服务器的TCP发送缓冲区、经过网络传输后消息发送到了应用B对应服务器TCP接收缓冲区、然后最终B应用读取到消息。

如果理解了上面的消息发送流程,那么我们下面开始进入文章的主题;

阻塞IO、非阻塞IO(就是要读取数据的时候,如果此时数据没有准备好是直接告诉应用没数据,还是让应用进程一直等着)

我们把视角切换到上面图中的第三步, 也就是应用B从TCP缓冲区中读取数据

img

思考一个问题:

因为应用之间发送消息是间断性的,也就是说在上图中TCP缓冲区还没有接收到属于应用B该读取的消息时,那么此时应用B向TCP缓冲区发起读取申请,TCP接收缓冲区是应该马上告诉应用B 现在没有你的数据(非阻塞),还是说让应用B在这里等着,直到有数据再把数据交给应用B。(阻塞)

把这个问题应用到第一个步骤也是一样,应用A在向TCP发送缓冲区发送数据时,如果TCP发送缓冲区已经满了,那么是告诉应用A现在没空间了,还是让应用A等待着,等TCP发送缓冲区有空间了再把应用A的数据访拷贝到发送缓冲区。

什么是阻塞IO

如果上面的问题你已经思考过了,那么其实你已经明白了什么是阻塞IO了,所谓阻塞IO就是当应用B发起读取数据申请时,在内核数据没有准备好之前,应用B会一直处于等待数据状态,直到内核把数据准备好了交给应用B才结束

术语描述:在应用调用recvfrom读取数据时,其系统调用直到数据包到达且被复制到应用缓冲区中或者发送错误时才返回,在此期间一直会等待,进程从调用到返回这段时间内都是被阻塞的称为阻塞IO;

流程:

1.应用进程向内核发起recvfrom读取数据时。

2.准备数据报(应用进程阻塞)

3.将数据从内核空间复制到用户空间

4.复制完成后返回成功指示

img

什么是非阻塞IO

按照上面的思路,所谓非阻塞IO就是当应用B发起读取数据申请时,如果内核数据没有准备好会即刻告诉应用B,不会让B在这里等待。

术语描述:

非阻塞IO是在应用调用recvfrom读取数据时,如果该缓冲区没有数据的话,就会直接返回一个EWOULDBLOCK错误,不会让应用一直等待中。在没有数据的时候会即刻返回错误标识,那也意味着如果应用要读取数据就需要不断的调用recvfrom请求,直到读取到它数据要的数据为止。

流程:

1、应用进程向内核发起recvfrom读取数据。

2、没有数据报准备好,即刻返回EWOULDBLOCK错误码。

3、应用进程向内核发起recvfrom读取数据。

4、已有数据包准备好就进行一下 步骤,否则还是返回错误码。

5、将数据从内核拷贝到用户空间。

6、完成后,返回成功提示。

在这里插入图片描述

IO复用模型 (Java的 NIO(new io 业务线程还是阻塞的 非阻塞只是体现在读写数据利用缓冲区实现非阻塞)实际就是IO多路复用模型)

继续思考一个问题:

我们还是把视角放到应用B从TCP缓冲区中读取数据这个环节来。如果在并发的环境下,可能会N个人向应用B发送消息,这种情况下我们的应用就必须创建多个线程去读取数据,每个线程都会自己调用recvfrom 去读取数据。那么此时情况可能如下图:

img

如上图一样,并发情况下服务器很可能一瞬间会收到几十上百万的请求,这种情况下应用B就需要创建几十上百万的线程去读取数据,同时又因为应用线程是不知道什么时候会有数据读取,为了保证消息能及时读取到,那么这些线程自己必须不断的向内核发送recvfrom 请求来读取数据;

那么问题来了,这么多的线程不断调用recvfrom 请求数据,先不说服务器能不能扛得住这么多线程,就算扛得住那么很明显这种方式是不是太浪费资源了,线程是我们操作系统的宝贵资源,大量的线程用来去读取数据了,那么就意味着能做其它事情的线程就会少。

所以,有人就提出了一个思路,能不能提供一种方式,可以由一个线程监控多个网络请求我们后面将称为fd文件描述符,linux系统把所有网络请求以一个fd来标识),这样就可以只需要一个或几个线程就可以完成数据状态询问的操作,当有数据准备就绪之后再分配对应的线程去读取数据,这么做就可以节省出大量的线程资源出来,这个就是IO复用模型的思路

img

select既可以是阻塞也可以设置为非阻塞的

正如上图所示,IO复用模型的思路就是系统提供了一种函数可以同时监控多个fd的操作,这个函数就是我们常说到的select、poll、epoll函数,有了这个函数后,应用线程通过调用select函数就可以同时监控多个fd,select函数监控的fd中只要有任何一个数据状态准备就绪了,select函数就会返回可读状态,这时询问线程再去通知处理数据的线程,对应线程此时再发起recvfrom请求去读取数据。

**术语描述:**进程通过将一个或多个fd传递给select,阻塞在select操作上,select帮我们侦测多个fd是否准备就绪,当有fd准备就绪时,select返回数据可读状态,应用程序再调用recvfrom读取数据。

img

**总结:**复用IO的基本思路就是通过select或poll、epoll 来监控多fd ,来达到不必为每个fd创建一个对应的监控线程,从而减少线程资源创建的目的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gGqNQzWk-1641264520836)(E:\TyporaPic\image-20211210205757333.png)]

select本身可以设置 等待时间 如果设置为null 就是直接阻塞到有事件响应

int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: select01 <IPaddress>");
    }
    int socket_fd = tcp_client(argv[1], SERV_PORT);

    char recv_line[MAXLINE], send_line[MAXLINE];
    int n;

    fd_set readmask;
    fd_set allreads;
    FD_ZERO(&allreads);
    FD_SET(0, &allreads);
    FD_SET(socket_fd, &allreads);

    for (;;) {
        //重新置位allreads
        readmask = allreads;
        //select调用
        int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);
		
        if (rc <= 0) {
            error(1, errno, "select failed");
        }
		//对这个向量进行检测,判断出对应套接字的元素 a[fd]是 0 还是 1。
        if (FD_ISSET(socket_fd, &readmask)) {
            n = read(socket_fd, recv_line, MAXLINE);
            if (n < 0) {
                error(1, errno, "read error");
            } else if (n == 0) {
                error(1, 0, "server terminated \n");
            }
            recv_line[n] = 0;
            fputs(recv_line, stdout);
            fputs("\n", stdout);
        }

        if (FD_ISSET(STDIN_FILENO, &readmask)) {
            if (fgets(send_line, MAXLINE, stdin) != NULL) {
                int i = strlen(send_line);
                if (send_line[i - 1] == '\n') {
                    send_line[i - 1] = 0;
                }

                printf("now sending %s\n", send_line);
                size_t rt = write(socket_fd, send_line, strlen(send_line));
                if (rt < 0) {
                    error(1, errno, "write failed ");
                }
                printf("send bytes: %zu \n", rt);
            }
        }
    }

}

信号驱动IO模型

复用IO模型解决了一个线程可以监控多个fd的问题,但是select是采用轮询的方式来监控多个fd的,通过不断的轮询fd的可读状态来知道是否就可读的数据而无脑的轮询就显得有点暴力,因为大部分情况下的轮询都是无效的,所以有人就想,能不能不要我总是去问你是否数据准备就绪,能不能我发出请求后等你数据准备好了就通知我,所以就衍生了信号驱动IO模型。

于是信号驱动IO不是用循环请求询问的方式去监控数据就绪状态,而是在调用sigaction时候建立一个SIGIO的信号联系,当内核数据准备好之后再通过SIGIO信号通知线程数据准备好后的可读状态,当线程收到可读状态的信号后,此时再向内核发起recvfrom读取数据的请求,因为信号驱动IO的模型下应用线程在发出信号监控后即可返回,不会阻塞,所以这样的方式下,一个应用线程也可以同时监控多个fd。

类似于下图描述:

img

术语描述:首先开启套接口信号驱动IO功能,并通过系统调用sigaction执行一个信号处理函数,此时请求即刻返回,当数据准备就绪时,就生成对应进程的SIGIO信号,通过信号回调通知应用线程调用recvfrom来读取数据。

img

总结: IO复用模型里面的select虽然可以监控多个fd了,但select其实现的本质上还是通过不断的轮询fd来监控数据状态, 因为大部分轮询请求其实都是无效的,所以信号驱动IO意在通过这种建立信号关联的方式,实现了发出请求后只需要等待数据就绪的通知即可,这样就可以避免大量无效的数据状态轮询操作。

对于 UDP 套接字,下列情况会产生信号:

  • 数据报到达套接字;
  • 套接字上发生异步错误;

对于 TCP 套接字,信号驱动式 I/O 近乎无用。

  • 太多情况都会产生信号,而我们又无法得知事件类型,因此这里就不再列举其产生信号的情况。

信号驱动 I/O 的缺点:

  • 信号的处理流程较为复杂;
  • 无法指定需要监控的事件类型。

异步IO

其实经过了上面两个模型的优化,我们的效率有了很大的提升,但是我们当然不会就这样满足了,有没有更好的办法,通过观察我们发现,不管是IO复用还是信号驱动,我们要读取一个数据总是要发起两阶段的请求,第一次发送select请求,询问数据状态是否准备好,第二次发送recevform请求读取数据。

思考一个问题:

也许你一开始就有一个疑问,为什么我们明明是想读取数据,什么非得要先发起一个select询问数据状态的请求,然后再发起真正的读取数据请求,能不能有一种一劳永逸的方式,我只要发送一个请求我告诉内核我要读取数据,然后我就什么都不管了,然后内核去帮我去完成剩下的所有事情?

当然既然你想得出来,那么就会有人做得到,有人设计了一种方案,应用只需要向内核发送一个read 请求,告诉内核它要读取数据后即刻返回;内核收到请求后会建立一个信号联系,当数据准备就绪,内核会主动把数据从内核复制到用户空间,等所有操作都完成之后,内核会发起一个通知告诉应用,我们称这种一劳永逸的模式为异步IO模型。

img

术语描述: 应用告知内核启动某个操作,并让内核在整个操作完成之后,通知应用,这种模型与信号驱动模型的主要区别在于,信号驱动IO只是由内核通知我们合适可以开始下一个IO操作,而异步IO模型是由内核通知我们操作什么时候完成。

img

**总结:**异步IO的优化思路是解决了应用程序需要先后发送询问请求、发送接收数据请求两个阶段的模式,在异步IO的模式下,只需要向内核发送一次请求就可以完成状态询问和数拷贝的所有操作。

异步IO模型采用的就是Proactor模式。

总结

最后,总结比较下五种IO模型:

img

IO分两阶段:

1.数据准备阶段
2.内核空间复制回用户进程缓冲区阶段

一般来讲:阻塞IO模型、非阻塞IO模型、IO复用模型(select/poll/epoll)、信号驱动IO模型都属于同步IO,因为阶段2是阻塞的(尽管时间很短)。只有异步IO模型是符合POSIX异步IO操作含义的,不管在阶段1还是阶段2都可以干别的事。

同步IO都是用户进程自己去内核空间 去拷贝数据 异步IO是内核帮你完成 数据拷贝完成 然后发一个信号告诉你 所有的事都做好了。

而同步IO在内核缓冲区没有数据的时候不同做法 又分为阻塞IO 非阻塞IO IO多路复用 信号驱动IO 阻塞IO是 没有数据自己阻塞 非阻塞IO是直接返回EWOLDBLOCK的错误,IO多路复用相当于一个线程监听多个socket 不用每个用户进程自己去阻塞监听内核缓冲区,信号驱动是直接有数据的时候来通知你。

以上是I/O模型是unix操作系统层面的,os提供了一些对应的API库。 高级语言在OS的I/O模型上又进行了封装,并且提供了更加简便的API。

Java语言层面的I/O

1.Java BIO

BIO就是 Blocking IO的简称,基于Linux的阻塞I/O模型进行抽象封装的,当一个Java线程在读入输入流或者写入输入流时,在读写动作完成之前,线程一直会被阻塞。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cmjtAWe1-1641264520838)(E:\TyporaPic\image-20211212132950332.png)]

/**
 1) 使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯。
 2) 要求使用线程池机制改善,可以连接多个客户端.
 3) 服务器端可以接收客户端发送的数据(telnet 方式即可)
 * @author xsj
 * @create 2021-06-02 17:02
 */
public class BIOServer {
    //线程池机制
    //1. 创建一个线程池
    //2. 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
    public static void main(String[] args) throws IOException {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建ServerSocket
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while(true){
            System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" +
                    Thread.currentThread().getName());
            //监听 等待客户端连接
            System.out.println("等待连接....");
            final Socket socket = serverSocket.accept();
            //连接到一个客户端就放到线程池创建一个线程,与之通讯(单独写一个方法)
            newCachedThreadPool.execute(()->{
                handler(socket);
            });
        }
    }
    public static void handler(Socket socket)  {
        try {
            System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" +
                    Thread.currentThread().getName());
            byte[] bytes=new byte[1024];
            //通过socket获取输入流
            InputStream is = socket.getInputStream();
            //循环的读取客户端发送的数据
            while(true){
                System.out.println(" 线 程 信 息 id =" + Thread.currentThread().getId() + " 名 字 =" +
                        Thread.currentThread().getName());
                System.out.println("read....");
                int read = is.read(bytes);
                if(read!=-1){
                    System.out.println(new String(bytes,0,read));
                }else{
                    break;
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            System.out.println("关闭和 client 的连接");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Java BIO的问题

  1. 每个请求都需要创建独立的线程,与对应的客户端进行Read,业务处理,数据write

  2. 当并发量大的时候,需要创建大量线程来处理连接, 系统资源占用较大。

  3. 连接建立后, 如果当前线程暂时没有数据可读, 则线程就阻塞在 Read 操作上, 造成线程资源浪费

    int read = is.read(bytes);

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WgEo9ArJ-1641264520838)(E:\TyporaPic\image-20211212133739944.png)]

  4. 当请求量增大,线程数过高的时候,线程池开的核心线程数过大了,就会导致CPU频繁切换线程,他会导致系统CPU负载过高,并且服务端需要一个线程一直则色等待客户端的连接请求,在客户端的线程也会因为服务端还没有把请求结果返回而一直等待

2.Java NIO

  1. Java NIO 全称 java non-blocking IO, 是指 JDK 提供的新 API。 从 JDK1.4 开始, Java 提供了一系列改进的输入/输出的新特性, 被统称为 NIO(即 New IO), 是同步非阻塞的
  2. NIO 有三大核心部分: Channel(通道), Buffer(缓冲区), Selector(选择器)
  3. NIO 是 面向缓冲区 , 或者面向 块 编程的。 数据读取到一个它稍后处理的缓冲区, 需要时可在缓冲区中前后移动, 这就增加了处理过程中的灵活性, 使用它可以提供非阻塞式的高伸缩性网络
  4. Java NIO 的非阻塞模式, 使一个线程从某通道发送请求或者读取数据, 但是它仅能得到目前可用的数据, 如果目前没有数据可用时, 就什么都不会获取, 而不是保持线程阻塞, 所以直至数据变的可以读取之前, 该线程可以继续做其他的事情。 非阻塞写也是如此, 一个线程请求写入一些数据到某通道, 但不需要等待它完全写入,这个线程同时可以去做别的事情。
  5. 通俗理解: NIO 是可以做到用一个线程来处理多个操作的。 假设有 10000 个请求过来,根据实际情况, 可以分配
    50 或者 100 个线程来处理。 不像之前的阻塞 IO 那样, 非得分配 10000 个
package com.atguigu.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @author xsj
 * @create 2021-06-05 23:00
 */
public class NIOServer {
    public static void main(String[] args) throws IOException {
        //创建ServerSocketChannel ->Serversocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //得到一个 Selecor 对象
        Selector selector = Selector.open();
        //绑定一个端口6666,在服务端监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);

        //把serverSocketChannel注册到selector关心 事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //循环等待客户端连接
        while(true){
            //这里我们等待 1 秒,如果没有事件发生, 返回
            if(selector.select(1000) == 0) { //没有事件发生
                System.out.println("服务器等待了 1 秒,无连接");
                continue;
            }
            //如果返回的>0, 就获取到相关的 selectionKey 集合
            //1.如果返回的>0, 表示已经获取到关注的事件
            //2. selector.selectedKeys() 返回关注事件的集合
            // 通过 selectionKeys 反向获取通道
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            //遍历 Set<SelectionKey>, 使用迭代器遍历
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()) {
                //获取到SelectionKey
                SelectionKey key = keyIterator.next();
                //根据 key 对应的通道发生的事件做相应处理
                if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
                    //给该客户端生成一个SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println(" 客 户 端 连 接 成 功 生 成 了 一 个 socketChannel " +
                            socketChannel.hashCode());
                    //将socketchannel设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将socketchannel注册到selector 关心事件 为OP_READ 同时给 socketChannel
                    //关联一个 Buffer
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));

                }
                if(key.isReadable()) { //发生 OP_READ
                    //通过key反向获取到对应的channel
                    SocketChannel channel = (SocketChannel)key.channel();
                    //获取到该channel关联到buffer
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    //这个buffer是在socketchannel和 serversocketchannel之间的 不是客户端和socketchannel之间的
                    channel.read(buffer);
                    System.out.println("form 客户端 " + new String(buffer.array()));
                }
            }
            //手动从集合中移动当前的 selectionKey, 防止重复操作
            keyIterator.remove();
        }
    }
}

Java NIO中最核心的就是Selector,每当连接事件,接受连接事件,读事件,写事件中的一种事件就绪时,相关的事件处理器就会执行相应的逻辑,这种基于事件驱动的模式叫做Reactor模式

Reactor模式与Netty模型

Reactor模式

Reactor模式的核心思想就是

1.减少线程等待,具体来说就是Handler线程只

2.同时将建立连接 读写I/O调用以及 具体的编解码业务计算 这三块进行分离,让每个线程做尽量单一的职责,充分利用多核CPU的资源。(主从Reactor多线程模式)

在这里插入图片描述

而Netty的主从多Reactor模型 相当于扩充了 IO多路服用模型吧,相当于有多个select去监听 由主Reactor监听连接事件,而之后有连接操作有操作 都是subReactor去监听 然后真正的执行是有handler去执行

  1. Reactor 主线程 MainReactor 对象通过 select 监听连接事件, 收到事件后, 通过 Acceptor 处理连接事件 ,

  2. 当 Acceptor 处理连接事件后, MainReactor 将连接分配给 SubReactor

  3. subreactor 将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理

  4. 当有新事件发生时, subreactor 就会调用对应的 handler 处理

  5. handler 通过 read 读取数据, 分发给后面的 worker 线程处理

  6. worker 线程池分配独立的 worker 线程进行业务处理, 并返回结果

  7. handler 收到响应的结果后, 再通过 send 将结果返回给 client

  8. handler 收到响应的结果后, 再通过 send 将结果返回给 client

MainReactor相当于bossgroup 而 subreactor相当于workergroup

在这里插入图片描述

  1. Netty抽象出两组线程池 BossGroup 专门负责接受客户端的连接,WorkerGroup监听各个客户端网络的读写请求
  2. BossGroup 和WorkerGroup类型都是NioEventLoopGroup
  3. NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop
  4. NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通讯
  5. NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoopGroup
  6. 每个 Boss NioEventLoop 循环执行的步骤有 3 步
    • 轮训accept事件
    • 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个 worker NIOEventLoop上的selector上
    • 处理任务队列的任务 , 即 runAllTasks
  7. 每个 WorkerNIOEventLoop 循环执行的步骤
    • 轮询 read, write 事件
    • 处理 i/o 事件, 即 read , write 事件, 在对应 NioScocketChannel 处理
    • 处理任务队列的任务 , 即 runAllTasks
  8. 每个Worker NIOEventLoop 处理业务时, 会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器

标签:读取数据,阻塞,读懂,线程,内核,IO,一文,fd,Reactor
来源: https://blog.csdn.net/xsjzn/article/details/122298284

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有