小白谈一谈:命令在redis中是怎么被执行的呢?

当我准备谈谈redis的命令是如何执行时,发现其实在网上已经有很多优秀的文章讲过这类问题了。
推荐看过的一个github上的源码分析:github.com/menwengit/redis_source_...

当然,即使很多人已经讲解了这类问题,可是依然不影响我去写一些自己的所感。因为,读书百遍不如自己写一遍。这也是防止眼高手低。

命令是如何被执行的呢?

以在命令行执行命令redis-cli为例。这个我们太熟悉了,只要执行redis-cli就连接redis-server成功了,接下来就可以执行一些getset命令。再然后就得到了我们要的结果。但是你从来有想过,这些命令的背后到底干了些什么呢?接下里是我翻看源码(版本3.0.7)之后,得到的一些答案。

第一步:启动服务器

文件入口:redis.cmain函数
重点看如下的2个方法:

initServer();
aeMain(server.el);

initServer中初始化的数据很多,其中和本节讨论相关的有:

  • 初始化server的数据存储结构
  • 初始化服务端socket
  • 初始化EventLoop

初始化server的数据存储结构是贯穿redis的生命周期的,因为任何操作都会涉及到从redis的数据结构中获取或者设置数据。
初始化服务端socket
1)在server的配置文件中,是支持多个socket的配置。也支持IPv4和IPv6。这么做的好处就是redis-server既可以支持本地连接的unix,又支持本地的网络连接127.0.0.1,还能支持远程的网络连接remote_address:port等。
2)默认的socket初始化后设置的是非阻塞模式。
初始化EventLoop

对于小白的我来说,刚学redis听得最多的就是redis中的事件,之所以redis很快就是因为事件和IO多路复用及单进程等等。听的再多,如果不自己去探究源码终究是纸上得来终觉浅,绝知此事要躬行。所以即便是看完了本篇文章依然要去看源代码,每个人的理解都会不一样。我本地是mac,redis底层自动会使用kqueue。以下的分析都是基于kqueue的IO多路复用。

// 初始化 EventLoop对象
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
// 注册时间事件到el对象,回调函数是`serverCron`
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
// 注册网络连接的文件事件:就是服务端socket监听client的连接,并且关联fd和回调函数`acceptTcpHandler`。
for (j = 0; j < server.ipfd_count; j++) {
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL)
}
// 注册本地文件事件:监听本地client的连接并且关联回调函数`acceptUnixHandler`。
aeCreateFileEvent(server.el,server.sofd,AE_READABLE,acceptUnixHandler,NULL);

到这里,将服务端的socket和时间事件及文件事件都简单的拉出来溜溜了。怎么理解kqueue的事件触发呢?联想PHP的socket_select函数去理解。推荐阅读:www.jianshu.com/p/397449cadc9a

  • socket_select函数是很早之前最开始使用的IO多路复用模式,它的工作机制就是在内核态接收到的事件通知给用户态,并且返回目前已准备好的数量。然后切换到用户态之后在循环已经准备好的数量,接收数据。
    socket_select($read, $write, $except, $sec, $usec);
  • $read是可读事件:将所有可读的事件放到该数组,然后从该数组循环遍历并且读取数组内文件的数据。
  • $write是可写事件:将已准备好的可写文件放在这数组,然后循环遍历调用写函数写入数据到该文件。

select的缺点就是循环遍历,并且最大只能支持1024个客户端同时连接。后来升级到了poll,但是poll只是去除了1024的限制。后来直到进化到了epoll,才得以解决select的所有弊端,kqueue和epoll类似。

都知道一个问题:Redis是串行的,那为什么速度还那么快呢?一般我们的回答都是IO多路复用。那么现在有个问题就是:Redis是如何保证串行的呢?

当你读完Redis串行地处理client的请求时,可能并没有多大感触。但是如果让你写一个基于多进程下的server和client通信时,你就会发现如何保证server和client接收到的数据准确可信是很难的问题。这里就不得不赞叹为什么Redis要用单进程处理了,因为实在是省去了很多并发问题的思考。
多进程下的serverclient通信问题,看beanstalk的issue:github.com/pheanstalk/pheanstalk/i...

解答这个问题必须再看源代码的时候,注意aeEventLoop的数据结构:

/* State of an event based program */
// 事件状态的描述
typedef struct aeEventLoop {
    // 最大的事件描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 目前追踪的事件数量
    int setsize; /* max number of file descriptors tracked */
    // 时间事件的下一个待触发的id
    long long timeEventNextId;
    // 用于检测系统偏差
    time_t lastTime;     /* Used to detect system clock skew */
    // 已注册的文件事件:如服务端socket。是一个array。
    aeFileEvent *events; /* Registered events */
    // 触发的事件,是一个array。
    aeFiredEvent *fired; /* Fired events */
    // 时间事件是一个链表。指向了该链表的头指针。
    aeTimeEvent *timeEventHead;
    // 置为1则结束循环
    int stop;
    // 万能指针 void*:
    // 我的mac是使用的是kqueue,所以保存的数据结构如:
    /*typedef struct aeApiState {
            int kqfd; //指的是kqueue
            struct kevent *events; //提前申请好的setsize的kqueue的大小空间
        } aeApiState;*/
    void *apidata; /* This is used for polling API specific data */
    // 回调函数
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

先注意到两个属性:aeFileEvent *eventsaeFiredEvent *fired,然后对比下ae.c文件下的aeCreateFileEvent函数去了解。

// accept:server.el  server.ipfd[j]   AE_READABLE   acceptTcpHandler  null
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 已注册的事件,这里是fe是引用的地址。
    aeFileEvent *fe = &eventLoop->events[fd];
    // 注册kevent事件:readable | writeable
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    //如 accept是READBALE事件
    fe->mask |= mask;
    // rfileproc和wfileproc分别记录回调函数,和fd关联起来了
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    // ??
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

结合之前的accept操作:server.el server.ipfd[j] AE_READABLE acceptTcpHandler null,那么得到的如下:

/* File event structure
typedef struct aeFileEvent {
    int mask;  //one of AE_(READABLE|WRITABLE)
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent; */

aeFileEvent *events[fd1] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'acceptTcpHandler';
    aeFileProc *wfileProc;
    void *clientData = null;
}; /* Registered events */
aeFiredEvent *fired; /* Fired events */

同时aeApiAddEventkevent注册了文件事件,代码如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct kevent ke;
    if (mask & AE_READABLE) {
        EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
        /* kevent参数说明
        kq            - kqueue() 返回的唯一描述符, 标记着一个内核队列
        changes       – 需要对kqueue进行修改的事件集合, 此参数就是kevent()对目前kqueue中的事件的操作,比如删除kqueue中已经存在的事件,或者向kqueue中添加新的事件,也就是说,kevent()通过此参数对kqueue的修改
        nchanges      – 需要修改的事件的个数
        events        – kevent()会把所有事件存储在events中
        nevents       – kevent()需要知道存储空间有多大, == 0 : kevent()会立即返回
        timeout       – 超时控制, = NULL:kevent()会一直等到有关注的事件发生; != NULL:kevent()会等待指定的时间
        */
        // 结合select理解:就是添加到了read数组,并且等待数据的读取。
        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    }
    if (mask & AE_WRITABLE) {
        EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
        // 结合select理解:就是添加到了write数组,并且等待数据的写入。
        if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
    }
    return 0;
}

当事件被触发时,那么就会触发执行回调函数acceptTcpHandler,简练的代码如下:

// 其实就是执行了`accept`函数,并且返回了client的文件描述符cfd。
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 设置成非阻塞模式,且将cfd注册到kevent:
// aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
acceptCommonHandler(cfd,0);

那么当client请求server发送命令时,就会触发函数readQueryFromClient,Redis经过对命令一系列的处理之后,经过函数processCommand执行命令,最终执行的代码如:c->cmd->proc(c);,这里的函数就是执行命令的函数,如set|get等。执行完命令之后通过addReply函数返回给客户端结果。但是并不会立即返回给client,而是将事件注册到了kevent:aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c),待。
这里可想可知EventLoop的属性event又多加了一个事件。
此时就能得到如下的EventLoop的部分属性数据:

