ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

reactor简介

2022-01-29 12:32:42  阅读:218  来源: 互联网

标签:reactor int 简介 si fd ev events struct


在IO多路复用中,epoll等IO多路复用工具是对IO进行管理,使用reactor模式,变为对事件的管理。

struct sockitem //socket的中间状态可以保存到这个结构体,如接收了一半的数据包
{
	int sockfd; 
	int (*callback)(int fd, int events, void *arg); //利用回调函数处理,避免区分读写和accpet
	char recvbuffer[1024]; 
	char sendbuffer[1024]; 
};

struct reactor //全局用到的变量
{
	int epfd;
	struct epoll_event events[512];
};
struct reactor *eventloop = NULL;

int recv_cb(int fd, int events, void *arg);

int send_cb(int fd, int events, void *arg) 
{

	struct sockitem *si = (struct sockitem*)arg;
	send(fd, "hello\n", 6, 0);
	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET;
	//ev.data.fd = clientfd;
	si->sockfd = fd;
	si->callback = recv_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}

int recv_cb(int fd, int events, void *arg) 
{
	//int clientfd = events[i].data.fd;
	struct sockitem *si = (struct sockitem*)arg;
	struct epoll_event ev;
	char buffer[1024] = {0};
	int ret = recv(fd, buffer, 1024, 0);
	if (ret < 0) 
{
		if (errno == EAGAIN || errno == EWOULDBLOCK) 
        { 
			return -1;
		} else 
        {
			//出错操作
		}
		ev.events = EPOLLIN;
		//ev.data.fd = fd;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
		close(fd);
		free(si);
	} else if (ret == 0) 
{
		printf("disconnect %d\n", fd);
		ev.events = EPOLLIN;
		//ev.data.fd = fd;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
		close(fd);
		free(si);
	} else
    {
		printf("Recv: %s, %d Bytes\n", buffer, ret);
		struct epoll_event ev;
		ev.events = EPOLLOUT | EPOLLET;
		//ev.data.fd = clientfd;
		si->sockfd = fd;
		si->callback = send_cb;
		ev.data.ptr = si;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
	}
}

int accept_cb(int fd, int events, void *arg) 
{
	struct sockaddr_in client_addr;
	memset(&client_addr, 0, sizeof(struct sockaddr_in));
	socklen_t client_len = sizeof(client_addr);
	int clientfd = accept(fd, (struct sockaddr*)&client_addr, &client_len);
	if (clientfd <= 0) return -1;
	char str[INET_ADDRSTRLEN] = {0};
	printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port));

	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET;
	//ev.data.fd = clientfd;
	struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
	si->sockfd = clientfd;
	si->callback = recv_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, clientfd, &ev);
	return clientfd;
}

int main(int argc, char *argv[]) 
{
	if (argc < 2) 
{
		return -1;
	}
	int port = atoi(argv[1]);
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if (sockfd < 0) 
{
		return -1;
	}
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = INADDR_ANY;
	if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) 
{
		return -2;
	}
	if (listen(sockfd, 5) < 0) 
{
		return -3;
	}
	eventloop = (struct reactor*)malloc(sizeof(struct reactor));
	eventloop->epfd = epoll_create(1);
	struct epoll_event ev;
	ev.events = EPOLLIN;
	//ev.data.fd = sockfd; //int idx = 2000;
	

	struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
	si->sockfd = sockfd;
	si->callback = accept_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);
	pthread_t id;
	pthread_create(&id, NULL, worker_thread, NULL);
	//pthread_cond_waittime();
	while (1) 
{
		int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
		if (nready < -1) 
        {
			break;
		}
		int i = 0;
		for (i = 0;i < nready;i ++) 
        {
			if (eventloop->events[i].events & EPOLLIN) //读事件
            {
				//printf("sockitem\n");
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}
			if (eventloop->events[i].events & EPOLLOUT) //写事件
            {
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}
		}
	}
}

这里有一个问题,listenfd是和其他fd一样,加入到epoll中监听读事件的循环。这样每次循环就只能accpet一次,只能产生一个新连接,其余都是与listenfd无关的操作。对于截接入量较大的场景,响应速度较慢。可以用一个线程专门处理listenfd,一个线程处理其他clientfd。

void *worker_thread(void *arg) //处理clientfd
{
	while (1) 
{
		int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
		if (nready < -1) 
        {
			break;
		}
		int i = 0;
		for (i = 0;i < nready;i ++) 
        {
			if (eventloop->events[i].events & EPOLLIN) 
            {
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}

			if (eventloop->events[i].events & EPOLLOUT) 
            {
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}
		}
	}
}

void *thread_cb(void *arg)//处理listenfd
{
    //lfd,re,si已定义
    struct pollfd pfd={0};//只监听一个fd,没必要用epoll
    pfd.fd=lfd;//lfd是监听fd
    pfd.events=POLLIN;
    while(1)
    {
        int ret=poll(&pfd,lfd+1,-1);
        if(ret==-1)
        {
            printf("poll error,errno=%d\n",errno);
            return NULL;
        }
        if(pfd.revents&POLLIN)
        {
            struct sockaddr_in cli_addr={0};
            struct epoll_event ev={0};
            socklen_t cli_len=sizeof(struct sockaddr_in);
            int cfd=accept(lfd,(struct sockaddr*)&cli_addr,&cli_len);
            if(cfd<0)
            {
                printf("accpet error,errno=%d\n",errno);
                return NULL;
            }
            char ip[64]={0};
            printf("recv from %s at port %d\n",inet_ntop(AF_INET,&client_addr.sin_addr,ip,sizeof(ip)),ntohs(client_addr.sin_port));
            si=(struct sockitem*)malloc(sizeof(struct sockitem));
            si->sockfd=cfd;
            si->cb=recv_cb;
            ev.events=EPOLLIN;
            ev.data.ptr=si;
            epoll_ctl(re->epfd,EPOLL_CTL_ADD,cfd,&ev);
        }
    }
}

如果接入速度还不符合要求,我们还可以设置多个listenfd开多个线程,每个线程监听一个listenfd,同时监听clientfd的线程也开多个,这样就是“多对多”。这样会产生一个问题,如果客户端前后session的数据有关联,会出现A线程调用B线程中的数据,导致不同线程之间数据共享(竞争),这样的话就需要加锁,但是效率会下降。解决方法是,做到把数据和业务逻辑相分离,线程运行时信息不存到服务器本身的进程中,而是存到单独的缓存中,这样就可以做到隔离客户端前后之间的依赖关系。这样一来,就相当于服务器不存储数据,单处理业务。客户端的请求由不同的服务线程响应也没关系,直接根据客户端id去缓存中取数据即可。
对于多进程的结构,方法也是一样,不过还有几个问题。一是,多进程监听同一端口,需要listen()之后直接fork(),listenfd就会被继承到子进程中,与父进程一致。这里再提一下,epoll_create()在fork()之前或之后都可以,虽然效果不同,但是没有太大影响。二是惊群的问题,一个连接来了由哪个进程响应,通过共享内存的一把锁实现,保证任一时刻epoll中只有一个fd加入。

最后,再提一个问题,为什么UDP很少用epoll,而TCP却用的多。UDP对于服务器而言,只有一个fd,没必要使用多路复用。TCP是一个连接就有一个fd,所以需要epoll去管理。

标签:reactor,int,简介,si,fd,ev,events,struct
来源: https://blog.csdn.net/m0_65931372/article/details/122741518

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有