##前言 redis 源码分析很多人在做了,我只是重复造轮子。

  • ae 模块结构体分析
  • debug redis 初始化过程中,ae模块初始化了什么事件。(ae的函数会在这里讲到)

##redis ae事件库结构体分析 ae.h

  • 文件事件结构体

      /* File event structure */
      typedef struct aeFileEvent {
      	   int mask; /* one of AE_(READABLE|WRITABLE) */
      	   aeFileProc *rfileProc;
      	   aeFileProc *wfileProc;
      	   void *clientData;
      } aeFileEvent;	
    

    mask 是标记动作类型,可读或者可写;

    aeFileProc 是事件处理函数 原型是:
    typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

    void *clientData; 是api所需要的数据


  • 时间事件结构体

      /* Time event structure */
      typedef struct aeTimeEvent {
      	   long long id; /* time event identifier. */
      	   long when_sec; /* seconds */
      	   long when_ms; /* milliseconds */
      	   aeTimeProc *timeProc;
      	   aeEventFinalizerProc *finalizerProc;
      	   void *clientData;
      	   struct aeTimeEvent *next;
      } aeTimeEvent;
    

    aeTimeProc 的原型: ` typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); `

    aeEventFinalizerProc是事件被删除时候调用的清理函数:
    ` typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); `

    struct aeTimeEvent *next; 可以看出这是个链表,指向下一个时间事件

  • 就绪事件

      /* A fired event */
      typedef struct aeFiredEvent {
        int fd;
        int mask;
      } aeFiredEvent;
    
  • 事件循环

      /* State of an event based program */
      typedef struct aeEventLoop {
          int maxfd; /* highest file descriptor currently registered */
          int setsize; /* max number of file descriptors tracked */
          long long timeEventNextId;
          aeFileEvent *events; /* Registered events */
          aeFiredEvent *fired; /* Fired events */
          aeTimeEvent *timeEventHead;
          int stop;
          void *apidata; /* This is used for polling API specific data */
          aeBeforeSleepProc *beforesleep;
      } aeEventLoop;
    

    int maxfd 当前已经注册的最大的文件描述符

    int setsize 允许的最大文件描述符的数量

    long long timeEventNextId 时间事件的id(这个是用来生成新id的)

    ` aeBeforeSleepProc *beforesleep; 是处理事件之前要执行的函数,原型是: typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);`


##redis 启动过程中,ae库的初始化及注册的事件 ** 这里选用什么debug工具都是可以的,GDB或是IDE,我是ubuntu的图形GDB :Nemiver **

  • make && src/redis-server
    我是ubuntu的系统,按照redis的宏定义,我默认是使用epoll的。

    直接在redis根目录make,会在src/里面生成几个可执行文件,redis-server和redis-cli就是redis的服务器和客户端。对redis-server的装载,并令启动参数是: redis.conf

    redis.c 里面的main方法会初始化zmallor模块,然后读取启动参数,还有配置文件的解释。然后redis会开始初始化服务器,所以可以把第一个断点打在 void initServer()

      $ gdb src/redis-server 
      Reading symbols from /d/wo/c/redis/src/redis-server...done.       
      (gdb) b initServer
      Breakpoint 1 at 0x4194f0: file redis.c, line 1247.
      (gdb) run redis.conf
      Starting program: /d/wo/c/redis/src/redis-server redis.conf
      [Thread debugging using libthread_db enabled]
      Using host libthread_db library "/lib/x86_64-linux-gnu/        libthread_db.so.1".
    
      Breakpoint 1, initServer () at redis.c:1247
      warning: Source file is more recent than executable.
      1247	void initServer() {
      (gdb) 
    

    initServer 里面做的事情不少,我着重看ae相关的。

      Breakpoint 2, initServer () at redis.c:1268
      1268	    server.el = aeCreateEventLoop(server.maxclients+1024);
      (gdb) print server.maxclients
      $2 = 3984
    

    这里可以看到是创建一个事件循环,这里server.maxclients之所以是3984,是因为我执行的不是root,没有权限修改最大客户端链接数,所以是默认值

  • 创建一个事件循环
    aeEventLoop *aeCreateEventLoop(int setsize) 里面主要是分配和初始化结构体的内存空间,然后调用封装过的IO复用库。

    对于epoll的封装来说,使用了一个结构体,然后会存放到eventLoop->apidata里面。 下面是一系列的代码片段 at ae_epoll.c

      typedef struct aeApiState {
          int epfd;
          struct epoll_event *events;
      } aeApiState;
      ......
      state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
      ......
      state->epfd = epoll_create(1024); 
      ......
      eventLoop->apidata = state;
    

    创建完一个事件循环之后,事件循环结构体会放到server.el里面。之后会初始化tcp连接,和一些配置,忽略了。打下一个断点,那里有我想要的东西。

      (gdb) b 1328
      Breakpoint 5 at 0x4197cd: file redis.c, line 1328.
      (gdb) c	
      Breakpoint 5, initServer () at redis.c:1328
      1328	    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
    
  • 创建一个时间事件

    参照函数原型,可以知道,serverCron就是超时之后的回调的函数.

      long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
      aeTimeProc *proc, void *clientData,
      aeEventFinalizerProc *finalizerProc)
      {
          long long id = eventLoop->timeEventNextId++;
          aeTimeEvent *te;
          te = zmalloc(sizeof(*te));
          if (te == NULL) return AE_ERR;
          te->id = id;
          // 设定处理该事件的时间
          //这个函数内部是取出当前时间,并算出时间差
          aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
          te->timeProc = proc;
          te->finalizerProc = finalizerProc;
          te->clientData = clientData;
          // 将新事件放入时间事件的链表表头
          te->next = eventLoop->timeEventHead;
          eventLoop->timeEventHead = te;
          return id;
      }
    
  • 创建一个文件事件
    成功创建时间事件之后,开始创建文件事件
    aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,acceptTcpHandler,NULL)

      int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
      {
          
          if (fd >= eventLoop->setsize) return AE_ERR;
        
          aeFileEvent *fe = &eventLoop->events[fd];
        
          // 这里将事件加入epoll的监控队列(1)
          if (aeApiAddEvent(eventLoop, fd, mask) == -1)
      return AE_ERR;
    
          //将该fd原有的标识和新标识进行  按位“或”赋值运算 ,并把按照事件标识去初始化执行函数 (2)
          fe->mask |= mask;
          if (mask & AE_READABLE) fe->rfileProc = proc;
          if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
          fe->clientData = clientData;
          if (fd > eventLoop->maxfd)
              eventLoop->maxfd = fd;
    
          return AE_OK;
      }
    

    我先说(2),一个事件就是一个动作,而在某一刻只有读或者写动作。至于为什么用‘或’ 和 ‘与’运算,因为mask的动作只有0,1,2。所以操作可行,而且相对高效。

    对于(1),添加动作到epoll。首先会检查这个fd是否已经有监控的事件,来决定epoll是替换事件还是增加事件, int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    一点点不同的地方是epoll可以多个事件,所以代码中也使用了位运算,但是是合并操作。

      mask |= eventLoop->events[fd].mask; /* Merge old events */
      if (mask & AE_READABLE) ee.events |= EPOLLIN;
      if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    
  • 事件处理循环

    redis.c的main方法最后几个动作就是设置 aeSetBeforeSleepProc(server.el,beforeSleep);,然后进入事件处理循环 ` aeMain(server.el);。里面很简单就一个while死循环,然后调用beforesleep动作,进入aeProcessEvents(eventLoop, AE_ALL_EVENTS);`开始处理事件。

    第二参数是flag:   * 如果 flags 为 0 :不做动作,直接返回,   * 如果 flags 的 AE_ALL_EVENTS 处理所有事件,   * 如果 flags 的 AE_FILE_EVENTS 处理文件事件,   * 如果 flags 的 AE_TIME_EVENTS 处理时间事件,   * 如果 flags 的 AE_DONT_WAIT 处理不需等待的事件;
    

aeProcessEvents 流程

  • aeProcessEvents 的处理原则是: 1 有文件事件都会处理;

    2 如果时间事件和文件事件都要处理时候,先查找最近的时间事件的时间差,而这个时间差就是文件事件调用epoll_wait的超时时间。 如果使用了 AE_DONT_WAIT epoll_wait 的超时时间就会 ` -1 ` ,即阻塞;

    3 再处理时间事件。 static int processTimeEvents(aeEventLoop *eventLoop) ,处理时间事件时候会遍历链表,逐个判断超时,超时之后就会调用回调函数,最后会判断是否重复触发,是就重新设置时间差,不是的就会删除掉该时间事件。

  • redis成功启动以后 到这里redis成功启动了。无连接情况下,它会回调两个函数,一个是超时的int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) ,另外一个是sleep前的 void beforeSleep(struct aeEventLoop *eventLoop)
    如果要触发文件事件,就是要连接上redis。只要执行src/redis-cli就可以了。触发epoll_wait之后,会回调void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask),里面会accept tcp连接,并会创建一个redisClient,将添加文件事件到主eventloop: aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR)

      (gdb) b acceptTcpHandler 
      Breakpoint 1 at 0x41fc60: file networking.c, line 514.
      (gdb) b createClient 
      Breakpoint 2 at 0x41f250: file networking.c, line 23.
      (gdb) b readQueryFromClient 
      Breakpoint 3 at 0x421310: file networking.c, line 983.
      --
      src/redis-cli
    

下面可以接着调下去了。

后话

这是重复造轮子的工作,原本是用着libev的,偶尔看到redis自己实现了一个够用的事件库,好奇心重,就调试了一遍。学到很多东西,也大概了解redis的事件库工作模式了。
-by dp

–EOF–



blog comments powered by Disqus

Published

04 September 2012

Category

c