延时队列报错
生产者代码
<?php
namespace App\Amqp\Producers;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Amqp\Message\Type;
#[Producer]
class DelayDirectProducer extends ProducerMessage
{
use ProducerDelayedMessageTrait;
// 设置交换机
protected string $exchange = ‘ext.hyperf.delay’;
// 设置交换机类型
protected Type|string $type = Type::DIRECT;
// 设置路由键
protected array|string $routingKey = ‘delay-routing-key’; // 这里可以根据需求设置路由键
// 构造函数,接收数据和延迟时间
public function __construct($data, int $delayTime = 5000) // 默认延迟时间为 5000 毫秒
{
$this->payload = $data;
$this->applicationHeaders = [‘x-delay’ => $delayTime]; // 设置延迟时间
}
// 你也可以在这里添加日志记录
public function getPayload(): string
{
// Log::info(“Sending delayed message: “ . json_encode($this->payload));
return $this->payload;
}
}
消费者代码
<?php
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
#[Consumer(nums: 10)]
class DelayDirectConsumer extends ConsumerMessage
{
use ConsumerDelayedMessageTrait;
protected string $exchange = ‘ext.hyperf.delay’;
protected ?string $queue = ‘queue.hyperf.delay’;
protected Type|string $type = Type::DIRECT; // 或者 Type::FANOUT
protected array|string $routingKey = ‘’;
public function consumeMessage($data, AMQPMessage $message): Result
{
var_dump($data, ‘delay+direct consumeTime:’ . (microtime(true)));
return Result::ACK;
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
这不是提醒队列类型不支持吗?