swoole 协程源码解读 (协程的调度)

Reactor调度

以下代码基于swoole4.4.5-alpha, php7.1.26

那么定时事件什么时候会被执行呢? 这是通过内部的Reactor事件循环去实现的, 下面来看具体实现:

创建协程时会判断reactor是否已经初始化, 没有初始化则会调用activate函数初始化reactor, activate函数大概有这几个步骤:

  • 初始化reactor结构, 注册各种回调函数(读写事件采用对应平台效率最高的多路复用api, 封装成统一的回调函数有助于屏蔽不同api实现细节)
  • 通过php_swoole_register_shutdown_function("Swoole\Event::rshutdown")注册一个在request_shutdown阶段调用的函数(回忆一下php的生命周期, 脚本结束的时候会调用此函数), 实际上事件循环就在这个阶段执行
  • 开启抢占式调度线程(这个后面会说)

    /* Excerpt of coroutine creation: only the lazy-activation step is shown.
     * The rest of the body is elided ("..."). */
    long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
    {
        ...
        /* First use: `active` is still false, so run the one-time setup in activate(). */
        if (sw_unlikely(!active))
        {
            activate();
        }
            ...
    }
    
    /* One-time setup (excerpt): makes sure the reactor exists, swaps in the
     * coroutine interrupt handler, optionally starts the interrupt thread, and
     * finally sets `active` to true. */
    inline void PHPCoroutine::activate()
    {
        ...
        /* init reactor and register event wait */
        php_swoole_check_reactor();
    
        /* replace interrupt function */
        orig_interrupt_function = zend_interrupt_function; // save the original interrupt callback
        zend_interrupt_function = coro_interrupt_function; // install the coroutine interrupt handler
    
          // enable preemptive scheduling when either the global or the config flag is set
        if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
        {
            /* create a thread to interrupt the coroutine that takes up too much time */
            interrupt_thread_start();
        }
        ...
        active = true;
    }
    
    /* Excerpt: when SwooleG.main_reactor is not set, initializes it and returns
     * 1 if php_swoole_reactor_init() returned SW_OK, otherwise -1.
     * The other paths are elided. */
    static sw_inline int php_swoole_check_reactor()
    {
        ...
        if (sw_unlikely(!SwooleG.main_reactor))
        {
            return php_swoole_reactor_init() == SW_OK ? 1 : -1;
        }
        ...
    }
    
    /* Excerpt: creates the main reactor if it does not exist yet, sets its
     * wait_exit flag, and registers "Swoole\\Event::rshutdown" as a shutdown
     * function. */
    int php_swoole_reactor_init()
    {
            ...
        if (!SwooleG.main_reactor)
        {
            swoole_event_init();
            /* reactor_finish() tests wait_exit before stopping an empty loop */
            SwooleG.main_reactor->wait_exit = 1;
                 // register the rshutdown function
            php_swoole_register_shutdown_function("Swoole\\Event::rshutdown");
        }
            ...
    }
    
    /* Shorthand accessor for the global main reactor pointer. */
    #define sw_reactor()           (SwooleG.main_reactor)
    /* Value passed as max_event to swReactor_create() by swoole_event_init(). */
    #define SW_REACTOR_MAXEVENTS             4096
    
    /* Excerpt: allocates the global main reactor and initializes it through
     * swReactor_create() with SW_REACTOR_MAXEVENTS. */
    int swoole_event_init()
    {
        /* NOTE(review): no NULL check is visible between this allocation and its
         * use below; swReactor_create() zeroes the pointer immediately. */
        SwooleG.main_reactor = (swReactor *) sw_malloc(sizeof(swReactor));
    
        if (swReactor_create(sw_reactor(), SW_REACTOR_MAXEVENTS) < 0)
        {
            ...
        }
            ...
    }
    
    /* Excerpt: zeroes the reactor, selects one backend at compile time
     * (epoll, then kqueue, then poll, otherwise select), installs the timeout
     * callback and calls Socket::init_reactor(). */
    int swReactor_create(swReactor *reactor, int max_event)
    {
        int ret;
        bzero(reactor, sizeof(swReactor));
    
    #ifdef HAVE_EPOLL
        ret = swReactorEpoll_create(reactor, max_event);
    #elif defined(HAVE_KQUEUE)
        ret = swReactorKqueue_create(reactor, max_event);
    #elif defined(HAVE_POLL)
        ret = swReactorPoll_create(reactor, max_event);
    #else
        /* the select backend is the only one that does not take max_event */
        ret = swReactorSelect_create(reactor);
    #endif
            ...
        reactor->onTimeout = reactor_timeout; // callback run when the wait times out (drives the timers)
            ...
    
        Socket::init_reactor(reactor);
        ...
    }
    
    /* Excerpt: binds the epoll implementations to the reactor's function
     * pointers (add, set, del, wait, free). The setup before this is elided. */
    int swReactorEpoll_create(swReactor *reactor, int max_event_num)
    {
        ...
        //binding method
        reactor->add = swReactorEpoll_add;
        reactor->set = swReactorEpoll_set;
        reactor->del = swReactorEpoll_del;
        reactor->wait = swReactorEpoll_wait;
        reactor->free = swReactorEpoll_free;
    }

request_shutdown阶段会执行注册的Swoole\Event::rshutdown函数, swoole_event_rshutdown会执行之前注册的wait函数:

/* PHP function registered as "Swoole\\Event::rshutdown": forwards its arguments
 * to swoole_event_wait inside a zend_try block. */
static PHP_FUNCTION(swoole_event_rshutdown)
{
    /* prevent the program from jumping out of the rshutdown */
    zend_try
    {
        PHP_FN(swoole_event_wait)(INTERNAL_FUNCTION_PARAM_PASSTHRU);
    }
    zend_end_try();
}

/* Runs the main reactor's wait() callback with a NULL timeout argument, then
 * frees the event resources and returns wait()'s result. */
int swoole_event_wait()
{
    int retval = sw_reactor()->wait(sw_reactor(), NULL);
    swoole_event_free();
    return retval;
}

我们再来看看定时事件的注册, 首先会初始化timer:

/* Excerpt: registers a non-persistent timer for the current coroutine with
 * sleep_timeout as the callback. The failure branch and the rest are elided. */
int System::sleep(double sec)
{
    Coroutine* co = Coroutine::get_current_safe(); // get the current coroutine
    /* sec is converted to milliseconds; the coroutine is passed as the timer's private data */
    if (swoole_timer_add((long) (sec * 1000), SW_FALSE, sleep_timeout, co) == NULL)
    {
        ...
    }
}

/* Thin wrapper that adds a timer to the timer returned by sw_timer().
 * Note the argument order changes: swTimer_add() takes the data pointer before
 * the callback, and `persistent` is passed as its `interval` parameter. */
swTimer_node* swoole_timer_add(long ms, uchar persistent, swTimerCallback callback, void *private_data)
{
    return swTimer_add(sw_timer(), ms, persistent, private_data, callback);
}

/* Excerpt (lazy-initialization part only): on the first call the timer is
 * initialized with _msec; NULL is returned if that initialization fails. */
swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
        if (sw_unlikely(!timer->initialized))
    {
        if (sw_unlikely(swTimer_init(timer, _msec) != SW_OK)) // initialize the timer
        {
            return NULL;
        }
    }
        ...
}

/* Excerpt: sets up the timer's heap, id map and bookkeeping fields, then
 * attaches the timer to the main reactor. */
static int swTimer_init(swTimer *timer, long msec)
{
      ...
    timer->heap = swHeap_new(1024, SW_MIN_HEAP); // create the min-heap
    timer->map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
    timer->_current_id = -1; // id of the current timer
    timer->_next_msec = msec; // shortest timeout among the timers
    timer->_next_id = 1;
    timer->round = 0;
    ret = swReactorTimer_init(SwooleG.main_reactor, timer, msec);
    ...
}

/* Excerpt: links the reactor and the timer to each other, turns on timer
 * checking in the reactor and installs the timer's set/close callbacks. */
static int swReactorTimer_init(swReactor *reactor, swTimer *timer, long exec_msec)
{
    reactor->check_timer = SW_TRUE; /* reactor_finish() runs swTimer_select() only when this is set */
    reactor->timeout_msec = exec_msec; // shortest timeout among the timers
    reactor->timer = timer;
    timer->reactor = reactor;
    timer->set = swReactorTimer_set;
    timer->close = swReactorTimer_close;
        ...
}

接着是添加事件, 需要注意的是:

  • timer._next_msec和reactor.timeout_msec一直保持所有计时器里最短的超时时间(相对值)
  • tnode.exec_msec和tnode用最小堆来保存, 这样一来堆顶的元素就是最早超时的元素

    /* Excerpt (node-creation part): builds a timer node, lowers the timer's next
     * timeout if this one is shorter, then stores the node in the heap and the
     * id map. */
    swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
    {
        /* NOTE(review): no NULL check on this allocation is visible in the excerpt
         * before tnode is dereferenced. */
        swTimer_node *tnode = sw_malloc(sizeof(swTimer_node));
    
        int64_t now_msec = swTimer_get_relative_msec();
    
        tnode->data = data;
        tnode->type = SW_TIMER_TYPE_KERNEL;
        tnode->exec_msec = now_msec + _msec; // expiry time: now_msec plus the delay
        tnode->interval = interval ? _msec : 0; // non-zero means the timer repeats
        tnode->removed = 0;
        tnode->callback = callback;
        tnode->round = timer->round;
        tnode->dtor = NULL;
    
        if (timer->_next_msec < 0 || timer->_next_msec > _msec) // update when needed so the minimum timeout is kept
        {
            timer->set(timer, _msec);
            timer->_next_msec = _msec;
        }
    
        tnode->id = timer->_next_id++;
    
        tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode); // push to the heap, priority = tnode->exec_msec
        if (sw_unlikely(swHashMap_add_int(timer->map, tnode->id, tnode) != SW_OK)) // the hash map stores the id -> node mapping
        {
            ...
        }
        ...
    }

定时事件注册完就可以等待被事件循环执行了, 我们以epoll为例:

使用epoll_wait等待fd读写事件, 传入reactor->timeout_msec, 等待fd事件到来

  • 如果epoll_wait超时时还未获取到任何fd读写事件, 执行onTimeout函数, 处理定时事件
  • 有fd事件则处理fd读写事件, 处理完这次所有触发的事件后, 进入下一次循环

    /* Excerpt of the epoll event loop: loops while reactor->running > 0, waiting
     * in epoll_wait() with reactor->timeout_msec as the timeout. A return of 0
     * invokes onTimeout; a positive return is the number of fd events to process. */
    static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
    {
        ...
        reactor->running = 1;
        reactor->start = 1;
    
        while (reactor->running > 0)
        {
            ...
            n = epoll_wait(epoll_fd, events, max_event_num, reactor->timeout_msec);
            if (n < 0)
            {
                            ...
                // error handling
            }
            else if (n == 0)
            {
                /* the wait timed out with no fd events: run the timeout callback */
                reactor->onTimeout(reactor);
            }
            for (i = 0; i < n; i++)
            {
                            ...
                            // handle fd read/write events
            }
                    ...
        }
        return 0;
    }

如果这期间没有任何fd事件, 定时事件会被执行, onTimeout是之前已经注册过的函数reactor_timeout, swTimer_select函数会把当前所有已经到期的事件执行完再退出循环, 执行到上文我们注册的sleep_timeout函数时, 就会唤醒因为sleep休眠的协程继续执行:

/* Timeout callback installed as reactor->onTimeout by swReactor_create();
 * the visible part delegates to reactor_finish(). */
static void reactor_timeout(swReactor *reactor)
{
    reactor_finish(reactor);
        ...
}

/* Excerpt: runs due timers when timer checking is enabled, then stops the
 * loop if wait_exit is set and the reactor reports itself empty. */
static void reactor_finish(swReactor *reactor)
{
    //check timer
    if (reactor->check_timer)
    {
        swTimer_select(reactor->timer);
    }
        ...
    //the event loop is empty
    if (reactor->wait_exit && reactor->is_empty(reactor)) // nothing left to do, leave the loop
    {
        /* swReactorEpoll_wait() loops while running > 0 */
        reactor->running = 0;
    }
}

/* Excerpt: repeatedly takes the heap's top node and, while it has expired,
 * runs its callback (unless marked removed) and drops it from the heap and the
 * id map. Stops at the first node that is not yet due.
 * The declarations of tmp and tnode are not shown in this excerpt. */
int swTimer_select(swTimer *timer)
{
    int64_t now_msec = swTimer_get_relative_msec(); // current time

    while ((tmp = swHeap_top(timer->heap))) // fetch the earliest-expiring node
    {
        tnode = tmp->data;
        if (tnode->exec_msec > now_msec) // not due yet
        {
            break;
        }

                if (!tnode->removed)
        {
                        tnode->callback(timer, tnode); // run the callback registered for this timer
        }

        /* the node is removed only after its callback has run */
        timer->num--;
        swHeap_pop(timer->heap);
        swHashMap_del_int(timer->map, tnode->id);
    }
       ...
}

到这里, 整个流程都已经介绍完了, 总结一下:

  • 在没有主动干预协程调度的情况下, 协程都是在执行IO/定时事件时主动让出, 注册对应事件, 然后通过request_shutdown阶段里的事件循环等待事件到来, 触发协程的resume, 达到多协程并发的效果
  • IO/定时事件不一定准时

抢占式调度

通过上面我们可以知道, 如果协程里没有任何IO/定时事件, 实际上协程是没有切换时机的, 对于CPU密集型的场景,一些协程会因为得不到CPU时间片被饿死, Swoole 4.4引入了抢占式调度就是为了解决这个问题.

vm interrupt是php7.1.0后引入的执行机制, swoole就是使用这个特性实现的抢占式调度:

  • ZEND_VM_INTERRUPT_CHECK会在指令是jump和call(跳转和函数调用)的时候执行
  • ZEND_VM_INTERRUPT_CHECK会检查EG(vm_interrupt)这个标志位, 如果为1, 则触发zend_interrupt_function的执行

    // php 7.1.26 src
    /* Branches to the interrupt path when the EG(vm_interrupt) flag is set. */
    #define ZEND_VM_INTERRUPT_CHECK() do { \
        if (UNEXPECTED(EG(vm_interrupt))) { \
                ZEND_VM_INTERRUPT(); \
          } \
    } while (0)
    
    /* Tail-calls the interrupt helper, passing the opcode handler arguments through. */
    #define ZEND_VM_INTERRUPT()      ZEND_VM_TAIL_CALL(zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS_PASSTHRU));
    
    /* Excerpt: clears the interrupt flag and, if an interrupt function is
     * installed, calls it with the current execute_data. */
    static ZEND_OPCODE_HANDLER_RET ZEND_FASTCALL zend_interrupt_helper_SPEC(ZEND_OPCODE_HANDLER_ARGS)
    {
          ...
          /* the flag is reset before the handler runs */
          EG(vm_interrupt) = 0;
          if (zend_interrupt_function) {
                zend_interrupt_function(execute_data);
          }
    }

下面来看具体实现:
初始化:

  • 保存原来的中断函数, zend_interrupt_function替换成新的中断函数
  • 开启线程执行interrupt_thread_loop
  • interrupt_thread_loop里每隔5ms将EG(vm_interrupt)设置为1

    /* Excerpt (interrupt-related part of activate): saves the current interrupt
     * function, installs coro_interrupt_function, and starts the interrupt thread
     * when preemptive scheduling is enabled. */
    inline void PHPCoroutine::activate()
    {
        ...
        /* replace interrupt function */
        orig_interrupt_function = zend_interrupt_function; // save the original interrupt callback
        zend_interrupt_function = coro_interrupt_function; // install the coroutine interrupt handler
    
          // enable preemptive scheduling
        if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler) // requires the enable_preemptive_scheduler option to be turned on
        {
            /* create a thread to interrupt the coroutine that takes up too much time */
            interrupt_thread_start();
        }
    }
    
    /* Records the address of EG(vm_interrupt), marks the interrupt thread as
     * running and spawns a thread that executes interrupt_thread_loop. */
    void PHPCoroutine::interrupt_thread_start()
    {
        zend_vm_interrupt = &EG(vm_interrupt);
        interrupt_thread_running = true;
        /* NOTE(review): POSIX pthread_create() returns 0 on success and a positive
         * error number on failure, so this `< 0` test can never detect a failure;
         * `!= 0` looks like the intended check. Confirm against upstream. */
        if (pthread_create(&interrupt_thread_id, NULL, (void * (*)(void *)) interrupt_thread_loop, NULL) < 0)
        {
            ...
        }
    }
    
    /* Threshold in milliseconds; the loop below uses half of it as its period. */
    static const uint8_t MAX_EXEC_MSEC = 10;
    /* Body of the interrupt thread: while interrupt_thread_running is true, sets
     * the flag behind zend_vm_interrupt to 1 and sleeps, then exits the thread. */
    void PHPCoroutine::interrupt_thread_loop()
    {
        /* (10 / 2) * 1000 = 5000 microseconds */
        static const useconds_t interval = (MAX_EXEC_MSEC / 2) * 1000;
        while (interrupt_thread_running)
        {
            *zend_vm_interrupt = 1; // EG(vm_interrupt) = 1
            usleep(interval); // sleep for 5 ms
        }
        pthread_exit(0);
    }

中断函数coro_interrupt_function会检查当前的协程是否可调度(距离上一次切换时间超过10ms), 如果可以, 直接让出当前协程, 完成抢占调度

/* Interrupt handler installed by activate(): yields the current coroutine when
 * it has a task that is schedulable, then chains to the previously installed
 * interrupt function, if any. */
static void coro_interrupt_function(zend_execute_data *execute_data)
{
    php_coro_task *task = PHPCoroutine::get_task();
    if (task && task->co && PHPCoroutine::is_schedulable(task))
    {
        task->co->yield(); // give up the current coroutine
    }
    /* presumably reached only after the coroutine is resumed when a yield happened — TODO confirm */
    if (orig_interrupt_function)
    {
        orig_interrupt_function(execute_data); // run the original interrupt function
    }
}

/* Threshold in milliseconds compared against the time since task->last_msec. */
static const uint8_t MAX_EXEC_MSEC = 10;
/* Returns true when the task has enable_scheduler set and more than
 * MAX_EXEC_MSEC milliseconds separate the current absolute time from
 * task->last_msec. */
static inline bool is_schedulable(php_coro_task *task)
{
        // enable_scheduler is set and the task has been running for more than 10 ms
    return task->enable_scheduler && (swTimer_get_absolute_msec() - task->last_msec > MAX_EXEC_MSEC); 
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!