Laravel Schedule(任务调度)源码分析

基于laravel10分析
基于schedule:run命令分析
我们知道在 App\Console\Kernel 类的 schedule 方法中定义调度任务
但是他是怎么被触发执行的呢
通过运行schedule:run命令
会执行Illuminate\Console\Scheduling\ScheduleRunCommand类的handle方法
会先初始化类,在这个类的构造函数__construct中定义了一个开始时间,这个用途是检测这个时间是否存在可执行的任务

public function __construct()
{
        $this->startedAt = Date::now();

        parent::__construct();
 }

现在我们来分析一下handle方法做了什么事情
我们可以看到在handle方法参数,这四个参数都是通过依赖注入进来的
Illuminate\Console\Scheduling\Schedule $schedule 这个是计划任务对象(这个是我们主要分析的)
Illuminate\Contracts\Events\Dispatcher\Dispatcher $dispatcher 这个是事件调度类象
Illuminate\Contracts\Cache\Repository $cache 这个是缓存对象
Illuminate\Contracts\Debug\ExceptionHandler $handler 这个是异常处理对象

 public function handle(
     Schedule $schedule, 
     Dispatcher $dispatcher,
     Cache $cache, 
     ExceptionHandler $handler
 ) {
     $this->schedule = $schedule;
    $this->dispatcher = $dispatcher;
    $this->cache = $cache;
    $this->handler = $handler;
    //php二进制可执行文件
    $this->phpBinary = Application::phpBinary();
     ...
 }

我们来主要分析一下Illuminate\Console\Scheduling\Schedule $schedule计划任务对象
我们可以找到他是一个单例绑定
Illuminate\Foundation\Console\Kernel类的defineConsoleSchedule方法中定义了单例绑定

protected function defineConsoleSchedule()
{
        $this->app->singleton(Schedule::class, function ($app) {
            return tap(new Schedule($this->scheduleTimezone()), function ($schedule) {
                $this->schedule($schedule->useCache($this->scheduleCache()));
            });
        });
 }

app容器去解析Illuminate\Console\Scheduling\Schedule类的时候,先会去找容器中是否有注册,就会触发这个闭包 , 初始化Schedule 对象,初始化对象后 会触发这个里面的闭包,就会调用这个对象的schedule方法,这个方法就是我们定义任务调度的地方,所以我们定义的任务就会被放到schedule对象的events属性数组中
Schedule类的构造函数__contruct中,有定义两种互斥锁 eventMutexschedulingMutex
eventMutex 这个是单个任务检测重复,例如一个任务是一分钟执行一次,但是可能有时候执行比较久,下一分钟到的时候,上一分钟的任务还没执行完,但是需要等这次任务执行完,才能执行下一次的任务
schedulingMutex这个是用于多台服务器之间同时运行任务,但是只能同一时间只能有一台服务器把这个时间点的任务
定义任务有4中方式command exec call job
先看一下exec 这里是组装命令字串,例如php artisan test --ccc=1 --ccc=2
然后统一包装成Illuminate\Console\Scheduling\Event类对象 放到events属性数组中

public function exec(string $command, array $parameters = [])
 {
        if (count($parameters)) {
            $command .= ' '.$this->compileParameters($parameters);
        }

        $this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone);

        return $event;
}

再来看command方法,可以这个方法里面最终是调用了exec方法,中间是组装command命令字串传入到exec方法中,所以command方法注册的任务也是Illuminate\Console\Scheduling\Event类对象

public function command($command, array $parameters = [])
 {
        if (class_exists($command)) {
            $command = Container::getInstance()->make($command);

            return $this->exec(
                Application::formatCommandString($command->getName()), $parameters,
            )->description($command->getDescription());
        }

        return $this->exec(
            Application::formatCommandString($command), $parameters
        );
 }

call方法
统一包装成Illuminate\Console\Scheduling\CallbackEvent类对象 放到events属性数组中
CallbackEvent有一个父类就是Event,然后CallbackEvent对象包装的任务不能使用runInBackground(后台执行)方法,因为后台执行的原理是通过命令行去运行命令去执行,但是闭包没有命令,所以不能使用runInBackground方法,
所以尽量不要在call方法中执行时间

public function call($callback, array $parameters = [])
{
        $this->events[] = $event = new CallbackEvent(
            $this->eventMutex, $callback, $parameters, $this->timezone
        );

        return $event;
}

再来看一下job方法,就是把job封装成闭包,调用call方法

public function job($job, $queue = null, $connection = null)
{
        return $this->call(function () use ($job, $queue, $connection) {
            $job = is_string($job) ? Container::getInstance()->make($job) : $job;

            if ($job instanceof ShouldQueue) {
                $this->dispatchToQueue($job, $queue ?? $job->queue, $connection ?? $job->connection);
            } else {
                $this->dispatchNow($job);
            }
        })->name(is_string($job) ? $job : get_class($job));
 }

我们再回到ScheduleRunCommand类的handle方法,看一下这个方法后续做了什么处理

public function handle(...)
{
    ...
    //这里是清除中断信号
    $this->clearInterruptSignal();
    //换行
    $this->newLine();
    //这里是获取到期可执行的任务
    $events = $this->schedule->dueEvents($this->laravel);
    ...
}

我们来看一下Illuminate\Console\Scheduling\Schedule类的dueEvents方法做了什么处理
这里是通过filter来过滤掉为false的

public function dueEvents($app)
{
        return collect($this->events)->filter->isDue($app);
}

看一下Illuminate\Console\Scheduling\Event类中的isDue方法做了什么处理

public function isDue($app)
{
        //如果没有设置在维护模式下运行且现在是维护模式
        if (! $this->runsInMaintenanceMode() && $app->isDownForMaintenance()) {
            return false;
        }
        //检测时间是否到期且是在指定的环境中
        return $this->expressionPasses() &&
               $this->runsInEnvironment($app->environment());
}

我们再回到ScheduleRunCommand类的handle方法,看一下这个方法后续做了什么处理

public function handle(...)
{
    ...
    foreach ($events as $event) {
        //这里是检测任务是否会跳过
        if (!①$event->filtersPass($this->laravel)) {
            //触发任务跳过事件
            $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

            continue;
        }
        //判断是否只允许在一台服务上运行
        if ($event->onOneServer) {
            ②$this->runSingleServerEvent($event);
        } else {
            $this->runEvent($event);
        }

        $this->eventsRan = true;
    }
    ...
}

①看一下Illuminate\Console\Scheduling\Event类中的filtersPass方法做了什么处理

public function filtersPass($app)
 {        
         //设置最后一次检测时间
        $this->lastChecked = Date::now();
        //filters属性就是通过when方法放入的闭包
        foreach ($this->filters as $callback) {
            if (! $app->call($callback)) {
                return false;
            }
        }
        //rejects属性就是通过skip方法来放入的闭包
        foreach ($this->rejects as $callback) {
            if ($app->call($callback)) {
                return false;
            }
        }

        return true;
 }

②看一下ScheduleRunCommand类的runSingleServerEvent方法做了什么处理

protected function runSingleServerEvent($event)
{        
        if (③$this->schedule->serverShouldRun($event, $this->startedAt)) {
               ④$this->runEvent($event);
        } else {
            $this->components->info(sprintf(
                'Skipping [%s], as command already run on another server.', $event->getSummaryForDisplay()
            ));
        }
}

③看一下Illuminate\Console\Scheduling\Schedule类的serverShouldRun方法做了什么处理
如果返回true则设置成功了 否则就是已经被其他服务器先执行了

public function serverShouldRun(Event $event, DateTimeInterface $time)
 {        
         //这里就是去创建锁 赋值到mutexCache数组中 
        return $this->mutexCache[$event->mutexName()] ??= $this->schedulingMutex->create($event, $time);
}

④看一下ScheduleRunCommand类的runEvent方法做了什么处理

protected function runEvent($event)
    {
        //获取要显示的事件摘要
        $summary = $event->getSummaryForDisplay();

        //拿到命令 因为闭包没有命名 所以直接那摘要
        $command = $event instanceof CallbackEvent
            ? $summary
            : trim(str_replace($this->phpBinary, '', $event->command));

        $description = sprintf(
            '<fg=gray>%s</> Running [%s]%s',
            Carbon::now()->format('Y-m-d H:i:s'),
            $command,
            $event->runInBackground ? ' in background' : '',
        );
        //输出到终端 执行闭包任务
        $this->components->task($description, function () use ($event) {
            //任务开始执行事件
            $this->dispatcher->dispatch(new ScheduledTaskStarting($event));
            开始时间
            $start = microtime(true);

            try {
                //执行    
                ⑤$event->run($this->laravel);
                //任务完成后事件  如果是后台任务只要丢到后台后就算完成了
                $this->dispatcher->dispatch(new ScheduledTaskFinished(
                    $event,
                    round(microtime(true) - $start, 2)
                ));
                //用于判断是否至少有一个任务运行
                $this->eventsRan = true;
            } catch (Throwable $e) {
                //任务失败后事件
                $this->dispatcher->dispatch(new ScheduledTaskFailed($event, $e));
                //记录日志
                $this->handler->report($e);
            }
            //判断是否没有异常
            return $event->exitCode == 0;
        });

        if (! $event instanceof CallbackEvent) {
            //输出命令到终端
            $this->components->bulletList([
                $event->getSummaryForDisplay(),
            ]);
        }
}

⑤看一下Illuminate\Console\Scheduling\Event类中的run方法做了什么处理

public function run(Container $container)
{
        //检测任务是否应该跳过 任务重复
        if (⑥$this->shouldSkipDueToOverlapping()) {
            return;
        }
        //开始执行
        ⑦$exitCode = $this->start($container);
        //如果任务不是后台运行
        if (! $this->runInBackground) {
            //完成处理
            ⑧$this->finish($container, $exitCode);
        }
}

看一下Illuminate\Console\Scheduling\CallbackEvent类的run方法做了什么处理

public function run(Container $container)
{
        parent::run($container);
        //如果是闭包任务 存在异常会抛出异常
        if ($this->exception) {
            throw $this->exception;
        }

        return $this->result;
}

⑥看一下Illuminate\Console\Scheduling\Event类的shouldSkipDueToOverlapping方法做了什么处理

public function shouldSkipDueToOverlapping()
{
        //设置不能重复执行 且能拿到锁
        return $this->withoutOverlapping && ! $this->mutex->create($this);
}

⑦看一下Illuminate\Console\Scheduling\Event类的start方法做了什么处理

protected function start($container)
{
        try {
            //运行前回调闭包 对应before方法
            $this->callBeforeCallbacks($container);return $this->execute($container);
        } catch (Throwable $exception) {
            //异常释放锁
            $this->removeMutex();

            throw $exception;
        }
}

⑧看一下Illuminate\Console\Scheduling\Event类的finish方法做了什么处理

public function finish(Container $container, $exitCode)
    {
        $this->exitCode = (int) $exitCode;

        try {
            //这里就是去执行after方法放入的闭包
            $this->callAfterCallbacks($container);
        } finally {
            //释放锁 这里只是释放单个任务检测重复的锁
            $this->removeMutex();
        }
    }

⑨看一下Illuminate\Console\Scheduling\Event类的execute方法做了什么处理
这里就是去执行命令

protected function execute($container)
{
        return Process::fromShellCommandline(
            $this->buildCommand(), base_path(), null, null, null
        )->run();
}

看一下Illuminate\Console\Scheduling\CallbackEvent类的execute方法做了什么处理

protected function execute($container)
 {
        try {
            $this->result = is_object($this->callback)
                ? $container->call([$this->callback, '__invoke'], $this->parameters)
                : $container->call($this->callback, $this->parameters);

            return $this->result === false ? 1 : 0;
        } catch (Throwable $e) {
            $this->exception = $e;

            return 1;
        }
 }

我们再回到ScheduleRunCommand类的handle方法,看一下这个方法后续做了什么处理

public function handle(...)
{    
    ...
    //判断任务事件是否已配置为每分钟重复多次(可以做到秒级任务)
    if ($events->contains->isRepeatable()) {
        ⑪$this->repeatEvents($events->filter->isRepeatable());
    }
    //这里就是判断如果一个任务都没有执行则
    if (! $this->eventsRan) {
            $this->components->info('No scheduled commands are ready to run.');
        } else {
            $this->newLine();
    }
}

⑪看一下ScheduleRunCommand类的repeatEvents方法做了什么处理

protected function repeatEvents($events)
{
        $hasEnteredMaintenanceMode = false;
        //检测是否小于等于开始时间分钟的最后一秒
        while (Date::now()->lte($this->startedAt->endOfMinute())) {
            foreach ($events as $event) {
                //如果设置了中断
                if ($this->shouldInterrupt()) {
                    return;
                }
                //检测现在是否可以执行
                if (! $event->shouldRepeatNow()) {
                    continue;
                }
                //是否维护模式
                $hasEnteredMaintenanceMode = $hasEnteredMaintenanceMode || $this->laravel->isDownForMaintenance();
                //如果是维护模式且不能在维护模式下运行
                if ($hasEnteredMaintenanceMode && ! $event->runsInMaintenanceMode()) {
                    continue;
                }
                //同上
                if (! $event->filtersPass($this->laravel)) {
                    $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

                    continue;
                }
                //同上
                if ($event->onOneServer) {
                    $this->runSingleServerEvent($event);
                } else {
                    $this->runEvent($event);
                }

                $this->eventsRan = true;
            }
            //减轻压力
            Sleep::usleep(100000);
        }
}

从上面可以看出 现在定时任务可以支持秒级别的了

最后我们来看一下 后台运行的任务 命令是怎样的
看一下Illuminate\Console\Scheduling\CommandBuilder类的buildBackgroundCommand方法

protected function buildBackgroundCommand(Event $event)
{
        $output = ProcessUtils::escapeArgument($event->output);

        $redirect = $event->shouldAppendOutput ? ' >> ' : ' > ';
        //这里是完成后执行的命令
        $finished = Application::formatCommandString('schedule:finish').' "'.$event->mutexName().'"';

        if (windows_os()) {
            return 'start /b cmd /v:on /c "('.$event->command.' & '.$finished.' ^!ERRORLEVEL^!)'.$redirect.$output.' 2>&1"';
        }
        //$? 这个是拿上一个命令的返回码
        return $this->ensureCorrectUser($event,
            '('.$event->command.$redirect.$output.' 2>&1 ; '.$finished.' "$?") > '
            .ProcessUtils::escapeArgument($event->getDefaultOutput()).' 2>&1 &'
        );
}

看一下schedule:finish这个命令里面做了什么处理

public function handle(Schedule $schedule)
{
        collect($schedule->events())->filter(function ($value) {
            //这里是判断是否是当前任务
            return $value->mutexName() == $this->argument('id');
        })->each(function ($event) {
            //做完成操作 同上的finish
            $event->finish($this->laravel, $this->argument('code'));
            //触发后台任务执行完事件
            $this->laravel->make(Dispatcher::class)->dispatch(new ScheduledBackgroundTaskFinished($event));
        });
}

以上就是事件注册和事件执行的这个流程了,如果有写的有误的地方,请大佬们指正

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 3

缺少定时任务运行流程图

1年前 评论
cccdz (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!