RabbitMQ三大模式示例

发布订阅模式(fanout)

生产者:

<?php

$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';

$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);

try {
    $conn->connect();
    $channel = new AMQPChannel($conn);
    $ex = new AMQPExchange($channel);
    $ex->setName('amq_' . TEST_UNIQUE . '_test_fanout_exchange');
    $ex->setType(AMQP_EX_TYPE_FANOUT);
    $ex->setFlags(AMQP_DURABLE);
    $ex->declareExchange();
    $i = 0;
    while (true) {
        sleep(1);
        $i++;
        $channel->startTransaction();
        //第二个参数填写空字符串即可
        if($ex->publish('test-' . $i, '', AMQP_MANDATORY)) {
            var_dump($i);
        }
        $channel->commitTransaction();
    }
} catch (AMQPConnectionException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPExchangeException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
    $conn->disconnect();
}

消费者:

<?php

$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';

$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);

try {
    $conn->connect();
    $channel = new AMQPChannel($conn);
    $q = new AMQPQueue($channel);
    //订阅的队列
    $q->setName('amq_'.TEST_UNIQUE.'_test_fanout_queue_2');
    $q->setFlags(AMQP_DURABLE);
    $q->declareQueue();
    //第二个参数填写空字符串即可
    $q->bind('amq_'.TEST_UNIQUE.'_test_fanout_exchange', '');
    while (true) {
        $q->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) {
            try {
                var_dump($queue->getName() . '-->' . $envelope->getBody());
            } catch (Exception $e) {
                echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
            }
        }, AMQP_AUTOACK);
    }
} catch (AMQPConnectionException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPQueueException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPEnvelopeException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
    $conn->disconnect();
}

直接转发模式(direct)

生产者:

<?php

$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';

$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);

try {
    $conn->connect();
    $channel = new AMQPChannel($conn);
    $ex = new AMQPExchange($channel);
    $ex->setName('amq_' . TEST_UNIQUE . '_test_direct_exchange');
    $ex->setType(AMQP_EX_TYPE_DIRECT);
    $ex->setFlags(AMQP_DURABLE);
    $ex->declareExchange();
    $i = 0;
    while (true) {
        sleep(1);
        $i++;
        $channel->startTransaction();
        //第二个参数填写具体的路由key,交换机会找到绑定了该key的队列,并将消息写入到队列
        if($ex->publish('test-' . $i, 'queue_1', AMQP_MANDATORY)) {
            var_dump($i);
        }
        $channel->commitTransaction();
    }
} catch (AMQPConnectionException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPExchangeException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
    $conn->disconnect();
}

消费者:

<?php

$host = '127.0.0.1';
//瞎写的,练习的时候区别其他人用的
const TEST_UNIQUE = 'guoyeye';

$conn = new AMQPConnection(['host' => $host, 'port' => 5672, 'login' => '', 'password' => '', 'vhost' => '/']);

try {
    $conn->connect();
    $channel = new AMQPChannel($conn);
    $q = new AMQPQueue($channel);
    $q->setName('amq_'.TEST_UNIQUE.'_test_direct_queue');
    $q->setFlags(AMQP_DURABLE);
    $q->declareQueue();
    //第二个参数填写具体的路由key,告诉交换机,当前队列的路由key
    $q->bind('amq_'.TEST_UNIQUE.'_test_direct_exchange', 'queue_1');
    while (true) {
        $q->consume(function (AMQPEnvelope $envelope, AMQPQueue $queue) {
            try {
                var_dump($queue->getName() . '-->' . $envelope->getBody());
            } catch (Exception $e) {
                echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
            }
        }, AMQP_AUTOACK);
    }
} catch (AMQPConnectionException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPChannelException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPQueueException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} catch (AMQPEnvelopeException $e) {
    echo $e->getCode() . '-->' . $e->getMessage() . PHP_EOL;
} finally {
    $conn->disconnect();
}

主题模式(topic)

代码跟上面两个类似,只不过routing_key参数不一样。发布订阅模式的routing_key必须是空字符串,直接转发模式的routing_key必须是一个单词或者是一个不包含.的字符串,主题模式的routing_key是一个包含.字符串做分割的字符串,该字符串可以用*#做模糊匹配,其中*表示任意单词,#任意多个单词。

最后

这记录文章写不下去了,记个学习的课程地址:
RabbitMQ 入门与实践

本作品采用《CC 协议》,转载必须注明作者和本文链接
梦想星辰大海
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!