aeFileEvent 
events[fd1] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'acceptTcpHandler';
    aeFileProc *wfileProc;
    void *clientData = null;
}; /* Registered events */
event[fd2] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'readQueryFromClient';
    aeFileProc *wfileProc;
    void *clientData = c;
};
event[fd2] = {
    int mask = AE_READABLE;
    aeFileProc *rfileProc = 'sendReplyToClient';
    aeFileProc *wfileProc;
    void *clientData = c;
};
aeFiredEvent *fired; /* Fired events */

以上都只是介绍了事件被触发后的执行过程,但是这些事件是何时被触发的呢?这些事件的触发在main函数的aeMain函数中,跟踪代码如:

...
// 和时间事件联合起作用,可能等待至下一个时间事件的触发,也可能等待直到某个事件发生。
// 一旦某个事件被触发,就会将该事件记录到`EventLoop`的`fired`属性。然后异步调用了回调函数。
numevents = aeApiPoll(eventLoop, tvp);
// 这就是串行的根本原因所在:将所有触发的事件先保存到数组`fired`,然后在遍历这些事件依次处理。
// 由于此前已经将fd和回调函数相关联,则在遍历的时候就可以执行回调函数。
for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                /* 如socket等待accept,那么就是一个READABLE事件,然后执行的回调函数
                 * rfileProc=acceptTcpHandler,那么就会执行这个函数 */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    // 如get命令,就是一个writeable事件
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }

看下aeApiPoll函数:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        // 时间事件存在时,则等待下个时间事件的触发的时间。如果此区间触发了一个文件事件,
        // 则优先处理文件事件。时间事件可能就会延迟处理。
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        &timeout);
    } else {
        // 阻塞等待直到事件的发生
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    }

    if (retval > 0) {
        int j;
        numevents = retval;
        // 文件事件被触发了,优先执行文件事件
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;
            // 将kevent监听到的事件类型和eventLoop的mask相关联。
            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            // 这就是串行,是一个数组
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

到这里,在整体的过一遍就是:

  • server注册服务端socket的fd到kevent,然后kevent就会异步的去监听这个fd,并且将这个fd关联回调函数acceptTcpHandler
  • kevent监听到了client的连接请求,然后调用了函数acceptTcpHandler,然后再把cfd注册到了kevent且关联了回调函数readQueryFromClient
  • kevent监听到了client的发送数据,然后调用了函数readQueryFromClient,再处理请求将cfd注册到了kevent且关联了回调函数sendReplyToClient

更具体的描述是:

  • initServer中先初始化server.el,这是全局公用的。
    server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
  • 通过aeCreateTimeEvent注册时间事件。这里只是注册,该函数会立即返回。
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
  • 然后将初始化好的服务端Socket也通过aeCreateFileEvent注册文件事件。
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
    aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL);
  • 然后在aeMain函数中,处理时间事件和文件事件。
    void aeMain(aeEventLoop *eventLoop) {
      eventLoop->stop = 0;
      // 事件处理是一个循环
      while (!eventLoop->stop) {
          // 在此触发beforesleep函数。
          if (eventLoop->beforesleep != NULL)
              eventLoop->beforesleep(eventLoop);
          aeProcessEvents(eventLoop, AE_ALL_EVENTS);
      }
    }
  • aeProcessEvents函数会先执行时间事件,在链表timeEventHead找到最近的时间事件。
    ```php
    /* … 省略上述代码
    找到最近的时间事件并且计算下次执行的时间间隔。但是可以看到时间事件的优先级是低于文件事件的。
    即使时间事件已经到了该执行的时间,可是也会先调用kevent函数等待文件事件的发生。不过这里分为两种情况:
    1)无时间事件:kevent会一直阻塞等待直到文件事件发生。
    2)有时间事件:kevent会等待最近时间事件触发的时间间隙去处理文件事件。如果没有文件事件发生,则处理时间事件。
    aeApiPoll函数内,如果有文件事件触发,则将fd放到aeEventLoop->aeFiredEventfired数组,并且关联AE_READABLE|AE_WRITABLE。

*/
numevents = aeApiPoll(eventLoop, tvp);

- 循环`aeEventLoop->aeFiredEvent``fd`,并且关联起`aeEventLoop->aeFileEvent``event`,取出`fd`的对应关系`event[fd]`的操作函数`proc`即之前注册事件时的回调函数,然后开始执行回调函数`proc`。如此所有kevent触发的事件都会被串行放在`fired`数组,然后在依次执行这些`fd`的回调函数。

> 时间事件和文件事件:
时间事件的触发分为定时事件和周期性事件,定时事件触发一次就结束了,而周期性事件会每隔固定的时间就被触发一次,如`serverCron`。
在时间事件触发的时间窗内,就会触发`kevent`阻塞相应的等待时间。在这个时间窗口内,可能就会触发了某个文件事件。如果没有触发文件事件,则会执行时间事件。
[kqueue介绍](https://blog.csdn.net/bytxl/article/details/17526351 "kqueue介绍")

## 第二步:client
分析源文件`redis-cli.c``main`函数。
```c
/* Start interactive mode when no command is provided */
    if (argc == 0 && !config.eval) {
        /* Ignore SIGPIPE in interactive mode to force a reconnect */
        signal(SIGPIPE, SIG_IGN);

        /* Note that in repl mode we don't abort on connection error.
         * A new attempt will be performed for every command send. */
         // client是阻塞模式
        cliConnect(0);
        repl();
    }

repl()的代码继续跟踪到redisGetReply,再到redisBufferWrite方法,是写数据到server。

int redisGetReply(redisContext *c, void **reply) {
    int wdone = 0;
    void *aux = NULL;

    /* Try to read pending replies */
    if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
        return REDIS_ERR;

    /* For the blocking context, flush output buffer and read reply */
    if (aux == NULL && c->flags & REDIS_BLOCK) {
        /* Write until done */
        do {
            if (redisBufferWrite(c,&wdone) == REDIS_ERR)
                return REDIS_ERR;
        } while (!wdone);

        /* Read until there is a reply */
        do {
            if (redisBufferRead(c) == REDIS_ERR)
                return REDIS_ERR;
            if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                return REDIS_ERR;
        } while (aux == NULL);
    }

    /* Set reply object */
    if (reply != NULL) *reply = aux;
    return REDIS_OK;
}
int redisBufferWrite(redisContext *c, int *done) {
    int nwritten;

    /* Return early when the context has seen an error. */
    if (c->err)
        return REDIS_ERR;

    if (sdslen(c->obuf) > 0) {
        nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
        if (nwritten == -1) {
            // errno==35 || errno == 4 这不是个错误,继续执行。注意到client是阻塞模式。
            if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
                /* Try again later */
            } else {
                __redisSetError(c,REDIS_ERR_IO,NULL);
                return REDIS_ERR;
            }
        } else if (nwritten > 0) {
            if (nwritten == (signed)sdslen(c->obuf)) {
                sdsfree(c->obuf);
                c->obuf = sdsempty();
            } else {
                sdsrange(c->obuf,nwritten,-1);
            }
        }
    }
    if (done != NULL) *done = (sdslen(c->obuf) == 0);
    return REDIS_OK;
}

读取server的数据代码如:

/* Use this function to handle a read event on the descriptor. It will try
 * and read some bytes from the socket and feed them to the reply parser.
 *
 * After this function is called, you may use redisContextReadReply to
 * see if there is a reply available. */
int redisBufferRead(redisContext *c) {
    char buf[1024*16];
    int nread;

    /* Return early when the context has seen an error. */
    if (c->err)
        return REDIS_ERR;

    nread = read(c->fd,buf,sizeof(buf));
    if (nread == -1) {
        if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
            /* Try again later */
        } else {
            __redisSetError(c,REDIS_ERR_IO,NULL);
            return REDIS_ERR;
        }
    } else if (nread == 0) {
        __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
        return REDIS_ERR;
    } else {
        if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
            __redisSetError(c,c->reader->err,c->reader->errstr);
            return REDIS_ERR;
        }
    }
    return REDIS_OK;
}

思考

分析完Redis的kevent之后,联想了下Swoole协程,如代码:

Co\run(function() {
    go(function() {
        Co::sleep(1);
        // do something
    });
});
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!