基于Yii2对RabbitMQ的封装及进程管理实现细节(四)

前言

web进程管理是以写文件的形式管理消费者子进程。在进程管理文件中的一个子进程对应的一行记录类似:
89104_89102 = 07_14_delay_queue,07_14
<子进程>_<父进程> = <队列名称>,<文件标志>

之所以选用文件进行管理的原因是因为实现起来简单,并且如果真的出错了人为操作文件也非常简便轻松。本着最初的设计思想开始了代码的实现。

实现的流程图

进程管理分为了几个部分实现,然后将这几个部分拼接在一起最终实现了消费者进程的管理。
1、socekt部分:接收web页面的请求并分发请求
2、Dispatcher部分:自身成为守护进程,并启动AMQP消费者
3、进程文件读写部分:读写文件中的子进程信息

每一部分的流程图如下图:

基于Yii2对RabbitMQ的封装及进程管理实现细节(四)

但是实现的过程中有几个问题需要解决:
1、进程文件的读写在多进程情况下如何保障顺序性?
2、子进程的管理及进程通信问题

其实保障文件读写的顺序性问题很容易解决,本身写的就是关于AMQP消息队列,所以很容易想到用消息队列即可实现。但是用哪种消息队列呢?类库中实现了三种方式:amqp、beanstalk、redis。进程通信也是使用的消息队列,除了上面的三种能够复用之外还增加了一个pipe(文件)。实现的过程中也并非一帆风顺,多亏了在这里发布了“问答”才得以解决我碰到的“多代子进程无法接收SIGTERM信号”的问题。这是一个有温度的社区,也因此让我决定在社区发布这系列文章。
详情问题可见:问答:多进程信号处理相关:父进程的第一代子进程可以接收到SIGTERM信号,...

消费者配置文件

消费者文件是启动消费者进程的关键,配置如:

[consumer]
program = 07_14 ;文件标志
queueName = 07_14_delay_queue
qos = 1 ;消费者预处理数
duplicate = 1 ;队列副本的数量,0或1都是直接取queueName的值
numprocs = 2 ;子进程的数量
script = /path/to/yii  ; Yii2的执行脚本
request = amqp/consumer ; Yii2的Command控制器

; {php} 是执行php的命令占位符,这个值在[common]模块下command已经配置过
; {script} php执行脚本
; {request} 请求的控制器
; {queueName} 是queueName属性的值
; {qos} 是qos属性的值,默认为1
; command = {php} {script} {request} {queueName} {qos}
; 最终执行的命令如:`/usr/bin/php /path/to/yii amqp/consumer {queueName} 1` ,queueName会因为duplicate队列副本的配置而改变。

amqp.ini配置文件

amqp.ini配置文件是启动整个进程管理的关键,配置如:

; 公用配置属性
[common]
; 日志也支持按日期记录 %Y-%m-%d
; %Y 年:2020,也可以写成 %y:20
; %m 月:08
; %d 日:07 
; ../log/%Y-%m/access_%Y-%m-%d.log
access_log = ../log/access.log   ; ../log/%Y-%m/access_%Y-%m-%d.log
error_log = ../log/error.log
; 可选择:DEBUG,INFO,NOTICE,WARNING,ERROR,CRITICAL,ALERT,EMERGENCY
; 选中的以逗号隔开,只记录选中的类型日志
level = INFO,WARNING,ERROR,NOTICE
; 保存AMQP消费者守护进程的PID
pidfile = /var/run/amqp_master.pid
; 执行ExecDispatcher.php脚本的命令
command = /usr/bin/php
; \pzr\amqp\cli\Server 启动unix连接的本地文件地址
unix = /var/run/amqp_consumer_serve.sock
; 进程文件管理路径
process_file = ./process_manager.ini

; AMQP消费者读取的连接配置
[amqp]
host = 127.0.0.1
port = 5672
user = guest
password = guest

; 进程文件处理:启用beanstalk
[beanstalk]
host = 127.0.0.1
port = 11300

; 进程文件处理:启用redis
[redis]
host = 127.0.0.1
port = 6379
user = 
password = 

; 通知父进程的通信方式
[pipe]
; 子进程和父进程通信的文件地址
pipe_file = /tmp/amqp_pipe

[communication]
; 可选择:redis、amqp、beanstalk、pipe(默认)
class = pipe

[handler]
; 可选择:beanstalk、redis、amqp
; 没有默认的值,所以必须配置一个
class = beanstalk

[include]
files = ./consumer/*.ini

配置文件中的相对路径都是以amqp.ini的所在目录为根本路径。

后续

再此之前的三篇主要展示的是基于Yii2实现的RabbitMQ在使用上的简易操作,对实现细节并未过多阐述。然而实际上对于RabbitMQ的基本操作封装的实现细节似乎也比较容易理解。这里且谈在实现过程中记录的点点滴滴。

为什么要写基于Yii2的RabbitMQ封装呢?

原因很简单:想要了解一些关于Yii2的实现细节。因为之前项目上没有用过Yii2,所以准备自己看看摸索下。但是看总归是看,所以决定动手做点什么可能更有助于理解,于是打算结合RabbitMQ的实现对Yii2展开探索。
可是最后慢慢的就偏离了最开始的初衷。Yii2看了大概一两星期后就觉得基本满足我对RabbitMQ的实现了,于是动手开始封装RabbitMQ却用了一个来月(当然是空余时间,那段时间回家的也晚,但很充实)。在实现之前大概看了下yii2-queue,发现它只能实现最简单的队列,但是它的很多实现方法值得借鉴,比如:消息体的封装思想,消息体的序列化实现,事件订阅思想等。消息体的序列化实现是原封不动的搬过来了,消息体的封装思想和事件订阅则沿用了思想但在实现上已经不一样了。

基本用法包括哪些呢?

1、队列实现
(1)各种队列的实现包括普通队列、延时队列、优先队列、RPC队列。
(2)路由类型支持四种 direct、topic、fanout,header。
(3)默认情况下启动的队列都会启动备份路由,可以防止消息路由失败而导致的丢失。
(4)默认情况下开启了客户端消息确认机制,可以通过事件订阅关闭。默认情况下开启消费者确认机制,也可以通过事件订阅关闭。
(5)增加了队列副本概念,可以通过配置实现。
(6)消息的发布可以是单条发布也可以是批量发布。
(7)消息体的序列化支持:json、igbinary、serialize。
2、消费者实现
消费者包括一般队列的消费者和RPC队列的消费者。由于RPC的实现和一般队列的实现不同,所以在实现和消费者实现都是单独的。
3、提供AMQP的api功能
通过api提供的一些信息可以实现对AMQP服务监管,镜像策略的实现、已有队列的保护、消费者连接的监控及管理等。

unknown delivery_tag 趣事记录

实现的过程中遇到一个有趣的问题,提了一个Issue给amqplib的作者。但是他们认为似乎并不是一个bug,更认为是一个错误的使用方式。

问题描述:分批多次发布批量消息时报错 PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPRuntimeException: Server ack'ed unknown delivery_tag "19"
问题原因:开启客户端消息确认机制之后,消息的发布都会等待服务端的返回结果以此确认消息是否发送成功。但是每次发送消息RabbitMQ都会将next_delivery_tag置成1,而服务端返回的确认的消息next_delivery_tag却不会每次都置成1,表现是递增。这也就导致最后发送的tag和返回的tag不一致的问题。详情见issue地址:github.com/php-amqplib/php-amqplib...

以此给碰到类似Server ack'ed unknown delivery_tag "19"这种问题的解决思路:报这个错误可能并不是因为消费者多次ack的问题,很有可能是因为多次批量发送消息的原因。(在网上寻求答案的时候都是清一色的说是ack重复,而且都是复制粘贴的那种特别讨厌!)

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!