消费者的竞争模式 Competing Consumers Pattern
描述#
多个并发用户处理同一个通讯通道的信息。这种模式使系统能够同时处理多条数据,以优化吞吐量,提高可扩展性和可用性,以及平衡工作负载。
背景和问题#
在高峰时间的系统可能需要处理许多每秒数百个请求,而在其他时间的数量可能是非常小的,处理这些请求可能是高度可变的。为了处理这种波动的负载,该系统可以运行多个消费者服务实例。而这些消费者之间是存在资源竞争的。
解决方案#
使用消息队列来实现应用和消费者服务实例之间的通信信道,简单说,就是把数据存到消息队列中,然后多个消费者同时消费消息队列。
注意事项#
- 增强系统的负载。可以承受请求量很大的变化。
- 提高可靠性。信息存在队列中,可以监控队列的数据。
- 不需要协调消费者的竞争。
- 可扩展性高。能够动态地增加或减少消费者服务的实例。
何时使用#
- 工作量为一个应用程序被分成可异步运行任务。
- 任务是独立的,可以并行地运行。
- 工作容积变化很大,需要一个可扩展的解决方案。
- 该解决方案必须提供高可用性,并且如果处理一个任务失败必须是有弹性的。
结构中包含的角色#
- MessageQueue 抽象队列
- ConcreteMessageQueue 具体队列
- Producer 生产者
- Consumer 消费者
最小可表达代码#
abstract class MessageQueue
{
protected $messages = [];
public function pop()
{
return array_shift($this->messages);
}
public function push($message)
{
$this->messages[] = $message;
}
}
class ConcreteMessageQueue extends MessageQueue {}
class Producer
{
protected $number;
protected $messageQueue;
public function __construct(MessageQueue $messageQueue, int $number)
{
$this->messageQueue = $messageQueue;
$this->number = $number;
}
public function handle()
{
$message = microtime();
var_dump("Producer {$this->number} : $message");
$this->messageQueue->push($message);
}
}
class Consumer
{
protected $number;
protected $messageQueue;
public function __construct(MessageQueue $messageQueue, int $number)
{
$this->messageQueue = $messageQueue;
$this->number = $number;
}
public function handle()
{
$message = $this->messageQueue->pop();
var_dump("Consumer {$this->number} : $message");
}
}
// 线程安全
$queue = new ConcreteMessageQueue();
$product1 = new Producer($queue, 1);
$product1->handle();
$product2 = new Producer($queue, 2);
$product2->handle();
$consumer1 = new Consumer($queue, 1);
$consumer1->handle();
$consumer2 = new Consumer($queue, 2);
$consumer2->handle();
推荐文章: