小白谈一谈:命令在redis中是怎么被执行的呢?

当我准备谈谈 redis 的命令是如何执行时,发现其实在网上已经有很多优秀的文章讲过这类问题了。
推荐看过的一个 github 上的源码分析:github.com/menwengit/redis_source_...

当然,即使很多人已经讲解了这类问题,可是依然不影响我去写一些自己的所感。因为,读书百遍不如自己写一遍。这也是防止眼高手低。

命令是如何被执行的呢?#

以在命令行执行命令 redis-cli 为例。这个我们太熟悉了,只要执行 redis-cli 就连接 redis-server 成功了,接下来就可以执行一些 getset 命令。再然后就得到了我们要的结果。但是你从来有想过,这些命令的背后到底干了些什么呢?接下里是我翻看源码 (版本 3.0.7) 之后,得到的一些答案。

第一步:启动服务器#

文件入口:redis.cmain 函数
重点看如下的 2 个方法:

initServer();
aeMain(server.el);

initServer 中初始化的数据很多,其中和本节讨论相关的有:

  • 初始化 server 的数据存储结构
  • 初始化服务端 socket
  • 初始化 EventLoop

初始化 server 的数据存储结构是贯穿 redis 的生命周期的,因为任何操作都会涉及到从 redis 的数据结构中获取或者设置数据。
初始化服务端 socket
1)在 server 的配置文件中,是支持多个 socket 的配置。也支持 IPv4 和 IPv6。这么做的好处就是 redis-server 既可以支持本地连接的 unix,又支持本地的网络连接 127.0.0.1,还能支持远程的网络连接 remote_address:port 等。
2)默认的 socket 初始化后设置的是非阻塞模式。
初始化 EventLoop

对于小白的我来说,刚学 redis 听得最多的就是 redis 中的事件,之所以 redis 很快就是因为事件和 IO 多路复用及单进程等等。听的再多,如果不自己去探究源码终究是纸上得来终觉浅,绝知此事要躬行。所以即便是看完了本篇文章依然要去看源代码,每个人的理解都会不一样。我本地是 mac,redis 底层自动会使用 kqueue。以下的分析都是基于 kqueue 的 IO 多路复用。

// 初始化 EventLoop对象
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
// 注册时间事件到el对象,回调函数是`serverCron`
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
// 注册网络连接的文件事件:就是服务端socket监听client的连接,并且关联fd和回调函数`acceptTcpHandler`。
for (j = 0; j < server.ipfd_count; j++) {
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL)
}
// 注册本地文件事件:监听本地client的连接并且关联回调函数`acceptUnixHandler`。
aeCreateFileEvent(server.el,server.sofd,AE_READABLE,acceptUnixHandler,NULL);

到这里,将服务端的 socket 和时间事件及文件事件都简单的拉出来溜溜了。怎么理解 kqueue 的事件触发呢?联想 PHP 的 socket_select 函数去理解。推荐阅读:www.jianshu.com/p/397449cadc9a

  • socket_select 函数是很早之前最开始使用的 IO 多路复用模式,它的工作机制就是在内核态接收到的事件通知给用户态,并且返回目前已准备好的数量。然后切换到用户态之后在循环已经准备好的数量,接收数据。
    socket_select($read, $write, $except, $sec, $usec);
  • $read 是可读事件:将所有可读的事件放到该数组,然后从该数组循环遍历并且读取数组内文件的数据。
  • $write 是可写事件:将已准备好的可写文件放在这数组,然后循环遍历调用写函数写入数据到该文件。

select 的缺点就是循环遍历,并且最大只能支持 1024 个客户端同时连接。后来升级到了 poll,但是 poll 只是去除了 1024 的限制。后来直到进化到了 epoll,才得以解决 select 的所有弊端,kqueue 和 epoll 类似。

都知道一个问题:Redis 是串行的,那为什么速度还那么快呢?一般我们的回答都是 IO多路复用。那么现在有个问题就是:Redis是如何保证串行的呢?

当你读完 Redis 串行地处理 client 的请求时,可能并没有多大感触。但是如果让你写一个基于多进程下的 server 和 client 通信时,你就会发现如何保证 server 和 client 接收到的数据准确可信是很难的问题。这里就不得不赞叹为什么 Redis 要用单进程处理了,因为实在是省去了很多并发问题的思考。
多进程下的 serverclient 通信问题,看 beanstalk 的 issue:github.com/pheanstalk/pheanstalk/i...

解答这个问题必须再看源代码的时候,注意 aeEventLoop 的数据结构:

/* State of an event based program */
// 事件状态的描述
typedef struct aeEventLoop {
    // 最大的事件描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 目前追踪的事件数量
    int setsize; /* max number of file descriptors tracked */
    // 时间事件的下一个待触发的id
    long long timeEventNextId;
    // 用于检测系统偏差
    time_t lastTime;     /* Used to detect system clock skew */
    // 已注册的文件事件:如服务端socket。是一个array。
    aeFileEvent *events; /* Registered events */
    // 触发的事件,是一个array。
    aeFiredEvent *fired; /* Fired events */
    // 时间事件是一个链表。指向了该链表的头指针。
    aeTimeEvent *timeEventHead;
    // 置为1则结束循环
    int stop;
    // 万能指针 void*:
    // 我的mac是使用的是kqueue,所以保存的数据结构如:
    /*typedef struct aeApiState {
            int kqfd; //指的是kqueue
            struct kevent *events; //提前申请好的setsize的kqueue的大小空间
        } aeApiState;*/
    void *apidata; /* This is used for polling API specific data */
    // 回调函数
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

先注意到两个属性:aeFileEvent *eventsaeFiredEvent *fired,然后对比下 ae.c 文件下的 aeCreateFileEvent 函数去了解。

// accept:server.el  server.ipfd[j]   AE_READABLE   acceptTcpHandler  null
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 已注册的事件,这里是fe是引用的地址。
    aeFileEvent *fe = &eventLoop->events[fd];
    // 注册kevent事件:readable | writeable
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    //如 accept是READBALE事件
    fe->mask |= mask;
    // rfileproc和wfileproc分别记录回调函数,和fd关联起来了
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    // ??
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

结合之前的 accept 操作:server.el server.ipfd [j] AE_READABLE acceptTcpHandler null,那么得到的如下:

/* File event structure
typedef struct aeFileEvent {
    int mask;  //one of AE_(READABLE|WRITABLE)
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent; */

aeFileEvent *events[fd1] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'acceptTcpHandler';
    aeFileProc *wfileProc;
    void *clientData = null;
}; /* Registered events */
aeFiredEvent *fired; /* Fired events */

同时 aeApiAddEventkevent 注册了文件事件,代码如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct kevent ke;
    if (mask & AE_READABLE) {
        EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
        /* kevent参数说明
        kq            - kqueue() 返回的唯一描述符, 标记着一个内核队列
        changes       – 需要对kqueue进行修改的事件集合, 此参数就是kevent()对目前kqueue中的事件的操作,比如删除kqueue中已经存在的事件,或者向kqueue中添加新的事件,也就是说,kevent()通过此参数对kqueue的修改
        nchanges      – 需要修改的事件的个数
        events        – kevent()会把所有事件存储在events中
        nevents       – kevent()需要知道存储空间有多大, == 0 : kevent()会立即返回
        timeout       – 超时控制, = NULL:kevent()会一直等到有关注的事件发生; != NULL:kevent()会等待指定的时间
        */
        // 结合select理解:就是添加到了read数组,并且等待数据的读取。
        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    }
    if (mask & AE_WRITABLE) {
        EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
        // 结合select理解:就是添加到了write数组,并且等待数据的写入。
        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    }
    return 0;
}

当事件被触发时,那么就会触发执行回调函数 acceptTcpHandler,简练的代码如下:

// 其实就是执行了`accept`函数,并且返回了client的文件描述符cfd。
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 设置成非阻塞模式,且将cfd注册到kevent:
// aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
acceptCommonHandler(cfd,0);

那么当 client 请求 server 发送命令时,就会触发函数 readQueryFromClient,Redis 经过对命令一系列的处理之后,经过函数 processCommand 执行命令,最终执行的代码如:c->cmd->proc(c);,这里的函数就是执行命令的函数,如 set|get 等。执行完命令之后通过 addReply 函数返回给客户端结果。但是并不会立即返回给 client,而是将事件注册到了 kevent:aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c),待。
这里可想可知 EventLoop 的属性 event 又多加了一个事件。
此时就能得到如下的 EventLoop 的部分属性数据:

aeFileEvent 
events[fd1] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'acceptTcpHandler';
    aeFileProc *wfileProc;
    void *clientData = null;
}; /* Registered events */
event[fd2] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'readQueryFromClient';
    aeFileProc *wfileProc;
    void *clientData = c;
};
event[fd2] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'sendReplyToClient';
    aeFileProc *wfileProc;
    void *clientData = c;
};
aeFiredEvent *fired; /* Fired events */

以上都只是介绍了事件被触发后的执行过程,但是这些事件是何时被触发的呢?这些事件的触发在 main 函数的 aeMain 函数中,跟踪代码如:

...
// 和时间事件联合起作用,可能等待至下一个时间事件的触发,也可能等待直到某个事件发生。
// 一旦某个事件被触发,就会将该事件记录到`EventLoop`的`fired`属性。然后异步调用了回调函数。
numevents = aeApiPoll(eventLoop, tvp);
// 这就是串行的根本原因所在:将所有触发的事件先保存到数组`fired`,然后在遍历这些事件依次处理。
// 由于此前已经将fd和回调函数相关联,则在遍历的时候就可以执行回调函数。
for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                /* 如socket等待accept,那么就是一个READABLE事件,然后执行的回调函数
                 * rfileProc=acceptTcpHandler,那么就会执行这个函数 */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    // 如get命令,就是一个writeable事件
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }

看下 aeApiPoll 函数:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        // 时间事件存在时,则等待下个时间事件的触发的时间。如果此区间触发了一个文件事件,
        // 则优先处理文件事件。时间事件可能就会延迟处理。
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        &timeout);
    } else {
        // 阻塞等待直到事件的发生
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    }

    if (retval > 0) {
        int j;
        numevents = retval;
        // 文件事件被触发了,优先执行文件事件
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;
            // 将kevent监听到的事件类型和eventLoop的mask相关联。
            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            // 这就是串行,是一个数组
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

到这里,在整体的过一遍就是:

  • server 注册服务端 socket 的 fd 到 kevent,然后 kevent 就会异步的去监听这个 fd,并且将这个 fd 关联回调函数 acceptTcpHandler
  • kevent 监听到了 client 的连接请求,然后调用了函数 acceptTcpHandler,然后再把 cfd 注册到了 kevent 且关联了回调函数 readQueryFromClient
  • kevent 监听到了 client 的发送数据,然后调用了函数 readQueryFromClient,再处理请求将 cfd 注册到了 kevent 且关联了回调函数 sendReplyToClient

更具体的描述是:

  • initServer 中先初始化 server.el, 这是全局公用的。
    server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
  • 通过 aeCreateTimeEvent 注册时间事件。这里只是注册,该函数会立即返回。
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
  • 然后将初始化好的服务端 Socket 也通过 aeCreateFileEvent 注册文件事件。
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
    aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL);
  • 然后在 aeMain 函数中,处理时间事件和文件事件。
    void aeMain(aeEventLoop *eventLoop) {
      eventLoop->stop = 0;
      // 事件处理是一个循环
      while (!eventLoop->stop) {
          // 在此触发beforesleep函数。
          if (eventLoop->beforesleep != NULL)
              eventLoop->beforesleep(eventLoop);
          aeProcessEvents(eventLoop, AE_ALL_EVENTS);
      }
    }
  • aeProcessEvents 函数会先执行时间事件,在链表 timeEventHead 找到最近的时间事件。
    ```php
    /* … 省略上述代码
    找到最近的时间事件并且计算下次执行的时间间隔。但是可以看到时间事件的优先级是低于文件事件的。
    即使时间事件已经到了该执行的时间,可是也会先调用 kevent 函数等待文件事件的发生。不过这里分为两种情况:
    1)无时间事件:kevent 会一直阻塞等待直到文件事件发生。
    2)有时间事件:kevent 会等待最近时间事件触发的时间间隙去处理文件事件。如果没有文件事件发生,则处理时间事件。
    aeApiPoll 函数内,如果有文件事件触发,则将 fd 放到 aeEventLoop->aeFiredEventfired 数组,并且关联 AE_READABLE|AE_WRITABLE。

*/
numevents = aeApiPoll(eventLoop, tvp);

- 循环`aeEventLoop->aeFiredEvent`的`fd`,并且关联起`aeEventLoop->aeFileEvent`的`event`,取出`fd`的对应关系`event[fd]`的操作函数`proc`即之前注册事件时的回调函数,然后开始执行回调函数`proc`。如此所有kevent触发的事件都会被串行放在`fired`数组,然后在依次执行这些`fd`的回调函数。

> 时间事件和文件事件:
时间事件的触发分为定时事件和周期性事件,定时事件触发一次就结束了,而周期性事件会每隔固定的时间就被触发一次,如`serverCron`。
在时间事件触发的时间窗内,就会触发`kevent`阻塞相应的等待时间。在这个时间窗口内,可能就会触发了某个文件事件。如果没有触发文件事件,则会执行时间事件。
[kqueue介绍](https://blog.csdn.net/bytxl/article/details/17526351 "kqueue介绍")

## 第二步:client
分析源文件`redis-cli.c`的`main`函数。
```c
/* Start interactive mode when no command is provided */
    if (argc == 0 && !config.eval) {
        /* Ignore SIGPIPE in interactive mode to force a reconnect */
        signal(SIGPIPE, SIG_IGN);

        /* Note that in repl mode we don't abort on connection error.
         * A new attempt will be performed for every command send. */
         // client是阻塞模式
        cliConnect(0);
        repl();
    }

repl() 的代码继续跟踪到 redisGetReply,再到 redisBufferWrite 方法,是写数据到 server。

int redisGetReply(redisContext *c, void **reply) {
    int wdone = 0;
    void *aux = NULL;

    /* Try to read pending replies */
    if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
        return REDIS_ERR;

    /* For the blocking context, flush output buffer and read reply */
    if (aux == NULL && c->flags & REDIS_BLOCK) {
        /* Write until done */
        do {
            if (redisBufferWrite(c,&wdone) == REDIS_ERR)
                return REDIS_ERR;
        } while (!wdone);

        /* Read until there is a reply */
        do {
            if (redisBufferRead(c) == REDIS_ERR)
                return REDIS_ERR;
            if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                return REDIS_ERR;
        } while (aux == NULL);
    }

    /* Set reply object */
    if (reply != NULL) *reply = aux;
    return REDIS_OK;
}
int redisBufferWrite(redisContext *c, int *done) {
    int nwritten;

    /* Return early when the context has seen an error. */
    if (c->err)
        return REDIS_ERR;

    if (sdslen(c->obuf) > 0) {
        nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
        if (nwritten == -1) {
            // errno==35 || errno == 4 这不是个错误,继续执行。注意到client是阻塞模式。
            if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
                /* Try again later */
            } else {
                __redisSetError(c,REDIS_ERR_IO,NULL);
                return REDIS_ERR;
            }
        } else if (nwritten > 0) {
            if (nwritten == (signed)sdslen(c->obuf)) {
                sdsfree(c->obuf);
                c->obuf = sdsempty();
            } else {
                sdsrange(c->obuf,nwritten,-1);
            }
        }
    }
    if (done != NULL) *done = (sdslen(c->obuf) == 0);
    return REDIS_OK;
}

读取 server 的数据代码如:

/* Use this function to handle a read event on the descriptor. It will try
 * and read some bytes from the socket and feed them to the reply parser.
 *
 * After this function is called, you may use redisContextReadReply to
 * see if there is a reply available. */
int redisBufferRead(redisContext *c) {
    char buf[1024*16];
    int nread;

    /* Return early when the context has seen an error. */
    if (c->err)
        return REDIS_ERR;

    nread = read(c->fd,buf,sizeof(buf));
    if (nread == -1) {
        if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
            /* Try again later */
        } else {
            __redisSetError(c,REDIS_ERR_IO,NULL);
            return REDIS_ERR;
        }
    } else if (nread == 0) {
        __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
        return REDIS_ERR;
    } else {
        if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
            __redisSetError(c,c->reader->err,c->reader->errstr);
            return REDIS_ERR;
        }
    }
    return REDIS_OK;
}

思考#

分析完 Redis 的 kevent 之后,联想了下 Swoole协程,如代码:

Co\run(function() {
    go(function() {
        Co::sleep(1);
        // do something
    });
});
本作品采用《CC 协议》,转载必须注明作者和本文链接