小白谈一谈:命令在redis中是怎么被执行的呢?
当我准备谈谈redis的命令是如何执行时,发现其实在网上已经有很多优秀的文章讲过这类问题了。
推荐看过的一个github上的源码分析:github.com/menwengit/redis_source_...
当然,即使很多人已经讲解了这类问题,可是依然不影响我去写一些自己的所感。因为,读书百遍不如自己写一遍。这也是防止眼高手低。
命令是如何被执行的呢?
以在命令行执行命令redis-cli
为例。这个我们太熟悉了,只要执行redis-cli
就连接redis-server
成功了,接下来就可以执行一些get
或set
命令。再然后就得到了我们要的结果。但是你从来有想过,这些命令的背后到底干了些什么呢?接下里是我翻看源码(版本3.0.7)之后,得到的一些答案。
第一步:启动服务器
文件入口:redis.c
的main
函数
重点看如下的2个方法:
initServer();
aeMain(server.el);
initServer
中初始化的数据很多,其中和本节讨论相关的有:
- 初始化
server
的数据存储结构 - 初始化服务端
socket
- 初始化
EventLoop
初始化server
的数据存储结构是贯穿redis的生命周期的,因为任何操作都会涉及到从redis的数据结构中获取或者设置数据。
初始化服务端socket
:
1)在server的配置文件中,是支持多个socket的配置。也支持IPv4和IPv6。这么做的好处就是redis-server既可以支持本地连接的unix,又支持本地的网络连接127.0.0.1
,还能支持远程的网络连接remote_address:port
等。
2)默认的socket初始化后设置的是非阻塞模式。
初始化EventLoop
:
对于小白的我来说,刚学redis听得最多的就是redis中的事件,之所以redis很快就是因为事件和IO多路复用及单进程等等。听的再多,如果不自己去探究源码终究是
纸上得来终觉浅,绝知此事要躬行
。所以即便是看完了本篇文章依然要去看源代码,每个人的理解都会不一样。我本地是mac,redis底层自动会使用kqueue。以下的分析都是基于kqueue的IO多路复用。
// 初始化 EventLoop对象
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
// 注册时间事件到el对象,回调函数是`serverCron`
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
// 注册网络连接的文件事件:就是服务端socket监听client的连接,并且关联fd和回调函数`acceptTcpHandler`。
for (j = 0; j < server.ipfd_count; j++) {
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL)
}
// 注册本地文件事件:监听本地client的连接并且关联回调函数`acceptUnixHandler`。
aeCreateFileEvent(server.el,server.sofd,AE_READABLE,acceptUnixHandler,NULL);
到这里,将服务端的socket和时间事件及文件事件都简单的拉出来溜溜了。怎么理解kqueue的事件触发呢?联想PHP的socket_select
函数去理解。推荐阅读:www.jianshu.com/p/397449cadc9a
- socket_select函数是很早之前最开始使用的IO多路复用模式,它的工作机制就是在内核态接收到的事件通知给用户态,并且返回目前已准备好的数量。然后切换到用户态之后在循环已经准备好的数量,接收数据。
socket_select($read, $write, $except, $sec, $usec);
- $read是可读事件:将所有可读的事件放到该数组,然后从该数组循环遍历并且读取数组内文件的数据。
- $write是可写事件:将已准备好的可写文件放在这数组,然后循环遍历调用写函数写入数据到该文件。
select的缺点就是循环遍历,并且最大只能支持1024个客户端同时连接。后来升级到了poll,但是poll只是去除了1024的限制。后来直到进化到了epoll,才得以解决select的所有弊端,kqueue和epoll类似。
都知道一个问题:Redis是串行的,那为什么速度还那么快呢?一般我们的回答都是IO多路复用
。那么现在有个问题就是:Redis是如何保证串行的呢?
当你读完Redis串行地处理client的请求时,可能并没有多大感触。但是如果让你写一个基于
多进程
下的server和client通信时,你就会发现如何保证server和client接收到的数据准确可信是很难的问题。这里就不得不赞叹为什么Redis
要用单进程处理了,因为实在是省去了很多并发问题的思考。
多进程下的server
和client
通信问题,看beanstalk的issue:github.com/pheanstalk/pheanstalk/i...
解答这个问题必须再看源代码的时候,注意aeEventLoop
的数据结构:
/* State of an event based program */
// 事件状态的描述
typedef struct aeEventLoop {
// 最大的事件描述符
int maxfd; /* highest file descriptor currently registered */
// 目前追踪的事件数量
int setsize; /* max number of file descriptors tracked */
// 时间事件的下一个待触发的id
long long timeEventNextId;
// 用于检测系统偏差
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件:如服务端socket。是一个array。
aeFileEvent *events; /* Registered events */
// 触发的事件,是一个array。
aeFiredEvent *fired; /* Fired events */
// 时间事件是一个链表。指向了该链表的头指针。
aeTimeEvent *timeEventHead;
// 置为1则结束循环
int stop;
// 万能指针 void*:
// 我的mac是使用的是kqueue,所以保存的数据结构如:
/*typedef struct aeApiState {
int kqfd; //指的是kqueue
struct kevent *events; //提前申请好的setsize的kqueue的大小空间
} aeApiState;*/
void *apidata; /* This is used for polling API specific data */
// 回调函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
先注意到两个属性:aeFileEvent *events
和 aeFiredEvent *fired
,然后对比下ae.c
文件下的aeCreateFileEvent
函数去了解。
// accept:server.el server.ipfd[j] AE_READABLE acceptTcpHandler null
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 已注册的事件,这里是fe是引用的地址。
aeFileEvent *fe = &eventLoop->events[fd];
// 注册kevent事件:readable | writeable
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
//如 accept是READBALE事件
fe->mask |= mask;
// rfileproc和wfileproc分别记录回调函数,和fd关联起来了
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
// ??
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
结合之前的accept
操作:server.el server.ipfd[j] AE_READABLE acceptTcpHandler null,那么得到的如下:
/* File event structure
typedef struct aeFileEvent {
int mask; //one of AE_(READABLE|WRITABLE)
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent; */
aeFileEvent *events[fd1] = {
int mask = AE_READABLE;
aeFileProc *rfileProc = 'acceptTcpHandler';
aeFileProc *wfileProc;
void *clientData = null;
}; /* Registered events */
aeFiredEvent *fired; /* Fired events */
同时aeApiAddEvent
往kevent
注册了文件事件,代码如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct kevent ke;
if (mask & AE_READABLE) {
EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
/* kevent参数说明
kq - kqueue() 返回的唯一描述符, 标记着一个内核队列
changes – 需要对kqueue进行修改的事件集合, 此参数就是kevent()对目前kqueue中的事件的操作,比如删除kqueue中已经存在的事件,或者向kqueue中添加新的事件,也就是说,kevent()通过此参数对kqueue的修改
nchanges – 需要修改的事件的个数
events – kevent()会把所有事件存储在events中
nevents – kevent()需要知道存储空间有多大, == 0 : kevent()会立即返回
timeout – 超时控制, = NULL:kevent()会一直等到有关注的事件发生; != NULL:kevent()会等待指定的时间
*/
// 结合select理解:就是添加到了read数组,并且等待数据的读取。
if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
}
if (mask & AE_WRITABLE) {
EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
// 结合select理解:就是添加到了write数组,并且等待数据的写入。
if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
}
return 0;
}
当事件被触发时,那么就会触发执行回调函数acceptTcpHandler
,简练的代码如下:
// 其实就是执行了`accept`函数,并且返回了client的文件描述符cfd。
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 设置成非阻塞模式,且将cfd注册到kevent:
// aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
acceptCommonHandler(cfd,0);
那么当client请求server发送命令时,就会触发函数readQueryFromClient
,Redis经过对命令一系列的处理之后,经过函数processCommand
执行命令,最终执行的代码如:c->cmd->proc(c);
,这里的函数就是执行命令的函数,如set|get
等。执行完命令之后通过addReply
函数返回给客户端结果。但是并不会立即返回给client,而是将事件注册到了kevent:aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c)
,待。
这里可想可知EventLoop
的属性event
又多加了一个事件。
此时就能得到如下的EventLoop
的部分属性数据:
aeFileEvent
events[fd1] = {
int mask = AE_READABLE;
aeFileProc *rfileProc = 'acceptTcpHandler';
aeFileProc *wfileProc;
void *clientData = null;
}; /* Registered events */
event[fd2] = {
int mask = AE_READABLE;
aeFileProc *rfileProc = 'readQueryFromClient';
aeFileProc *wfileProc;
void *clientData = c;
};
event[fd2] = {
int mask = AE_READABLE;
aeFileProc *rfileProc = 'sendReplyToClient';
aeFileProc *wfileProc;
void *clientData = c;
};
aeFiredEvent *fired; /* Fired events */
以上都只是介绍了事件被触发后的执行过程,但是这些事件是何时被触发的呢?这些事件的触发在main
函数的aeMain
函数中,跟踪代码如:
...
// 和时间事件联合起作用,可能等待至下一个时间事件的触发,也可能等待直到某个事件发生。
// 一旦某个事件被触发,就会将该事件记录到`EventLoop`的`fired`属性。然后异步调用了回调函数。
numevents = aeApiPoll(eventLoop, tvp);
// 这就是串行的根本原因所在:将所有触发的事件先保存到数组`fired`,然后在遍历这些事件依次处理。
// 由于此前已经将fd和回调函数相关联,则在遍历的时候就可以执行回调函数。
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
/* 如socket等待accept,那么就是一个READABLE事件,然后执行的回调函数
* rfileProc=acceptTcpHandler,那么就会执行这个函数 */
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
// 如get命令,就是一个writeable事件
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
看下aeApiPoll
函数:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
if (tvp != NULL) {
struct timespec timeout;
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
// 时间事件存在时,则等待下个时间事件的触发的时间。如果此区间触发了一个文件事件,
// 则优先处理文件事件。时间事件可能就会延迟处理。
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
&timeout);
} else {
// 阻塞等待直到事件的发生
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
NULL);
}
if (retval > 0) {
int j;
numevents = retval;
// 文件事件被触发了,优先执行文件事件
for(j = 0; j < numevents; j++) {
int mask = 0;
struct kevent *e = state->events+j;
// 将kevent监听到的事件类型和eventLoop的mask相关联。
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
// 这就是串行,是一个数组
eventLoop->fired[j].fd = e->ident;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
到这里,在整体的过一遍就是:
- server注册服务端socket的fd到
kevent
,然后kevent就会异步的去监听这个fd,并且将这个fd关联回调函数acceptTcpHandler
。 - kevent监听到了client的连接请求,然后调用了函数
acceptTcpHandler
,然后再把cfd注册到了kevent且关联了回调函数readQueryFromClient
。 - kevent监听到了client的发送数据,然后调用了函数
readQueryFromClient
,再处理请求将cfd注册到了kevent且关联了回调函数sendReplyToClient
。
更具体的描述是:
- 在
initServer
中先初始化server.el
,这是全局公用的。server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
- 通过
aeCreateTimeEvent
注册时间事件。这里只是注册,该函数会立即返回。aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
- 然后将初始化好的服务端Socket也通过
aeCreateFileEvent
注册文件事件。aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL); aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL);
- 然后在
aeMain
函数中,处理时间事件和文件事件。void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; // 事件处理是一个循环 while (!eventLoop->stop) { // 在此触发beforesleep函数。 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
aeProcessEvents
函数会先执行时间事件,在链表timeEventHead
找到最近的时间事件。
```php
/* … 省略上述代码
找到最近的时间事件并且计算下次执行的时间间隔。但是可以看到时间事件的优先级是低于文件事件的。
即使时间事件已经到了该执行的时间,可是也会先调用kevent
函数等待文件事件的发生。不过这里分为两种情况:
1)无时间事件:kevent会一直阻塞等待直到文件事件发生。
2)有时间事件:kevent会等待最近时间事件触发的时间间隙去处理文件事件。如果没有文件事件发生,则处理时间事件。aeApiPoll
函数内,如果有文件事件触发,则将fd
放到aeEventLoop->aeFiredEvent
的fired
数组,并且关联AE_READABLE|AE_WRITABLE。
*/
numevents = aeApiPoll(eventLoop, tvp);
- 循环`aeEventLoop->aeFiredEvent`的`fd`,并且关联起`aeEventLoop->aeFileEvent`的`event`,取出`fd`的对应关系`event[fd]`的操作函数`proc`即之前注册事件时的回调函数,然后开始执行回调函数`proc`。如此所有kevent触发的事件都会被串行放在`fired`数组,然后在依次执行这些`fd`的回调函数。
> 时间事件和文件事件:
时间事件的触发分为定时事件和周期性事件,定时事件触发一次就结束了,而周期性事件会每隔固定的时间就被触发一次,如`serverCron`。
在时间事件触发的时间窗内,就会触发`kevent`阻塞相应的等待时间。在这个时间窗口内,可能就会触发了某个文件事件。如果没有触发文件事件,则会执行时间事件。
[kqueue介绍](https://blog.csdn.net/bytxl/article/details/17526351 "kqueue介绍")
## 第二步:client
分析源文件`redis-cli.c`的`main`函数。
```c
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Ignore SIGPIPE in interactive mode to force a reconnect */
signal(SIGPIPE, SIG_IGN);
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
// client是阻塞模式
cliConnect(0);
repl();
}
在repl()
的代码继续跟踪到redisGetReply
,再到redisBufferWrite
方法,是写数据到server。
int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
/* Try to read pending replies */
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
/* For the blocking context, flush output buffer and read reply */
if (aux == NULL && c->flags & REDIS_BLOCK) {
/* Write until done */
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
return REDIS_ERR;
} while (!wdone);
/* Read until there is a reply */
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}
/* Set reply object */
if (reply != NULL) *reply = aux;
return REDIS_OK;
}
int redisBufferWrite(redisContext *c, int *done) {
int nwritten;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
if (sdslen(c->obuf) > 0) {
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1) {
// errno==35 || errno == 4 这不是个错误,继续执行。注意到client是阻塞模式。
if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nwritten > 0) {
if (nwritten == (signed)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
} else {
sdsrange(c->obuf,nwritten,-1);
}
}
}
if (done != NULL) *done = (sdslen(c->obuf) == 0);
return REDIS_OK;
}
读取server的数据代码如:
/* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser.
*
* After this function is called, you may use redisContextReadReply to
* see if there is a reply available. */
int redisBufferRead(redisContext *c) {
char buf[1024*16];
int nread;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
nread = read(c->fd,buf,sizeof(buf));
if (nread == -1) {
if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nread == 0) {
__redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
return REDIS_ERR;
} else {
if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
}
return REDIS_OK;
}
思考
分析完Redis的kevent之后,联想了下Swoole
的协程
,如代码:
Co\run(function() {
go(function() {
Co::sleep(1);
// do something
});
});
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: