ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

Linux IO多路复用

2022-09-09 00:34:48  阅读:223  来源: 互联网

标签:多路复用 epoll int 描述符 fds fd IO Linux include


https://segmentfault.com/a/1190000003063859

\ select poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)
最大连接数 1024(x86)或 2048(x64) 无上限 无上限
fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

select(pselect)

select 直接操纵多个文件描述符的集合 fd_set

流程:

  1. 创建文件描述符的集合 fd_set
  2. 将监听的socket和客户端socket加入 fd_set
  3. select()
  4. 用 FD_ISSET 判断哪个 fd 有事件
    1. 监听的socket有事件,表示有新客户端连接请求
    2. 客户端socket有事件,有数据或连接断开

select 只有“水平触发”模式,如果报告了fd后事件没有被处理或数据没有被全部读取,那么下次select时会再次报告该fd

select函数的缺点

  1. bitmap默认大小为1024,虽然可以调整但还是有限度的
  2. 需要遍历所有描述符
  3. rset每次循环都必须重新置位为0,不可重复使用
  4. 将rset从用户态拷贝到内核态,由内核态直接判断文件描述符是否有数据的操作,这样比直接用户态判断要快。
    尽管将rset从用户态拷贝到内核态并由内核态判断是否有数据,但还是有拷贝的开销
#include <sys/types.h>
#include <sys/time.h>

// 初始化空集合
void FD_ZERO(fd_set *fdset);

// 从集合中清除fd
void FD_CLR(int fd, fd_set *fdset);

// 添加fd到集合
void FD_SET(int fd, fd_set *fdset);

// 判断是否在set中,在 非零值,不在 零
int FD_ISSET(int fd, fd_set *fdset);

// 超时值
struct timeval {
    time_t tv_sec;  // seconds
    long tv_usec;   // microseconds
}

/*
用于测试fd_set中是否由fd处于可读或可写或错误状态
当__readfds中有可读fd,__writefds中有科协fd,__exceptfds中有错误fd
成功返回状态发生变化的fd总数,失败返回-1并设置errno
*/
extern int select (int __nfds,           // 需要测试的fd数目
                   fd_set * __readfds,   
                   fd_set * __writefds,
                   fd_set * __exceptfds,
                   struct timeval * __timeout); // Linux在退出时会将此选项清空,故每次进入select前重新设置此选项
select 示例代码
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

int main(int argc, char const *argv[])
{
    int server_sockfd, client_sockfd;
    int server_len, client_len;
    struct sockaddr_in server_address;
    struct sockaddr_in client_address;
    int result;
    fd_set readfds, testfds;

    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9734);
    server_len = sizeof(server_address);

    bind(server_sockfd, (struct sockaddr *)&server_address, server_len);

    listen(server_sockfd, 5);

    FD_ZERO(&readfds);
    FD_SET(server_sockfd, &readfds);

    while (1) {
        char ch;
        int fd;
        int nread;

        testfds = readfds;

        printf("server waiting\n");
        result = select(FD_SETSIZE, &testfds, NULL, NULL, 0);
        if (result < 1) {
            perror("select failed");
            exit(1);
        }

        for (fd = 0; fd < FD_SETSIZE; fd++) {
            if (FD_ISSET(fd, &testfds)) {
                if (fd == server_sockfd) {
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd, 
                                           (struct sockaddr*) &client_address, 
                                           &client_len);
                    FD_SET(client_sockfd, &readfds);
                    printf("adding client on fd %d\n", client_sockfd);
                } else {
                    ioctl(fd, FIONREAD, &nread);

                    if (nread == 0) {
                        close(fd);
                        FD_CLR(fd, &readfds);
                        printf("removing client on fd %d\n", fd);
                    } else {
                        read(fd, &ch, 1);
                        printf("serving client on fd %d\n", fd);
                        write(fd, &ch, 1);
                    }
                }
            }
        }
    }
    return 0;
}

poll(ppoll)

与select没有本质差别,管理多个文件描述符也是使用轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量限制。

select采用了bitmap,poll采用了数组。

poll与select相同的缺点:文件描述符的数组或位图被整体复制于用户态和内核态之间,不论这些文件描述符是否有事件,它的开销随着文件描述符的增加而线性增加。二者在返回后都需要遍历整个描述符的数组。

内核将用户的fds结构体数组拷贝到内核中,当有事件发生时内核再将所有时间都返回到fds数组中,

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

int poll(struct poll_fd *fds,  // 数组指针
         nfds_t  nfds,         // 数组大小
         int timeout);         // 超时时间

events 和 revents 的取值:

poll 使用并不方便,代码比select和epoll都复杂,但性能不如epoll

poll 代码
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define NFDS 100  // fds数组的大小

// 创建一个用于监听的socket
int CreateSocket() {
  int listenfd = socket(AF_INET, SOCK_STREAM, 0);
  assert(-1 != listenfd);

  struct sockaddr_in ser;
  memset(&ser, 0, sizeof(ser));
  ser.sin_family = AF_INET;
  ser.sin_port = htons(6000);
  ser.sin_addr.s_addr = inet_addr("127.0.0.1");

  int res = bind(listenfd, (struct sockaddr *)&ser, sizeof(ser));
  assert(-1 != res);

  listen(listenfd, 5);

  return listenfd;
}

// 初始化fds结构体数组
void InitFds(struct pollfd *fds) {
  int i = 0;
  for (; i < NFDS; ++i) {
    fds[i].fd = -1;
    fds[i].events = 0;
    fds[i].revents = 0;
  }
}

// 向fds结构体数组中插入一个文件描述符
void InsertFd(
    struct pollfd *fds, int fd,
    int flag)  //此处flag是为了判断是文件描述符c,还是listenfd,来设置events
{
  int i = 0;
  for (; i < NFDS; ++i) {
    if (fds[i].fd == -1) {
      fds[i].fd = fd;
      fds[i].events |= POLLIN;
      if (flag) {
        fds[i].events |= POLLRDHUP;
      }

      break;
    }
  }
}

// 从fds结构体数组中删除一个文件描述符
void DeleteFd(struct pollfd *fds, int fd) {
  int i = 0;
  for (; i < NFDS; ++i) {
    if (fds[i].fd == fd) {
      fds[i].fd = -1;
      fds[i].events = 0;
      break;
    }
  }
}

// 获取一个已完成三次握手的连接
void GetClientLink(int fd, struct pollfd *fds) {
  struct sockaddr_in cli;
  socklen_t len = sizeof(cli);
  int c = accept(fd, (struct sockaddr *)&cli, &len);
  assert(c != -1);

  printf("one client link success\n");

  InsertFd(fds, c, 1);
}

// 断开一个用户连接
void UnlinkClient(int fd, struct pollfd *fds) {
  close(fd);
  DeleteFd(fds, fd);
  printf("one client unlink\n");
}

// 处理客户端发送来的数据
void DealClientData(int fd, struct pollfd *fds) {
  char buff[128] = {0};

  int n = recv(fd, buff, 127, 0);
  if (n <= 0) {
    UnlinkClient(fd, fds);
    return;
  }

  printf("%s\n", buff);

  send(fd, "ok", 2, 0);
}

// poll返回后,处理就绪的文件描述符
void DealFinishFd(struct pollfd *fds, int listenfd) {
  int i = 0;
  for (; i < NFDS; ++i) {
    if (fds[i].fd == -1) {
      continue;
    }

    int fd = fds[i].fd;
    if (fd == listenfd && fds[i].revents & POLLIN) {
      GetClientLink(fd, fds);
      //获取连接
    } else if (fds[i].revents & POLLRDHUP) {
      UnlinkClient(fd, fds);
      //断开连接
    } else if (fds[i].revents & POLLIN) {
      DealClientData(fd, fds);
      //处理客户端数据
    }
  }
}

int main() {
  int listenfd = CreateSocket();

  struct pollfd *fds = (struct pollfd *)malloc(sizeof(struct pollfd) * NFDS);
  // malloc一个fds结构体数组
  assert(NULL != fds);

  InitFds(fds);
  //初始化fds结构体数组

  InsertFd(fds, listenfd, 0);
  //插入文件描述符listenfd

  while (1) {
    int n = poll(fds, NFDS, -1);
    if (n <= 0) {
      printf("poll error\n");
      continue;
    }

    DealFinishFd(fds, listenfd);
    //处理就绪的文件描述符
  }

  free(fds);
}

优点:

  1. poll() 不要求开发者计算最大文件描述符加一的大小。
  2. poll() 在应付大数目的文件描述符的时候速度更快,相比于select。
  3. 它没有最大连接数的限制,原因是它是基于链表来存储的。
  4. 在调用函数时,只需要对参数进行一次设置就好了

缺点:

  1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义(epoll可以解决此问题)
  2. 与select一样,poll返回后,需要轮询pollfd来获取就绪的描述符,这样会使性能下降
  3. 同时连接的大量客户端在一时刻可能只有很少的就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降

epoll(epoll_pwait)

解决了fd_set拷贝轮询的问题。内核每次返回的都是已就绪的文件描述符。

  1. 创建epoll句柄,它本身就是一个fd,需要关闭
    int epoll_create(int size); // 返回一个文件描述符,参数在新版本中被忽略,但是要给一个大于0的数
  2. 注册需要监视的fd和事件
    int epoll_ctl(int epfd,
                  int op,   // 选项为3个宏:添加 EPOLL_CTL_ADD,删除 EPOLL_CTL_DEL,修改 EPOLL_CTL_MOD
                  int fd,   // 需要监听的fd(文件描述符)
                  struct epoll_event *event  // 需要监听什么事
                  );
    
    struct epoll_event {
      __uint32_t events;  /* Epoll events */
      epoll_data_t data;  /* User data variable */
    };
    
    //events可以是以下几个宏的集合:
    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    EPOLLOUT:表示对应的文件描述符可以写;
    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
    
    typedef union epoll_data {
        void *ptr;
        int fd;
        _uint32_t u32;
        _uint64_t u64;
    }epoll_data_t;
  3. 等待事件发生
    // 返回就绪事件的个数,失败-1,超时0
    int epoll_wait(int epfd,
                   struct epoll_event * events, // 用于接收内核返回的就绪事件的数组
                   int maxevents, // 一次最多能处理的事件个数
                   int timeout    // 超时时间,为0则立即返回,-1永不超时
    );
epoll_LT 示例模式
 #include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <iostream>

const int max_events = 128;

int server_socket;
int epoll_fd;

void sig_handler(int signo) {
  close(server_socket);
  close(epoll_fd);
  std::cout << "recv SIGTERM, exit process." << std::endl;
  exit(EXIT_SUCCESS);
}

int main(int argc, char const *argv[]) {
  struct sigaction term_action;
  sigset_t all_sig;
  sigfillset(&all_sig);
  term_action.sa_mask = all_sig;
  term_action.sa_handler = sig_handler;
  sigaction(SIGTERM, &term_action, nullptr);

  server_socket = socket(AF_INET, SOCK_STREAM, 0);

  sockaddr_in server_addr;
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(8080);
  server_addr.sin_addr.s_addr = inet_addr("0.0.0.0");

  bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
  listen(server_socket, 128);

  epoll_fd = epoll_create(1);
  epoll_event server_socket_event;
  server_socket_event.events = EPOLLIN;
  server_socket_event.data.fd = server_socket;
  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_socket, &server_socket_event);

  epoll_event events[max_events];
  while (true) {

    int n = epoll_wait(epoll_fd, events, max_events, -1);
    for (int i = 0; i < n; ++i) {
      if (events[i].data.fd == server_socket) {
        int client_socket = accept(server_socket, nullptr, nullptr);
        std::cout << "accept new client: " << client_socket << std::endl;
        epoll_event client_socket_event;
        client_socket_event.events = EPOLLIN;
        client_socket_event.data.fd = client_socket;
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_socket, &client_socket_event);

      } else if (events[i].events & EPOLLRDHUP) {
        // 理论上 EPOLLRDHUP 信号是对方挂断后发出,但实际上可能没有这个信号
        std::cout << events[i].data.fd << " disconnect" << std::endl;
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);

      } else if (events[i].events & EPOLLIN) {
        char buf[BUFSIZ];
        memset(buf, 0, sizeof(buf));
        int client_socket = events[i].data.fd;
        int nrecv = recv(client_socket, buf, BUFSIZ, 0);
        std::cout << "recv from " << client_socket << " :" << buf << std::endl;
        if (nrecv == 0) {
          std::cout << events[i].data.fd << " disconnect" << std::endl;
          epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);
        }
      }
    }
  }

  close(epoll_fd);
  close(server_socket);
  return 0;
}

epoll 工作模式:

  • 水平触发 level trigger:(默认,支持 block socket 和 non-block socket)
    若报告了fd事件后没有被处理或数据没有被全部读取,epoll还会再报告该事件
  • 边缘触发 edge trigger:(仅支持非阻塞)
    若报告了fd事件后没有被处理或数据没有被全部读取,epoll不会再报告该事件。如果不立即处理,数据会丢失。

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

 

 

 

 

 

 

标签:多路复用,epoll,int,描述符,fds,fd,IO,Linux,include
来源: https://www.cnblogs.com/zhh567/p/16629782.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有