调度程序代理管理者模式 Scheduler Agent Supervisor Pattern
描述
在分布式系统中,服务访问远程服务或资源是需要协调一系列行为的,这些行为包括故障处理,操作撤销,失败重试,异步调用等等。
这些行为集合叫调度程序代理管理者模式。这种模式可以分布式系统中增加弹性和灵活性。
背景和问题
服务访问远程服务或资源是可能发生故障的。 例如网络崩溃,通信终端,资源无法访问,资源约束等情况。
何时使用
在分布式环境中运行。有下面特征
1. 调用了远程服务或资源
2. 能以透明的方式处理通讯故障
3. 各个流程都支持异步机制。
解决方案
成员
- 远程服务或资源。
- 代理。负责调用远程服务或资源,包含错误处理和重试机制。
- 调度程序。负责执行工作流,记录状态到状态存储器,请求代理。
- 状态存储器。负责存储整个工作流状态的生命周期以及其他一些附加信息。
- 管理者。负责重试超时或异常的步骤(请求调度程序或维护状态存储器重试状态实现重试机制)和监控状态存储中每一个步骤的状态并且可能更新状态。
流程
- 把获取远程资源的过程分成多个步骤。
- 各个步骤组合成一个管道或工作流。
- 调度器负责这个工作流程中的步骤,以适当的顺序执行,并记录任务的进度和步骤的状态(未开始,运行中,完成)持久化到状态存储器中。
- 当一个步骤需要访问远程服务或资源时,调度程序调用(通常采用异步调用)代理程序,通过代理访问远程资源。
- 管理者负责重试和监控状态。
注意事项
- 所有可能发生的故障都要测试覆盖。
- 每一个步骤都需要幂等。
- 状态存储器中需要持久化补偿事物所需要的所有信息。
- 监控的频率需要合适,不能成为性能开销。
具体实现
实现流程
结构中包含的角色
- RemoteResource 远程资源
- Agent 代理
- Scheduler 调度程序
- SateStore 状态存储器
- Supervisor 管理者
- Crontab 定时任务/执行者
可用到的设计模式思维
代理的角色就可以使用代理模式来实现。
最小可表达代码
class RemoteResource
{
public function debug()
{
var_dump('我是远程资源...');
}
}
class Agent
{
public static function debug()
{
(new RemoteResource())->debug();
}
}
class SateStore
{
public function getWaitingTasks()
{
return [
['id' => 1, 'status' => 1],
['id' => 2, 'status' => 2],
];
}
public function start()
{
var_dump('开始状态');
}
public function end()
{
var_dump('结束状态');
}
public function reset()
{
var_dump('重置状态,让调度器可以重新执行');
}
}
class Supervisor
{
private $sateStore;
public function __construct()
{
$this->sateStore = new SateStore();
}
public function monitor()
{
var_dump('监控状态');
}
public function retry()
{
$this->sateStore->reset();
}
public function handle()
{
$this->monitor();
$hasTimeOut = rand(0, 1);
if ($hasTimeOut) {
var_dump('有超时的状态');
$this->retry();
return;
}
var_dump('状态正常');
}
}
class Scheduler
{
private $sateStore;
public function __construct()
{
$this->sateStore = new SateStore();
}
public function firstStep()
{
$this->sateStore->start();
Agent::debug();
$this->sateStore->end();
}
public function secondStep()
{
$this->sateStore->start();
Agent::debug();
$this->sateStore->end();
}
public function handle()
{
$tasks = $this->sateStore->getWaitingTasks();
foreach ($tasks as $task) {
switch ($task['status']) {
case 1 :
var_dump("task id : {$task['status']} 执行第一步");
$this->firstStep();
break;
case 2 :
var_dump("task id : {$task['status']} 执行第二步");
$this->secondStep();
break;
}
}
}
}
class Crontab
{
public function handle()
{
(new Supervisor)->handle();
(new Scheduler)->handle();
}
}
// 定时任务执行
(new Crontab)->handle();
推荐文章: