rabbitmq客户端

rabbitmq queue 消息队列

项目地址:github.com/2723659854/rabbitmq

项目介绍

消息队列主要用于业务解耦,本项目采用rabbitmq,支持thinkPHP,laravel,webman,yii等常用框架,也可以单独使用。

安装方法 install

composer require xiaosongshu/rabbitmq

示例 demo

定义一个队列 queue

<?php
namespace app\commands;

require_once __DIR__.'/vendor/autoload.php';

class Demo extends \Xiaosongshu\Rabbitmq\Client
{

 /** 以下是rabbitmq配置 ,请填写您自己的配置 */ 
 /** @var string $host 服务器地址 */ 
 public static $host = "127.0.0.1";
 /** @var int $port 服务器端口 */ 
 public static $port = 5672;
 /** @var string $user 服务器登陆用户 */ 
 public static $user = "guest";
 /** @var string $pass 服务器登陆密码 */ 
 public static $pass = "guest";
 /** 
 * 业务处理
 * @param array $params 
 * @return int 
 */ 
 public static function handle(array $params): int { 
     //TODO 这里写你的业务逻辑
     // ... var_dump($params); 
     return self::ACK; 
     //return self::NACK; 
     }
 }

投递消息 publish

\app\commands\Demo::publish(['name'=>'tome','age'=>15]);

你可以在任何地方投递消息。

开启消费

\app\commands\Demo::consume();

你可以把消费者放到command命令行里面,使用命令行执行队列消费。举个例子(这里以yii为例子,你也可以换成laravel,webman,thinkPHP等其他框架):

<?php

namespace app\commands;

use yii\console\Controller;

/**
 * @purpose 开启队列消费
 * @note 我只是一个例子
 */class QueueController extends Controller
{

 /** 
 * @api php yii queue/index 
 * @return void 
 * @throws \Exception 
 * @comment 开启消费者
 */ 
 public function actionIndex() {
     Demo::consume(); 
     }
 }

开启消费者命令 consume

php yii queue/index

注:如果你需要开启多个消费者,那么可以在多个窗口执行开启消费者命令即可。当然你也可以使用多进程来处理。

关闭消费者

\app\commands\Demo::close();

异常 Exception

队列使用过程中请使用 \RuntimeException和\Exception捕获异常

若需要使用延迟队列,那么rabbitmq服务需要安装延迟插件,否则会报错

测试

本项目根目录有一个demo.php的测试文件,可以复制到你的项目根目录,在命令行窗口直接在命令行执行以下命令即可。

php demo.php

测试文件代码如下:

<?php

namespace xiaosongshu\test;
require_once __DIR__ . '/vendor/autoload.php';

/**
 * demo * @purpose 定义一个队列演示
 */
 class Demo extends \Xiaosongshu\Rabbitmq\Client
{

 /** 以下是rabbitmq配置 ,请填写您自己的配置 */ 
 /** @var string $host 服务器地址 */ 
 public static $host = "127.0.0.1";
 /** @var int $port 服务器端口 */ 
 public static $port = 5672;
 /** @var string $user 服务器登陆用户 */ 
 public static $user = "guest";
 /** @var string $pass 服务器登陆密码 */ 
 public static $pass = "guest";
 /** 
 * 业务处理
 * @param array $params 
 * @return int 
 */ 
 public static function handle(array $params): int { 
     //TODO 这里写你的业务逻辑
     // ... var_dump($params); 
    /** 成功,返回ack */
     return self::ACK; 
    /** 失败,返回NACK*/
     //return self::NACK; 
    }
}

/** 投递普通消息 */
\xiaosongshu\test\Demo::publish(['name' => 'tom']);
\xiaosongshu\test\Demo::publish(['name' => 'jim']);
\xiaosongshu\test\Demo::publish(['name' => 'jack']);
/** 开启消费,本函数为阻塞,后面的代码不会执行 */
\xiaosongshu\test\Demo::consume();
/** 关闭消费者 */
\xiaosongshu\test\Demo::close();
联系作者:

2723659854@qq.com
你也可以直接提issues

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 3

你这消费者 在webman 能用嘛? 能辛苦大佬写一下简单的文档嘛

7个月前 评论
lantaigongzhu (楼主) 7个月前

定义队列业务处理逻辑

<?php

namespace app\command;

use Xiaosongshu\Rabbitmq\Client;

/**
 * @purpose 定义处理业务逻辑任务
 */
class DemoJob extends Client
{

    /** 以下是rabbitmq配置 ,请填写您自己的配置 */
    /** @var string $host 服务器地址 */
    public static $host = "127.0.0.1";
    /** @var int $port 服务器端口 */
    public static $port = 5672;
    /** @var string $user 服务器登陆用户 */
    public static $user = "guest";
    /** @var string $pass 服务器登陆密码 */
    public static $pass = "guest";
    /**
     * 业务处理
     * @param array $params
     * @return int
     */
    public static function handle(array $params): int {
        /** 假设这里处理一堆业务,巴拉巴拉 */
        var_dump($params);
        return self::ACK;
    }
}


定义消费者

<?php

namespace app\command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;


/**
 * @purpose 定义一个消费者
 * @comment 本命令行是阻塞的,会一直消费数据
 * @command php webman queue
 */
class Queue extends Command
{
    protected static $defaultName = 'Queue';
    protected static $defaultDescription = '一个rabbitmq队列消费者示例';

    /**
     * @return void
     */
    protected function configure()
    {
        //$this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
    }

    /**
     * 业务逻辑处理函数
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $output->writeln("开启消费者");
        /** 开启消费者 */
        DemoJob::consume();
    }

}

假设用命令行投递消息。

<?php

namespace app\command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;


/**
 * @purpose 投递消息到队列
 * @command php webman publish
 */
class Publish extends Command
{
    protected static $defaultName = 'Publish';
    protected static $defaultDescription = '测试投递消息示例';

    /**
     * @return void
     */
    protected function configure()
    {
        //$this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
    }

    /**
     * 业务逻辑
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        DemoJob::publish(['name'=>'tome','age'=>15]);
        $output->writeln('投递消费者完成');
        return self::SUCCESS;
    }

}

首先开启消费者

php webman queue

然后投递一个消息

php webman publish

@tanhongbin

7个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!