swoole 协程源码解读 (协程的调度)
Reactor调度
以下代码基于swoole4.4.5-alpha, php7.1.26
那么定时事件什么时候会被执行呢? 这是通过内部的Reactor事件循环去实现的, 下面来看具体实现:
创建协程时会先判断协程运行时是否已激活(active标志), 如果还没有激活则调用activate函数, activate函数里会初始化reactor, 大概有这几个步骤:
- 初始化reactor结构, 注册各种回调函数(读写事件采用对应平台效率最高的多路复用api, 封装成统一的回调函数有助于屏蔽不同api实现细节)
- 通过php_swoole_register_shutdown_function("Swoole\Event::rshutdown")注册一个在request_shutdown阶段调用的函数(回忆一下php的生命周期, 脚本结束的时候会调用此函数), 实际上事件循环就在这个阶段执行
- 开启抢占式调度线程(这个后面会说)
// Creating a coroutine lazily activates the coroutine runtime (and with it the
// reactor) the first time it is needed.
long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
    ...
    if (sw_unlikely(!active))
    {
        activate();
    }
    ...
}

// One-time activation: ensures the reactor exists, swaps in the coroutine
// VM-interrupt hook, and optionally starts the preemptive-scheduler thread.
inline void PHPCoroutine::activate()
{
    ...
    /* init reactor and register event wait */
    php_swoole_check_reactor();
    /* replace interrupt function */
    orig_interrupt_function = zend_interrupt_function; // save the original interrupt callback
    zend_interrupt_function = coro_interrupt_function; // install the coroutine interrupt callback
    // enable preemptive scheduling (ini setting or runtime config)
    if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
    {
        /* create a thread to interrupt the coroutine that takes up too much time */
        interrupt_thread_start();
    }
    ...
    active = true;
}

// Creates the main reactor if it does not exist yet.
// Returns 1 if a reactor was just initialized successfully, -1 if that failed
// (the other return paths are elided in this excerpt).
static sw_inline int php_swoole_check_reactor()
{
    ...
    if (sw_unlikely(!SwooleG.main_reactor))
    {
        return php_swoole_reactor_init() == SW_OK ? 1 : -1;
    }
    ...
}

// Creates the global reactor and registers Swoole\Event::rshutdown, so the
// event loop runs during PHP's request-shutdown phase.
int php_swoole_reactor_init()
{
    ...
    if (!SwooleG.main_reactor)
    {
        swoole_event_init();
        // presumably lets the loop stop once nothing is left to wait for — confirm
        SwooleG.main_reactor->wait_exit = 1;
        // register the rshutdown function
        php_swoole_register_shutdown_function("Swoole\\Event::rshutdown");
    }
    ...
}

#define sw_reactor() (SwooleG.main_reactor)
#define SW_REACTOR_MAXEVENTS 4096

// Allocates the global reactor and initializes it, passing
// SW_REACTOR_MAXEVENTS as max_event to swReactor_create().
// NOTE(review): the sw_malloc() result is not checked for NULL before use here.
int swoole_event_init()
{
    SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor));
    if (swReactor_create(sw_reactor(), SW_REACTOR_MAXEVENTS) < 0)
    {
        ...
    }
    ...
}

// Zeroes the reactor, binds it to the first available multiplexing backend
// (epoll, then kqueue, then poll, falling back to select), and installs the
// common callbacks.
int swReactor_create(swReactor *reactor, int max_event)
{
    int ret;
    bzero(reactor, sizeof(swReactor));

#ifdef HAVE_EPOLL
    ret = swReactorEpoll_create(reactor, max_event);
#elif defined(HAVE_KQUEUE)
    ret = swReactorKqueue_create(reactor, max_event);
#elif defined(HAVE_POLL)
    ret = swReactorPoll_create(reactor, max_event);
#else
    ret = swReactorSelect_create(reactor);
#endif
    ...
    reactor->onTimeout = reactor_timeout; // callback fired when the wait times out (used to run expired timers)
    ...
    Socket::init_reactor(reactor);
    ...
}

