优先级队列模式 Priority Queue Pattern
描述
具有优先级的请求被服务优先处理。
背景和问题
消息队列有时候要求某些紧急请求需要快速处理。
解决方案
基于优先队列的消息队列。
具有优先级的独立队列模式。
应用程序发布一条具有优先级的消息,队列中的消息都会自动重新排序,高优先级的消息会自动排在前面,会被优先消费。要注意的是,具有非常低的优先级的消息可能永远不会被处理。
每个优先级拥有至少一个独立队列的多队列模式。
每个队列都有单独的消费者池(多个消费者)。高优先级队列比低优先级队列有更好的硬件,更多的消费者。在这个模式下,低优先级的请求总会被处理,只是比高优先级的处理速度慢而已。
优点
- 提供了优先业务,可以对不同客户提供不同级别的业务。
- 最大限度降低运营成本。
- 最大限度地提高应用程序的性能和可扩展性。重要的任务可以优先运行,而不太重要的可以延后执行。
注意事项
- 先定义好什么是优先级的标准。例如信息需要在10秒内处理完毕。基于这个标准再做资源的规划和分配。
- 确定好任务机制。那些请求是最高级,那些请求是最低级,有没有可抢先或者可暂停的任务类型。
- 监控所有队列的速度,确保队列的速度是符合预期的。
- 消息的优先级是由系统业务决定的。
- 查询队列中的消息,要控制成本,尤其是查询多个队列的消息的时候。
- 在多队列模式中,所有的队列都是需要监控的。
- 在设计消费者的消费模式的时候,一定要支持动态调整(增加或者减少消费者)。
何时使用
- 不同用户对应不同的优先级任务。
- 系统中具有不同侧重点的任务。
可用到的设计模式思维
一个队列里面有多个元素,在形式上符合迭代器模式,但是实际业务上不适用使用。
结构中包含的角色
- AbstractQueue 抽象队列
- ConcreteQueue 具体队列
- Message 消息
- Consumer 消费者
最小可表达代码
// 抽象队列
abstract class AbstractQueue
{
protected $initMaxPriority = 100; // 初始化最大优先级
protected $initMinPriority = 0; // 初始化最小优先级
protected $currentOffsetPriority; // 当前偏移的优先级
protected $counter; // 计数器
protected $messages = [];
public function __construct()
{
$this->currentOffsetPriority = $this->initMaxPriority;
$this->counter = $this->initMinPriority;
}
// 消息入队列
public function push(Message $message)
{
$priority = $this->rewritePriority($message->getPriority());
$this->reloadOffsetPriority($priority);
$this->messages[$priority][] = $message;
}
// 消息出队列
public function pop()
{
for ($priority = $this->getCurrentOffsetPriority(); $priority >= 0; $priority--) {
if (empty($this->messages[$priority])) {
continue;
}
$message = array_shift($this->messages[$priority]);
if (empty($this->messages[$priority])) {
unset($this->messages[$priority]);
}
return $message;
}
return null;
}
protected function rewritePriority($priority)
{
$priority = $priority < $this->initMinPriority ? $this->initMinPriority : $priority;
$priority = $priority > $this->initMaxPriority ? $this->initMaxPriority : $priority;
return $priority;
}
// 这里线程不安全,需要自行加锁
protected function reloadOffsetPriority(int $priority)
{
// 判断当前插入的优先级是否比现在执行的优先级要高
if ($priority > $this->currentOffsetPriority) {
$this->currentOffsetPriority = $priority;
}
// 设置计数器,防止并发引起的优先级错乱
$this->counter++;
if ($this->initMaxPriority == $this->counter) {
$this->currentOffsetPriority = $this->initMaxPriority;
$this->counter = $this->initMinPriority;;
}
}
protected function getCurrentOffsetPriority() : int
{
return $this->currentOffsetPriority;
}
}
// 消息
class Message
{
private $priority;
public function __construct(int $priority)
{
$this->priority = $priority;
}
public function getPriority()
{
return $this->priority;
}
}
// 具体队列
class ConcreteQueue extends AbstractQueue {}
// 消费者
class Consumer
{
protected $queue;
public function __construct(AbstractQueue $queue)
{
$this->queue = $queue;
}
public function execute()
{
while($message = $this->queue->pop()) {
var_dump($message);
}
}
}
$queue = new ConcreteQueue();
$queue->push(new Message(88));
$queue->push(new Message(11));
$queue->push(new Message(66));
(new Consumer($queue))->execute();
推荐文章: