Laravel Schedule(任务调度)源码分析
基于laravel10
分析
基于schedule:run
命令分析
我们知道在 App\Console\Kernel
类的 schedule
方法中定义调度任务
但是他是怎么被触发执行的呢
通过运行schedule:run
命令
会执行Illuminate\Console\Scheduling\ScheduleRunCommand
类的handle
方法
会先初始化类,在这个类的构造函数__construct
中定义了一个开始时间,这个用途是检测这个时间是否存在可执行的任务
public function __construct()
{
$this->startedAt = Date::now();
parent::__construct();
}
现在我们来分析一下handle
方法做了什么事情
我们可以看到在handle
方法参数,这四个参数都是通过依赖注入进来的Illuminate\Console\Scheduling\Schedule $schedule
这个是计划任务对象(这个是我们主要分析的)Illuminate\Contracts\Events\Dispatcher\Dispatcher $dispatcher
这个是事件调度类象Illuminate\Contracts\Cache\Repository $cache
这个是缓存对象Illuminate\Contracts\Debug\ExceptionHandler $handler
这个是异常处理对象
public function handle(
Schedule $schedule,
Dispatcher $dispatcher,
Cache $cache,
ExceptionHandler $handler
) {
$this->schedule = $schedule;
$this->dispatcher = $dispatcher;
$this->cache = $cache;
$this->handler = $handler;
//php二进制可执行文件
$this->phpBinary = Application::phpBinary();
...
}
我们来主要分析一下Illuminate\Console\Scheduling\Schedule $schedule
计划任务对象
我们可以找到他是一个单例绑定
在Illuminate\Foundation\Console\Kernel
类的defineConsoleSchedule
方法中定义了单例绑定
protected function defineConsoleSchedule()
{
$this->app->singleton(Schedule::class, function ($app) {
return tap(new Schedule($this->scheduleTimezone()), function ($schedule) {
$this->schedule($schedule->useCache($this->scheduleCache()));
});
});
}
app容器去解析Illuminate\Console\Scheduling\Schedule
类的时候,先会去找容器中是否有注册,就会触发这个闭包 , 初始化Schedule
对象,初始化对象后 会触发这个里面的闭包,就会调用这个对象的schedule
方法,这个方法就是我们定义任务调度的地方,所以我们定义的任务就会被放到schedule
对象的events
属性数组中
在Schedule
类的构造函数__contruct
中,有定义两种互斥锁 eventMutex
和schedulingMutex
eventMutex
这个是单个任务检测重复,例如一个任务是一分钟执行一次,但是可能有时候执行比较久,下一分钟到的时候,上一分钟的任务还没执行完,但是需要等这次任务执行完,才能执行下一次的任务schedulingMutex
这个是用于多台服务器之间同时运行任务,但是只能同一时间只能有一台服务器把这个时间点的任务
定义任务有4中方式command
exec
call
job
先看一下exec
这里是组装命令字串,例如php artisan test --ccc=1 --ccc=2
然后统一包装成Illuminate\Console\Scheduling\Event
类对象 放到events
属性数组中
public function exec(string $command, array $parameters = [])
{
if (count($parameters)) {
$command .= ' '.$this->compileParameters($parameters);
}
$this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone);
return $event;
}
再来看command
方法,可以这个方法里面最终是调用了exec
方法,中间是组装command
命令字串传入到exec
方法中,所以command
方法注册的任务也是Illuminate\Console\Scheduling\Event
类对象
public function command($command, array $parameters = [])
{
if (class_exists($command)) {
$command = Container::getInstance()->make($command);
return $this->exec(
Application::formatCommandString($command->getName()), $parameters,
)->description($command->getDescription());
}
return $this->exec(
Application::formatCommandString($command), $parameters
);
}
call
方法
统一包装成Illuminate\Console\Scheduling\CallbackEvent
类对象 放到events
属性数组中CallbackEvent
有一个父类就是Event
,然后CallbackEvent
对象包装的任务不能使用runInBackground(后台执行)
方法,因为后台执行的原理是通过命令行去运行命令去执行,但是闭包没有命令,所以不能使用runInBackground
方法,
所以尽量不要在call
方法中执行时间
public function call($callback, array $parameters = [])
{
$this->events[] = $event = new CallbackEvent(
$this->eventMutex, $callback, $parameters, $this->timezone
);
return $event;
}
再来看一下job
方法,就是把job封装成闭包,调用call
方法
public function job($job, $queue = null, $connection = null)
{
return $this->call(function () use ($job, $queue, $connection) {
$job = is_string($job) ? Container::getInstance()->make($job) : $job;
if ($job instanceof ShouldQueue) {
$this->dispatchToQueue($job, $queue ?? $job->queue, $connection ?? $job->connection);
} else {
$this->dispatchNow($job);
}
})->name(is_string($job) ? $job : get_class($job));
}
我们再回到ScheduleRunCommand
类的handle
方法,看一下这个方法后续做了什么处理
public function handle(...)
{
...
//这里是清除中断信号
$this->clearInterruptSignal();
//换行
$this->newLine();
//这里是获取到期可执行的任务
$events = $this->schedule->dueEvents($this->laravel);
...
}
我们来看一下Illuminate\Console\Scheduling\Schedule
类的dueEvents
方法做了什么处理
这里是通过filter来过滤掉为false的
public function dueEvents($app)
{
return collect($this->events)->filter->isDue($app);
}
看一下Illuminate\Console\Scheduling\Event
类中的isDue
方法做了什么处理
public function isDue($app)
{
//如果没有设置在维护模式下运行且现在是维护模式
if (! $this->runsInMaintenanceMode() && $app->isDownForMaintenance()) {
return false;
}
//检测时间是否到期且是在指定的环境中
return $this->expressionPasses() &&
$this->runsInEnvironment($app->environment());
}
我们再回到ScheduleRunCommand
类的handle
方法,看一下这个方法后续做了什么处理
public function handle(...)
{
...
foreach ($events as $event) {
//这里是检测任务是否会跳过
if (!①$event->filtersPass($this->laravel)) {
//触发任务跳过事件
$this->dispatcher->dispatch(new ScheduledTaskSkipped($event));
continue;
}
//判断是否只允许在一台服务上运行
if ($event->onOneServer) {
②$this->runSingleServerEvent($event);
} else {
$this->runEvent($event);
}
$this->eventsRan = true;
}
...
}
①看一下Illuminate\Console\Scheduling\Event
类中的filtersPass
方法做了什么处理
public function filtersPass($app)
{
//设置最后一次检测时间
$this->lastChecked = Date::now();
//filters属性就是通过when方法放入的闭包
foreach ($this->filters as $callback) {
if (! $app->call($callback)) {
return false;
}
}
//rejects属性就是通过skip方法来放入的闭包
foreach ($this->rejects as $callback) {
if ($app->call($callback)) {
return false;
}
}
return true;
}
②看一下ScheduleRunCommand
类的runSingleServerEvent
方法做了什么处理
protected function runSingleServerEvent($event)
{
if (③$this->schedule->serverShouldRun($event, $this->startedAt)) {
④$this->runEvent($event);
} else {
$this->components->info(sprintf(
'Skipping [%s], as command already run on another server.', $event->getSummaryForDisplay()
));
}
}
③看一下Illuminate\Console\Scheduling\Schedule
类的serverShouldRun
方法做了什么处理
如果返回true则设置成功了 否则就是已经被其他服务器先执行了
public function serverShouldRun(Event $event, DateTimeInterface $time)
{
//这里就是去创建锁 赋值到mutexCache数组中
return $this->mutexCache[$event->mutexName()] ??= $this->schedulingMutex->create($event, $time);
}
④看一下ScheduleRunCommand
类的runEvent
方法做了什么处理
protected function runEvent($event)
{
//获取要显示的事件摘要
$summary = $event->getSummaryForDisplay();
//拿到命令 因为闭包没有命名 所以直接那摘要
$command = $event instanceof CallbackEvent
? $summary
: trim(str_replace($this->phpBinary, '', $event->command));
$description = sprintf(
'<fg=gray>%s</> Running [%s]%s',
Carbon::now()->format('Y-m-d H:i:s'),
$command,
$event->runInBackground ? ' in background' : '',
);
//输出到终端 执行闭包任务
$this->components->task($description, function () use ($event) {
//任务开始执行事件
$this->dispatcher->dispatch(new ScheduledTaskStarting($event));
开始时间
$start = microtime(true);
try {
//执行
⑤$event->run($this->laravel);
//任务完成后事件 如果是后台任务只要丢到后台后就算完成了
$this->dispatcher->dispatch(new ScheduledTaskFinished(
$event,
round(microtime(true) - $start, 2)
));
//用于判断是否至少有一个任务运行
$this->eventsRan = true;
} catch (Throwable $e) {
//任务失败后事件
$this->dispatcher->dispatch(new ScheduledTaskFailed($event, $e));
//记录日志
$this->handler->report($e);
}
//判断是否没有异常
return $event->exitCode == 0;
});
if (! $event instanceof CallbackEvent) {
//输出命令到终端
$this->components->bulletList([
$event->getSummaryForDisplay(),
]);
}
}
⑤看一下Illuminate\Console\Scheduling\Event
类中的run
方法做了什么处理
public function run(Container $container)
{
//检测任务是否应该跳过 任务重复
if (⑥$this->shouldSkipDueToOverlapping()) {
return;
}
//开始执行
⑦$exitCode = $this->start($container);
//如果任务不是后台运行
if (! $this->runInBackground) {
//完成处理
⑧$this->finish($container, $exitCode);
}
}
看一下Illuminate\Console\Scheduling\CallbackEvent
类的run
方法做了什么处理
public function run(Container $container)
{
parent::run($container);
//如果是闭包任务 存在异常会抛出异常
if ($this->exception) {
throw $this->exception;
}
return $this->result;
}
⑥看一下Illuminate\Console\Scheduling\Event
类的shouldSkipDueToOverlapping
方法做了什么处理
public function shouldSkipDueToOverlapping()
{
//设置不能重复执行 且能拿到锁
return $this->withoutOverlapping && ! $this->mutex->create($this);
}
⑦看一下Illuminate\Console\Scheduling\Event
类的start
方法做了什么处理
protected function start($container)
{
try {
//运行前回调闭包 对应before方法
$this->callBeforeCallbacks($container);
⑨return $this->execute($container);
} catch (Throwable $exception) {
//异常释放锁
$this->removeMutex();
throw $exception;
}
}
⑧看一下Illuminate\Console\Scheduling\Event
类的finish
方法做了什么处理
public function finish(Container $container, $exitCode)
{
$this->exitCode = (int) $exitCode;
try {
//这里就是去执行after方法放入的闭包
$this->callAfterCallbacks($container);
} finally {
//释放锁 这里只是释放单个任务检测重复的锁
$this->removeMutex();
}
}
⑨看一下Illuminate\Console\Scheduling\Event
类的execute
方法做了什么处理
这里就是去执行命令
protected function execute($container)
{
return Process::fromShellCommandline(
$this->buildCommand(), base_path(), null, null, null
)->run();
}
看一下Illuminate\Console\Scheduling\CallbackEvent
类的execute
方法做了什么处理
protected function execute($container)
{
try {
$this->result = is_object($this->callback)
? $container->call([$this->callback, '__invoke'], $this->parameters)
: $container->call($this->callback, $this->parameters);
return $this->result === false ? 1 : 0;
} catch (Throwable $e) {
$this->exception = $e;
return 1;
}
}
我们再回到ScheduleRunCommand
类的handle
方法,看一下这个方法后续做了什么处理
public function handle(...)
{
...
//判断任务事件是否已配置为每分钟重复多次(可以做到秒级任务)
if ($events->contains->isRepeatable()) {
⑪$this->repeatEvents($events->filter->isRepeatable());
}
//这里就是判断如果一个任务都没有执行则
if (! $this->eventsRan) {
$this->components->info('No scheduled commands are ready to run.');
} else {
$this->newLine();
}
}
⑪看一下ScheduleRunCommand
类的repeatEvents
方法做了什么处理
protected function repeatEvents($events)
{
$hasEnteredMaintenanceMode = false;
//检测是否小于等于开始时间分钟的最后一秒
while (Date::now()->lte($this->startedAt->endOfMinute())) {
foreach ($events as $event) {
//如果设置了中断
if ($this->shouldInterrupt()) {
return;
}
//检测现在是否可以执行
if (! $event->shouldRepeatNow()) {
continue;
}
//是否维护模式
$hasEnteredMaintenanceMode = $hasEnteredMaintenanceMode || $this->laravel->isDownForMaintenance();
//如果是维护模式且不能在维护模式下运行
if ($hasEnteredMaintenanceMode && ! $event->runsInMaintenanceMode()) {
continue;
}
//同上
if (! $event->filtersPass($this->laravel)) {
$this->dispatcher->dispatch(new ScheduledTaskSkipped($event));
continue;
}
//同上
if ($event->onOneServer) {
$this->runSingleServerEvent($event);
} else {
$this->runEvent($event);
}
$this->eventsRan = true;
}
//减轻压力
Sleep::usleep(100000);
}
}
从上面可以看出 现在定时任务可以支持秒级别的了
最后我们来看一下 后台运行的任务 命令是怎样的
看一下Illuminate\Console\Scheduling\CommandBuilder
类的buildBackgroundCommand
方法
protected function buildBackgroundCommand(Event $event)
{
$output = ProcessUtils::escapeArgument($event->output);
$redirect = $event->shouldAppendOutput ? ' >> ' : ' > ';
//这里是完成后执行的命令
$finished = Application::formatCommandString('schedule:finish').' "'.$event->mutexName().'"';
if (windows_os()) {
return 'start /b cmd /v:on /c "('.$event->command.' & '.$finished.' ^!ERRORLEVEL^!)'.$redirect.$output.' 2>&1"';
}
//$? 这个是拿上一个命令的返回码
return $this->ensureCorrectUser($event,
'('.$event->command.$redirect.$output.' 2>&1 ; '.$finished.' "$?") > '
.ProcessUtils::escapeArgument($event->getDefaultOutput()).' 2>&1 &'
);
}
看一下schedule:finish
这个命令里面做了什么处理
public function handle(Schedule $schedule)
{
collect($schedule->events())->filter(function ($value) {
//这里是判断是否是当前任务
return $value->mutexName() == $this->argument('id');
})->each(function ($event) {
//做完成操作 同上的finish
$event->finish($this->laravel, $this->argument('code'));
//触发后台任务执行完事件
$this->laravel->make(Dispatcher::class)->dispatch(new ScheduledBackgroundTaskFinished($event));
});
}
以上就是事件注册和事件执行的这个流程了,如果有写的有误的地方,请大佬们指正
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: