基于Yii2对RabbitMQ的基本用法封装及RPC队列(二)

前言

RPC即跨项目调用。可以将一个大的项目分成多个子项目,然后子项目之间通过对外提供API从而实现了项目之间的解耦,再者便于各个子项目的单独部署和管理。而子项目之间的调用除了提供API之外,还可以通过比如Yar将项目的类对外提供服务,其他项目对类的调用就犹如调用自己的类一样。
再此通过RabbitMQ实现的RPC队列也可以完成跨项目调用,但是实现的方式和Yar不同。实现的原理图如下:

基于Yii2对RabbitMQ的基本用法封装及RPC队列(二)

1、client将request(包含corrid和reply_to参数)发送到了事先定义好的队列rpc_queue
2、启动rpc_queue队列的消费者去处理队列中的消息,并且将响应结果投递到reply_to指定的临时队列
3、等待临时队列的消费者返回rpc_queue的结果并校验,最终返回client。
4、client根据返回消息的corrid和request的corrid对比(批量请求时用到)。

RPC队列的配置和使用

配置和普通队列的配置一样简单,在Yii的配置文件中配置RPC队列的组件属性:

/** RPC队列 */
    'rpcQueue' => [
        'class' =>  \pzr\amqp\queue\RpcQueue::class,
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queueName' => 'rpc',
        'exchangeName' => 'rpc',
        'routingKey' => 'rpc',
        'duplicate' => 2,
    ],

然后在控制器中调用组件对象:

public function actionRpc() {
        Yii::$app->rpcQueue->on(AmqpBase::EVENT_BEFORE_PUSH, function(PushEvent $event) {
            Yii::$app->rpcQueue->bind();
        });

        // 批量请求
        for ($i=1; $i<=10; $i++) {
            $jobs[] = new RequestJob([
                'request' => 'request_' . $i,
            ]);
        }
        $response = Yii::$app->rpcQueue->publish($jobs);
        return $response;
    }

这里RequestJob是消费体对象必须继承AmqpJob,代码如:

class RequestJob extends AmqpJob
{    
    public $request;
    public function execute()
    {
        $response = $this->request . ', corrid:' . $this->getUuid();
        return $response;
    }
}

不出意外得到:

request_1,corrid:xxxx
request_2,corrid:xxxx
... //省略
request_9,corrid:xxxx

这里的corrid是请求的唯一标志。因为支持批量请求,如果对结果的返回需要对号入座那么就需要用到corrid。

RPC队列的实现

竟然讲到了RPC,那么就以跨项目调用举例(很细致):
场景:A项目期望调用B项目的某个方法
step1:在A和B项目中都执行引入pzr/amqp包操作

composer require pzr/amqp

step2:引入包之后,可以看到在vendor目录下有pzr/amqp目录,目录大致情况是:

pzr/amqp
    forntend
    src

如果只是为了测试功能大可以在frontend目录下修改Yii2的配置和控制器,但是一旦执行composer update操作可能就会被覆盖。因此建议copy frontend目录。
frontend目录是AMQP消费者进程web管理,所以很多和web相关的代码对于Rpc调用来说其实并无作用。frontend目录大致情况是:

assets/
commands/
config/
controllers/
models/
vagrant/
views/
web/
widgets/

如果只是为了实现RPC功能只有以下目录留着就行:

commands/
config/
models/

这里假设copy的frontend重新命名为yii
step3:A调用B,所以可以理解为A为客户端,B为服务端
A的使命就是发出请求并且等待响应,B的使命就是响应请求。因为A和B是两个项目,所以项目环境的不同也将导致一个问题:B为了响应A的请求,队列的消费者“必须”在B工作环境下启动,否则可能导致A接收不到预期的结果。使用过Yar的同学肯定知道Yar的一个方便之处在于RPC调用的API也提供可视化,对于Client调用方来说非常友好!但是这里似乎做不到API的可视化,调用的基础是A已经了解B项目的调用方法。基于此去展开的RPC调用。

之前希望B可以返回A调用的序列化对象,但是发现对象能被序列化的只有非静态化变量,方法和静态属性等都无法序列化。因此只能采取Client告知Server我需要调用哪个class、哪个method,传入什么参数。Server得到Client的请求之后就会按请求封装对象,然后响应方法。如:(new $class())->method($args)。为了解决命名空间的问题,往往Client在请求的时候需要带上如:\namespace\class 去请求Server,于是Server才能够正确找到需要实例化的对象。对于本身使用Composer管理的项目,那么很轻松的就能加载并找到\namespace\class,而对于不是使用Composer管理的项目,那么需要自己去引入命名空间的autoload方法。

step4:准备工作完了之后,开始编写代码
在A项目中yii/commands下实现ClientController类发出请求(无关代码不体现):

class ClientController extends Controller
{
    // qos消费者预处理数
    // timeout 请求超时时间,如果超过时间还没接收到完整数据则返回结果 
    public  function  actionRequest($jobs, $qos=1, $timeout=3) {
        if (empty($jobs)) {
            return  null;
        }
        // 发送消息之前先绑定消息,因为对于请求方来说如果发送的请求丢失可以自己重新请求,所以关闭发送方消息确认机制
        Yii::$app->on(AmqpBase::EVENT_BEFORE_PUSH, function(PushEvent  $event) {
            $event->noWait = true;
            Yii::$app->rpcQueue->bind();
        });
        $response = Yii::$app->rpcQueue->setQos($qos)
            ->setTimeout($timeout)
            ->publish($jobs);
        return  $response;
    }
}

A项目中的Client算是完成了,接下来需要在models编写Job类。Job类可以写成通用型,即A项目中所有的RPC调用都可以复用的那种。

class RpcJob extends AmqpJob
{
    public $object; //请求的对象
    public $action;    //请求的方法
    public $params = []; //方法可能需要的参数

    // 这是执行请求响应的方法,只有在Server项目中启动的消费者才能够有效。如果在其他环境启动消费者,那么可能导致请求接收到不到响应数据(因为其他工作环境无法实例化并且响应请求)。
    public function execute()
    {
        try {
            $object = $this->object;
            $obj = new $object();
            if (!is_object($obj)) {
                return false;
            }
            if (!method_exists($obj, $this->action)) {
                return false;
            }
            return call_user_func_array([$obj, $this->action], $this->params);
        } catch (Exception $e) {
        } catch (Throwable $e) {
        }

        return false;
    }
}

最后只需要在A项目中实际需要RPC调用的地方去调用Client/request即可,如在A项目中的IndexController(个人项目的控制器,和Yii2无关)中进行RPC调用。

class IndexController
{
    public function init() 
    {
        $yii = new  MyYii();
        $chargeStatusUuid = uniqid(true);
        $jobs[] = new RpcJob([
            'object' => 'PayConfig',
            'action' => 'getChargeStatus',
            'params' => [],
            'uuid' => $chargeStatusUuid, //唯一标志请求ID,当然如果你喜欢也可以自己随便写,只要能够区分请求即可。
            ]);
        $response = $yii->request([
            'client/request', // 采用的Yii2的请求规则,类似:php yii client/request ,但是这里我对yii脚本改造了下,实现了MyYii
            $jobs
        ]);
        $chargeStatus = $response[$chargeStatusUuid];
    }
}

上面的实例代码实现的是:A在init方法中想要调用B项目中的某个配置方法。然后B和A的实现步骤完全一致即可。
step5:在B项目中的实现和A完全一模一样,不同的是在B项目中需要启动消费者去响应
最后在A得到结果截图如:
基于Yii2对RabbitMQ的基本用法封装及消费进程管理控制(二)

B中关于chargeStatus 的配置截图如:

基于Yii2对RabbitMQ的基本用法封装及消费进程管理控制(二)
如果是多个请求同时发送,那么按corrid对应获取。

后言

在后来对这个项目也升级过很多次,当然存在很多缺陷也一直优化。也在使用的过程当中经常碰到各种问题。

1、找不到对应的文件vendor/bower-asset/jquery/dist
方案1:composer require “fxp/composer-asset-plugin” -vvv (这个成功了)
方案2:在composer.json添加如下(未尝试):

"config": {
        "fxp-asset": {
            "installer-paths": {
                "npm-asset-library": "vendor/npm",
                "bower-asset-library": "vendor/bower"
            }
        }
    }
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!