// epoll backend: binds the reactor's generic operations to the epoll implementations.
int swReactorEpoll_create(swReactor *reactor, int max_event_num)
{
    ...
    //binding method
    reactor->add = swReactorEpoll_add;
    reactor->set = swReactorEpoll_set;
    reactor->del = swReactorEpoll_del;
    reactor->wait = swReactorEpoll_wait;
    reactor->free = swReactorEpoll_free;
}
request_shutdown阶段会执行注册的Swoole\Event::rshutdown函数, swoole_event_rshutdown会执行之前注册的wait函数:
// Registered with php_swoole_register_shutdown_function(), so it runs in PHP's
// request-shutdown phase; this is where the reactor event loop actually runs.
static PHP_FUNCTION(swoole_event_rshutdown)
{
    /* prevent the program from jumping out of the rshutdown */
    // zend_try/zend_end_try intercept a bailout raised inside the loop instead
    // of letting it unwind out of this shutdown function.
    zend_try
    {
        PHP_FN(swoole_event_wait)(INTERNAL_FUNCTION_PARAM_PASSTHRU);
    }
    zend_end_try();
}
// Runs the reactor's event loop (the backend's wait(), e.g. swReactorEpoll_wait)
// until it returns, then frees the reactor. Returns wait()'s result.
int swoole_event_wait()
{
    int retval = sw_reactor()->wait(sw_reactor(), NULL); // blocks until the loop stops; NULL is passed as the timeval argument
    swoole_event_free();
    return retval;
}
我们再来看看定时事件的注册, 添加第一个定时器时会先初始化timer:
// Coroutine sleep: registers a one-shot timer whose callback (sleep_timeout)
// is given the current coroutine.
// NOTE(review): the code after the timer registration (presumably the yield
// and the return value) is not shown in this excerpt.
int System::sleep(double sec)
{
    Coroutine* co = Coroutine::get_current_safe(); // get the current coroutine
    // sec is converted to whole milliseconds by the (long) cast, which truncates,
    // so sub-millisecond sleeps become 0 ms; SW_FALSE = not persistent (fires once)
    if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL)
    {
        ...
    }
}
// Thin wrapper around swTimer_add() on the timer returned by sw_timer().
// Note the argument order differs: swTimer_add takes (data, callback), not
// (callback, data).
swTimer_node* swoole_timer_add(long ms, uchar persistent, swTimerCallback callback, void *private_data)
{
    return swTimer_add(sw_timer(), ms, persistent, private_data, callback);
}
// (Excerpt) On the first add, swTimer_add initializes the timer, seeding it
// with this timer's delay as the initial shortest timeout.
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
    if (sw_unlikely(!timer->initialized))
    {
        if (sw_unlikely(swTimer_init(timer, _msec) != SW_OK)) // initialize the timer
        {
            return NULL;
        }
    }
    ...
}
// Initializes timer state and attaches the timer to the main reactor.
// msec (the first timer's delay) becomes the initial shortest timeout.
static int swTimer_init(swTimer *timer, long msec)
{
    ...
    timer->heap = swHeap_new(1024, SW_MIN_HEAP); // min-heap of timer nodes (1024 presumably the initial size)
    timer->map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL); // timer id -> node lookup
    timer->_current_id = -1; // current timer id (-1 presumably means none)
    timer->_next_msec = msec; // shortest timeout among all timers
    timer->_next_id = 1; // next id to hand out; ids start at 1
    timer->round = 0;
    ret = swReactorTimer_init(SwooleG.main_reactor, timer, msec);
    ...
}
// Wires the timer into the reactor. The reactor's wait timeout becomes the
// shortest timer delay, and the timer gets reactor-backed set/close hooks.
static int swReactorTimer_init(swReactor *reactor, swTimer *timer, long exec_msec)
{
    reactor->check_timer = SW_TRUE; // makes reactor_finish() call swTimer_select()
    reactor->timeout_msec = exec_msec; // shortest timeout among all timers
    reactor->timer = timer;
    timer->reactor = reactor;
    timer->set = swReactorTimer_set;
    timer->close = swReactorTimer_close;
    ...
}
接着是添加事件, 需要注意的是:
- timer->_next_msec和reactor->timeout_msec一直保持为所有定时器里最短的超时时间(相对值, 单位ms)
- 以tnode->exec_msec(绝对到期时间)为优先级, 用最小堆保存tnode, 这样一来堆顶的元素就是最早到期的定时器
// Adds a timer node. The steps are:
// - compute the node's absolute expiry;
// - lower the reactor wait timeout if this timer has the shortest delay so far;
// - index the node in both the min-heap (keyed by expiry) and the id hashmap.
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
    // NOTE(review): the sw_malloc() result is dereferenced without a NULL check.
    swTimer_node *tnode = sw_malloc(sizeof(swTimer_node));
    int64_t now_msec = swTimer_get_relative_msec();
    tnode->data = data;
    tnode->type = SW_TIMER_TYPE_KERNEL;
    tnode->exec_msec = now_msec + _msec; // absolute expiry time
    tnode->interval = interval ? _msec : 0; // non-zero: repeating timer
    tnode->removed = 0;
    tnode->callback = callback;
    tnode->round = timer->round;
    tnode->dtor = NULL;
    // Update only when needed, so that the minimum timeout is always kept.
    // NOTE(review): _next_msec holds the delay recorded when it was last set,
    // not the time remaining now, so this comparison is approximate.
    if (timer->_next_msec < 0 || timer->_next_msec > _msec)
    {
        timer->set(timer, _msec);
        timer->_next_msec = _msec;
    }
    tnode->id = timer->_next_id++;
    tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode); // push into the heap, priority = tnode->exec_msec
    if (sw_unlikely(swHashMap_add_int(timer->map, tnode->id, tnode) != SW_OK)) // the hashmap keeps the tnode id -> tnode mapping
    {
        ...
    }
    ...
}
定时事件注册完成后, 就可以等待事件循环去执行了, 我们以epoll为例:
使用epoll_wait等待fd读写事件, 超时时间为reactor->timeout_msec(即所有定时器里最短的超时时间):
- 如果epoll_wait超时时还未获取到任何fd读写事件, 执行onTimeout函数, 处理定时事件
- 如果有fd事件, 则处理fd读写事件, 处理完本次触发的所有事件后, 进入下一次循环
// epoll event loop. Each iteration waits up to reactor->timeout_msec for fd
// events. A wait that returns no events (timeout) triggers onTimeout, which
// runs expired timers; otherwise the ready fd events are dispatched. The loop
// ends when something sets reactor->running to 0 (e.g. reactor_finish).
// NOTE(review): in this excerpt timers only run when epoll_wait returns 0
// events. The elided tail of the loop presumably also checks timers after fd
// dispatch (otherwise busy fds would starve timers) — confirm against upstream.
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    ...
    reactor->running = 1;
    reactor->start = 1;
    while (reactor->running > 0)
    {
        ...
        n = epoll_wait(epoll_fd, events, max_event_num, reactor->timeout_msec);
        if (n < 0)
        {
            ... // error handling
        }
        else if (n == 0)
        {
            reactor->onTimeout(reactor);
        }
        for (i = 0; i < n; i++)
        {
            ... // fd read/write event handling
        }
        ...
    }
    return 0;
}
如果这期间没有任何fd事件, 定时事件就会被执行: onTimeout是之前已经注册过的函数reactor_timeout, 其中的swTimer_select函数会把当前所有已经到期的定时事件都执行完, 再退出它自己的循环. 执行到上文我们注册的sleep_timeout函数时, 就会唤醒因为sleep而休眠的协程, 让它继续执行:
// onTimeout callback (installed in swReactor_create). On a wait timeout it
// runs reactor_finish(), which includes the expired-timer check.
static void reactor_timeout(swReactor *reactor)
{
    reactor_finish(reactor);
    ...
}
// End-of-iteration housekeeping. It runs expired timers, then stops the event
// loop when wait_exit is set and the reactor reports it is empty.
static void reactor_finish(swReactor *reactor)
{
    //check timer
    if (reactor->check_timer)
    {
        swTimer_select(reactor->timer);
    }
    ...
    //the event loop is empty
    if (reactor->wait_exit && reactor->is_empty(reactor)) // no work left: exit the loop
    {
        reactor->running = 0; // ends the `while (reactor->running > 0)` loop in the backend's wait()
    }
}
// Runs every timer whose expiry time has passed, earliest first. Each node is
// popped from the heap and removed from the id map after its callback runs;
// the callback is skipped for nodes marked removed.
// NOTE(review): this excerpt shows no re-arming of repeating timers
// (interval > 0), no freeing of tnode, and no use of tnode->round; presumably
// they are elided — confirm.
int swTimer_select(swTimer *timer)
{
    int64_t now_msec = swTimer_get_relative_msec(); // current time, sampled once for the whole pass
    while ((tmp = swHeap_top(timer->heap))) // get the earliest-expiring node
    {
        tnode = tmp->data;
        if (tnode->exec_msec > now_msec) // not due yet; the heap is ordered, so no later node is due either
        {
            break;
        }
        if (!tnode->removed)
        {
            tnode->callback(timer, tnode); // run the callback registered for this timer (e.g. sleep_timeout)
        }
        timer->num--;
        swHeap_pop(timer->heap);
        swHashMap_del_int(timer->map, tnode->id);
    }
    ...
}
到这里, 整个流程都已经介绍完了, 总结一下:
- 在没有主动干预协程调度的情况下, 协程都是在执行IO/定时事件时主动让出, 注册对应事件, 然后通过request_shutdown阶段里的事件循环等待事件到来, 触发协程的resume, 达到多协程并发的效果
- IO/定时事件不一定准时: 事件循环是单线程的, 定时回调只有在事件循环重新拿到控制权时才会执行, 如果某个协程长时间占用CPU而不让出, 或者一轮里要处理的事件较多, 定时事件就会被推迟
抢占式调度
通过上面我们可以知道, 如果协程里没有任何IO/定时事件, 实际上协程是没有切换时机的, 对于CPU密集型的场景,一些协程会因为得不到CPU时间片被饿死, Swoole 4.4引入了抢占式调度就是为了解决这个问题.
vm interrupt是php7.1.0后引入的执行机制, swoole就是使用这个特性实现的抢占式调度:
- ZEND_VM_INTERRUPT_CHECK会在指令是jump和call的时候执行
- ZEND_VM_INTERRUPT_CHECK会检查EG(vm_interrupt)这个标志位, 如果为1, 则触发zend_interrupt_function的执行
// php 7.1.26 src #define ZEND_VM_INTERRUPT_CHECK() do { \ if (UNEXPECTED(EG(vm_interrupt))) { \ ZEND_VM_INTERRUPT(); \ } \ } while (0) #define ZEND_VM_INTERRUPT() ZEND_VM_TAIL_CALL(zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS_PASSTHRU)); static ZEND_OPCODE_HANDLER_RET ZEND_FASTCALL zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS) { ... EG(vm_interrupt) = 0; if (zend_interrupt_function) { zend_interrupt_function(execute_data); } }
下面来看具体实现:
初始化:
- 保存原来的中断函数, zend_interrupt_function替换成新的中断函数
- 开启线程执行interrupt_thread_loop
- interrupt_thread_loop里每隔5ms(即MAX_EXEC_MSEC / 2)将EG(vm_interrupt)设置为1
// Installs the coroutine interrupt hook and, when enabled, starts the
// preemption thread.
inline void PHPCoroutine::activate()
{
    ...
    /* replace interrupt function */
    orig_interrupt_function = zend_interrupt_function; // save the original interrupt callback
    zend_interrupt_function = coro_interrupt_function; // install the coroutine interrupt callback
    // enable preemptive scheduling
    if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler) // requires the enable_preemptive_scheduler option
    {
        /* create a thread to interrupt the coroutine that takes up too much time */
        interrupt_thread_start();
    }
}

// Starts a background thread that periodically raises EG(vm_interrupt).
void PHPCoroutine::interrupt_thread_start()
{
    // EG() is resolved here in the calling thread; the helper thread writes the
    // flag through this pointer.
    zend_vm_interrupt = &EG(vm_interrupt);
    interrupt_thread_running = true;
    // NOTE(review): pthread_create() returns 0 on success and a positive error
    // number on failure, never a negative value, so `< 0` cannot detect failure.
    // Casting interrupt_thread_loop to void *(*)(void *) also calls it through
    // an incompatible function type.
    if (pthread_create(&interrupt_thread_id, NULL, (void * (*)(void *)) interrupt_thread_loop, NULL) < 0)
    {
        ...
    }
}

static const uint8_t MAX_EXEC_MSEC = 10;

// Preemption thread body. Every (MAX_EXEC_MSEC / 2) ms = 5 ms it sets the VM
// interrupt flag, so the PHP thread calls zend_interrupt_function at its next
// interrupt check.
// NOTE(review): *zend_vm_interrupt and interrupt_thread_running are shared with
// the PHP thread without any visible atomics or locking.
void PHPCoroutine::interrupt_thread_loop()
{
    static const useconds_t interval = (MAX_EXEC_MSEC / 2) * 1000; // 5000 microseconds
    while (interrupt_thread_running)
    {
        *zend_vm_interrupt = 1; // EG(vm_interrupt) = 1
        usleep(interval); // sleep 5ms
    }
    pthread_exit(0);
}
中断函数coro_interrupt_function会检查当前的协程是否可调度(距离上一次切换时间超过10ms), 如果可以, 直接让出当前协程, 完成抢占调度
// Replacement zend_interrupt_function. If the current coroutine task may be
// preempted (see is_schedulable), it yields; the previously installed hook is
// then chained. When a yield happens, that hook call runs only after this
// coroutine is resumed.
static void coro_interrupt_function(zend_execute_data *execute_data)
{
    php_coro_task *task = PHPCoroutine::get_task();
    if (task && task->co && PHPCoroutine::is_schedulable(task))
    {
        task->co->yield(); // yield the current coroutine
    }
    if (orig_interrupt_function)
    {
        orig_interrupt_function(execute_data); // run the original interrupt function
    }
}
static const uint8_t MAX_EXEC_MSEC = 10;
// A task may be preempted when scheduling is enabled for it and more than
// MAX_EXEC_MSEC (10 ms) have elapsed since task->last_msec (presumably the
// time it was last switched in — confirm).
static inline bool is_schedulable(php_coro_task *task)
{
    // enable_scheduler is set and the task has been running continuously for more than 10ms
    return task->enable_scheduler && (swTimer_get_absolute_msec() - task->last_msec > MAX_EXEC_MSEC);
}
本作品采用《CC 协议》,转载必须注明作者和本文链接