基于Yii2对RabbitMQ的基本用法封装及RPC队列(二)
前言
RPC即跨项目调用。可以将一个大的项目分成多个子项目,然后子项目之间通过对外提供API从而实现了项目之间的解耦,再者便于各个子项目的单独部署和管理。而子项目之间的调用除了提供API之外,还可以通过比如Yar
将项目的类对外提供服务,其他项目对类的调用就犹如调用自己的类一样。
再此通过RabbitMQ实现的RPC队列也可以完成跨项目调用,但是实现的方式和Yar不同。实现的原理图如下:
1、client将request(包含corrid和reply_to参数)发送到了事先定义好的队列rpc_queue
2、启动rpc_queue
队列的消费者去处理队列中的消息,并且将响应结果投递到reply_to指定的临时队列
3、等待临时队列的消费者返回rpc_queue
的结果并校验,最终返回client。
4、client根据返回消息的corrid和request的corrid对比(批量请求时用到)。
RPC队列的配置和使用
配置和普通队列的配置一样简单,在Yii的配置文件中配置RPC队列的组件属性:
/** RPC队列 */
'rpcQueue' => [
'class' => \pzr\amqp\queue\RpcQueue::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'queueName' => 'rpc',
'exchangeName' => 'rpc',
'routingKey' => 'rpc',
'duplicate' => 2,
],
然后在控制器中调用组件对象:
public function actionRpc() {
Yii::$app->rpcQueue->on(AmqpBase::EVENT_BEFORE_PUSH, function(PushEvent $event) {
Yii::$app->rpcQueue->bind();
});
// 批量请求
for ($i=1; $i<=10; $i++) {
$jobs[] = new RequestJob([
'request' => 'request_' . $i,
]);
}
$response = Yii::$app->rpcQueue->publish($jobs);
return $response;
}
这里RequestJob是消费体对象必须继承AmqpJob,代码如:
class RequestJob extends AmqpJob
{
public $request;
public function execute()
{
$response = $this->request . ', corrid:' . $this->getUuid();
return $response;
}
}
不出意外得到:
request_1,corrid:xxxx
request_2,corrid:xxxx
... //省略
request_9,corrid:xxxx
这里的corrid是请求的唯一标志。因为支持批量请求,如果对结果的返回需要对号入座那么就需要用到corrid。
RPC队列的实现
竟然讲到了RPC,那么就以跨项目调用举例(很细致):
场景:A项目期望调用B项目的某个方法
step1:在A和B项目中都执行引入pzr/amqp包操作
composer require pzr/amqp
step2:引入包之后,可以看到在vendor目录下有pzr/amqp目录,目录大致情况是:
pzr/amqp
forntend
src
如果只是为了测试功能大可以在frontend目录下修改Yii2的配置和控制器,但是一旦执行composer update操作可能就会被覆盖。因此建议copy frontend目录。
frontend目录是AMQP消费者进程web管理,所以很多和web相关的代码对于Rpc调用来说其实并无作用。frontend目录大致情况是:
assets/
commands/
config/
controllers/
models/
vagrant/
views/
web/
widgets/
如果只是为了实现RPC功能只有以下目录留着就行:
commands/
config/
models/
这里假设copy的frontend重新命名为yii
step3:A调用B,所以可以理解为A为客户端,B为服务端
A的使命就是发出请求并且等待响应,B的使命就是响应请求。因为A和B是两个项目,所以项目环境的不同也将导致一个问题:B为了响应A的请求,队列的消费者“必须”在B工作环境下启动,否则可能导致A接收不到预期的结果。使用过Yar的同学肯定知道Yar的一个方便之处在于RPC调用的API也提供可视化,对于Client调用方来说非常友好!但是这里似乎做不到API的可视化,调用的基础是A已经了解B项目的调用方法。基于此去展开的RPC调用。
之前希望B可以返回A调用的序列化对象,但是发现对象能被序列化的只有非静态化变量,方法和静态属性等都无法序列化。因此只能采取Client告知Server我需要调用哪个class、哪个method,传入什么参数。Server得到Client的请求之后就会按请求封装对象,然后响应方法。如:
(new $class())->method($args)
。为了解决命名空间的问题,往往Client在请求的时候需要带上如:\namespace\class 去请求Server,于是Server才能够正确找到需要实例化的对象。对于本身使用Composer管理的项目,那么很轻松的就能加载并找到\namespace\class,而对于不是使用Composer管理的项目,那么需要自己去引入命名空间的autoload方法。
step4:准备工作完了之后,开始编写代码
在A项目中yii/commands
下实现ClientController类发出请求(无关代码不体现):
class ClientController extends Controller
{
// qos消费者预处理数
// timeout 请求超时时间,如果超过时间还没接收到完整数据则返回结果
public function actionRequest($jobs, $qos=1, $timeout=3) {
if (empty($jobs)) {
return null;
}
// 发送消息之前先绑定消息,因为对于请求方来说如果发送的请求丢失可以自己重新请求,所以关闭发送方消息确认机制
Yii::$app->on(AmqpBase::EVENT_BEFORE_PUSH, function(PushEvent $event) {
$event->noWait = true;
Yii::$app->rpcQueue->bind();
});
$response = Yii::$app->rpcQueue->setQos($qos)
->setTimeout($timeout)
->publish($jobs);
return $response;
}
}
A项目中的Client算是完成了,接下来需要在models编写Job类。Job类可以写成通用型,即A项目中所有的RPC调用都可以复用的那种。
class RpcJob extends AmqpJob
{
public $object; //请求的对象
public $action; //请求的方法
public $params = []; //方法可能需要的参数
// 这是执行请求响应的方法,只有在Server项目中启动的消费者才能够有效。如果在其他环境启动消费者,那么可能导致请求接收到不到响应数据(因为其他工作环境无法实例化并且响应请求)。
public function execute()
{
try {
$object = $this->object;
$obj = new $object();
if (!is_object($obj)) {
return false;
}
if (!method_exists($obj, $this->action)) {
return false;
}
return call_user_func_array([$obj, $this->action], $this->params);
} catch (Exception $e) {
} catch (Throwable $e) {
}
return false;
}
}
最后只需要在A项目中实际需要RPC调用的地方去调用Client/request即可,如在A项目中的IndexController(个人项目的控制器,和Yii2无关)中进行RPC调用。
class IndexController
{
public function init()
{
$yii = new MyYii();
$chargeStatusUuid = uniqid(true);
$jobs[] = new RpcJob([
'object' => 'PayConfig',
'action' => 'getChargeStatus',
'params' => [],
'uuid' => $chargeStatusUuid, //唯一标志请求ID,当然如果你喜欢也可以自己随便写,只要能够区分请求即可。
]);
$response = $yii->request([
'client/request', // 采用的Yii2的请求规则,类似:php yii client/request ,但是这里我对yii脚本改造了下,实现了MyYii
$jobs
]);
$chargeStatus = $response[$chargeStatusUuid];
}
}
上面的实例代码实现的是:A在init方法中想要调用B项目中的某个配置方法。然后B和A的实现步骤完全一致即可。
step5:在B项目中的实现和A完全一模一样,不同的是在B项目中需要启动消费者去响应
最后在A得到结果截图如:
B中关于chargeStatus 的配置截图如:
如果是多个请求同时发送,那么按corrid对应获取。
后言
在后来对这个项目也升级过很多次,当然存在很多缺陷也一直优化。也在使用的过程当中经常碰到各种问题。
1、找不到对应的文件vendor/bower-asset/jquery/dist
方案1:composer require “fxp/composer-asset-plugin” -vvv (这个成功了)
方案2:在composer.json添加如下(未尝试):
"config": {
"fxp-asset": {
"installer-paths": {
"npm-asset-library": "vendor/npm",
"bower-asset-library": "vendor/bower"
}
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: