RabbitMQ三大模式示例
发布订阅模式(fanout)
生产者:
<?php
$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';
$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);
try {
$conn->connect();
$channel = new AMQPChannel($conn);
$ex = new AMQPExchange($channel);
$ex->setName('amq_' . TEST_UNIQUE . '_test_fanout_exchange');
$ex->setType(AMQP_EX_TYPE_FANOUT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$i = 0;
while (true) {
sleep(1);
$i++;
$channel->startTransaction();
//第二个参数填写空字符串即可
if($ex->publish('test-' . $i, '', AMQP_MANDATORY)) {
var_dump($i);
}
$channel->commitTransaction();
}
} catch (AMQPConnectionException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPExchangeException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
$conn->disconnect();
}
消费者:
<?php
$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';
$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);
try {
$conn->connect();
$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
//订阅的队列
$q->setName('amq_'.TEST_UNIQUE.'_test_fanout_queue_2');
$q->setFlags(AMQP_DURABLE);
$q->declareQueue();
//第二个参数填写空字符串即可
$q->bind('amq_'.TEST_UNIQUE.'_test_fanout_exchange', '');
while (true) {
$q->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) {
try {
var_dump($queue->getName() . '-->' . $envelope->getBody());
} catch (Exception $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
}
}, AMQP_AUTOACK);
}
} catch (AMQPConnectionException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPQueueException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPEnvelopeException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
$conn->disconnect();
}
直接转发模式(direct)
生产者:
<?php
$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';
$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);
try {
$conn->connect();
$channel = new AMQPChannel($conn);
$ex = new AMQPExchange($channel);
$ex->setName('amq_' . TEST_UNIQUE . '_test_direct_exchange');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$i = 0;
while (true) {
sleep(1);
$i++;
$channel->startTransaction();
//第二个参数填写具体的路由key,交换机会找到绑定了该key的队列,并将消息写入到队列
if($ex->publish('test-' . $i, 'queue_1', AMQP_MANDATORY)) {
var_dump($i);
}
$channel->commitTransaction();
}
} catch (AMQPConnectionException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPExchangeException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
$conn->disconnect();
}
消费者:
<?php
$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';
$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);
try {
$conn->connect();
$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName('amq_'.TEST_UNIQUE.'_test_direct_queue');
$q->setFlags(AMQP_DURABLE);
$q->declareQueue();
//第二个参数填写具体的路由key,告诉交换机,当前队列的路由key
$q->bind('amq_'.TEST_UNIQUE.'_test_direct_exchange', 'queue_1');
while (true) {
$q->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) {
try {
var_dump($queue->getName() . '-->' . $envelope->getBody());
} catch (Exception $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
}
}, AMQP_AUTOACK);
}
} catch (AMQPConnectionException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPQueueException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPEnvelopeException $e) {
echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
$conn->disconnect();
}
主题模式(topic)
代码跟上面两个类似,只不过routing_key
参数不一样。发布订阅模式的routing_key
必须是空字符串,直接转发模式的routing_key
必须是一个单词或者是一个不包含.
的字符串,主题模式的routing_key
是一个包含.
字符串做分割的字符串,该字符串可以用*
、#
做模糊匹配,其中*
表示任意单词,#
任意多个单词。
最后
这记录文章写不下去了,记个学习的课程地址:
RabbitMQ 入门与实践
本作品采用《CC 协议》,转载必须注明作者和本文链接