ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Redis源码分析--事件处理器

2022-02-06 17:36:15  阅读:251  来源: 互联网

标签:AE -- mask eventLoop Redis long int 源码 events


事件处理器:

​ Redis采用Reactor模式作为自己的网络事件处理器,可以看作是单线程单Reactor模型。

一、主要结构体:

1、事件:

/* File event structure */
typedef struct aeFileEvent {
    /* 事件类型:可读or可写 */
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

  • Redis的事件分为文件事件与时间事件;

  • L5:文件事件处理函数,一个函数指针:

    typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
    
  • L15:时间事件处理函数,函数指针:

    typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
    

2、 事件循环:

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    /* 用户层保存的events,不是poller系统调用的events,注意区别 */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    /* 时间事件列表 */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;
  • L4:Redis自己实现定时器;
  • L8:注意区别;
  • L11:时间事件被放置在一个无序列表中,由于当前版本(Redis3.0)下Redis服务器只有serverCron一个时间事件,所以这个列表实际上退化为指针,无序也不影响性能;

二、创建及初始化:

1、创建事件循环:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    /* aeApiCreate:系统适配的poller api的初始化
     * Linux下使用epoll,创建epoll api所需要的epollfd和epoll_event数组,并放入apidata中*/
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
  • L12:Redis没有使用timefd的api;
  • L18:注意Redis会选择对于该系统适配最佳的poller;

2、创建文件事件:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    /* fd不能超过setsize */
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    /* 取出空的aeFileEvent */
    aeFileEvent *fe = &eventLoop->events[fd];
    /* Linux下调用(之后默认Linux)epoll_ctl(2)添加事件 */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
  • L12:注意对系统调用的封装;

3、创建时间事件:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    /* 分配id */
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    /* 设置定时时间 */
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    /* 处理函数 */
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    /* 新事件插到链表头 */
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

三、开始事件循环:

  • 在Redis main函数中调用aeMain:
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

1、事件处理函数:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        /* 根据最近的时间事件设置poller调用的阻塞时间 */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        /* epoll_wait调用 */
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            /* 事件处理 */
            if (fe->mask & mask & AE_READABLE) {
                /* rfired 确保读/写事件只能执行其中一个 */
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    /* 先处理文件事件,再处理时间事件 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
  • L19、L75:以最近时间事件的到达时间作为poller调用阻塞时间,以及先处理文件事件,再处理时间事件的好处是:
    1. 避免长时间阻塞在文件事件的监听上;
    2. 处理完文件事件后最近时间事件刚好或即将触发,这时再处理效率更高;
  • L76:时间事件分为周期性事件和定时事件,周期性事件在执行完后会被重新监听,目前Redis用的就是周期性事件;

参考:

​ 本文基于redis 3.0

标签:AE,--,mask,eventLoop,Redis,long,int,源码,events
来源: https://www.cnblogs.com/macguz/p/15865937.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有