PHP7 + beanstalkd 生产环境队列使用姿势

应用场景

  为什么要用呢,有什么好处?这应该放在最开头说,一件东西你只有了解它是干什么的,适合干什么,才能更好的与自己的项目相结合,用到哪里学到哪里,学了不用等于不会,我们平时就应该多考虑一些这样的问题:自己做个什么项目功能能跟 xx 技术相结合呢?这个 xx 技术放在这种业务场景下行不行呢?而不是 “学了这个 xx 技术能干嘛呢,公司现在也没有用这个的呀,学了也没用啊”,带着这样心情去学习 xx 技术,肯定很痛苦。
  队列大家都知道是将一些耗时的操作先不去做,先埋点,再异步去处理,这样对一些发邮件发短信之类的耗时操作,用户是感觉不到的,因为埋点结束,操作也就结束了,消费队列都是在服务器上做的。主要应用在短信或邮件通知,访问第三方接口订阅消息,商城的一些秒杀活动,都可以结合队列来完成。

Beanstalkd 介绍

  Beanstalkd是一个高性能,轻量级的分布式内存队列,C 代码,典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。
  beanstalkd 的最初设计意图是在高并发的网络请求下,通过异步执行耗时较多的请求,及时返回结果,减少请求的响应延迟。

Ubuntu 安装

sudo apt-get install beanstalkd

配置文件

vim /etc/default/beanstalkd

查看状态

service beanstalkd status

# 命令回显 #
root@:/www/server/php/72/etc# service beanstalkd status
● beanstalkd.service - Simple, fast work queue
   Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled)
   Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago
     Docs: man:beanstalkd(1)
 Main PID: 7033 (beanstalkd)
    Tasks: 1 (limit: 4634)
   CGroup: /system.slice/beanstalkd.service
           └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd

Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.

配置连通性 + 持久化

ip 用 0.0.0.0 允许所有连接,靠配置安全组或防火墙去约束连接,放开 -b 参数 (默认没有持久化),内存的队列消息可以落地到硬盘 binlog 实现持久化,断电可重新读取队列消息。

vim /etc/default/beanstalkd

BEANSTALKD_LISTEN_ADDR=0.0.0.0
BEANSTALKD_LISTEN_PORT=11300
BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

beanstalkd 任务状态

状态 注释
delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态

管理工具

亲测了很多网上能找到的 beanstalkd 工具,这两款是我最中意的了,一个命令行,一个web的。
命令行:https://github.com/src-d/beanstool
web 界面:https://github.com/ptrofimov/beanstalk_con...

编程语言客户端

PHP 客户端
https://packagist.org/packages/pda/pheanst...

composer require pda/pheanstalk

写入 job

<?php
//创建队列消息
require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);

$tubeName = 'email_list';

$jobData = [
    'email' => '123456@163.com',
    'message' => 'Hello World !!',
    'dtime' => date('Y-m-d H:i:s'),
];

$pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );

消费 job

<?php
ini_set('default_socket_timeout', 86400*7);
ini_set( 'memory_limit', '256M' );

// 消费队列消息
require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
while ( true )
{
    // 获取队列信息, reserve 阻塞获取
    $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
    if ( $job !== false )
    {
        $data = $job->getData();
        /* TODO 逻辑操作 */

        /* 处理完成,删除 job */
        $pheanstalk->delete( $job );
    }
}

default_socket_timeout 这个参数是一定要加的,php默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s之内如果没有 job 产生,脚本就会报 socket 错误,我写的是 7 天超时,您可以根据业务去调整,记住一定要配置,网上很多搜的 consumer 脚本都没有配置这个,根本不能投入生产环境使用,这是我亲自实践的结果。
  关于 while true 是否死循环,很明确告诉你是死循环,但是不会一直耗性能的那样执行下去,它会在 reserve 这里阻塞不动,直到有消息产生才会往下走,所以大可放心使用,我的项目代码里面是使用了方法调用方法自身去实现循环的。

就是这样的代码,供参考:

    public function watchJob()
    {
        $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
        if ( $job !== false )
        {
            $job_data = $job->getData();

            $this->subscribe( $job_data );
            $this->pheanstalk->delete( $job );
            /* 继续 Watch 下一个 job */
            $this->watchJob();
        }
        else
        {
            $this->log->error( 'reserve false', 'reserve false' );
        }
    }

监控 beanstalkd 状态

<?php

//监控服务状态
require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();

var_dump( $isAlive );

可以配合 email 做一个报警邮件,脚本每分钟去执行,判断状态是 false,就给管理员发送邮件报警。

一些相关命令

查看 beanstalkd 服务内存占用

top -u beanstalkd

后台运行 consumer 脚本

nohup php googlehome_subscribe.php & 

查看 consumer 脚本运行时间

ps -A -opid,stime,etime,args | grep consumer.php

手工重启 consumer 脚本

ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 
nohup php googlehome_subscribe.php &

一些总结

  php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
  一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考。

try
{
    /* TODO: 逻辑操作 */
}
catch ( ClientException $e )
{
    $results['mid']    = $this->mid;
    $results['code']   = $e->getResponse()->getStatusCode();
    $results['reason'] = $e->getResponse()->getReasonPhrase();
    $this->log->error( 'properties-changed ClientException', $results );
}
catch ( ServerException $e )
{
    $results['mid']    = $this->mid;
    $results['code']   = $e->getResponse()->getStatusCode();
    $results['reason'] = $e->getResponse()->getReasonPhrase();
    $this->log->error( 'properties-changed ServerException', $results );
}
catch ( ConnectException $e )
{
    $results['mid'] = $this->mid;
    $this->log->error( 'properties-changed ConnectException', $results );
}

  job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。

参考资料

Beanstalkd Message Queue
http://ongx1e5a0.bkt.clouddn.com/Beanstalk...

Beanstalkd FAQ 中文版
http://www.fzb.me/2015-7-31-beanstalkd-faq...

PHP消息队列beanstalkd
https://www.kancloud.cn/daiji/beanstalkd/7...

Beanstalkd-带你玩转消息队列
https://www.imooc.com/learn/912

我的博客地址:
http://yusure.cn/linux/257.html

本帖由系统于 10个月前 自动加精
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
讨论数量: 8

很好,消费脚本用 Supervisor 保平安呀,启动、停止、重启都很方便,还可以轻松实现,多进程消费

10个月前 评论

@Hachiko
先裸奔把 crash 的情况暴露出来,后面稳了再加。

10个月前 评论

这东西我也研究过,不过用的人太少了,目前没有遇到哪家公司用这个做消息队列

7个月前 评论

刚安装的,用laravel的事件队列,就出现这个错误

Argument 1 passed to Pheanstalk\Pheanstalk::__construct() must be an instance of Pheanstalk\Connection, string given,
我什么配置都没动啊,有人遇到过吗

5个月前 评论

@lun1bz 先看下 Pheanstalk 这个包的版本,再打印一下实例化时候,往里传的是什么

5个月前 评论

@Yusure 嗯,我没看清楚,用错版本了, :sweat_smile:5.7只能用~3.0 的

5个月前 评论

问一个小白的问题,什么时候执行队列任务呢,现在我将任务加入到队列后,立刻调用消费队列的方法,但因为是while的时候是死循环,就会报500的错误

file

2个月前 评论

@king-wang 先不着急消费,先查一下消息是否入到 queue 里面了,可以用这个工具 https://github.com/src-d/beanstool。确定进入到 queue 里面了,再运行 consumer 脚本

2个月前 评论
king-wang 2个月前
Yusure (作者) (楼主) 2个月前
king-wang 2个月前
Yusure (作者) (楼主) 2个月前
king-wang 2个月前
Yusure (作者) (楼主) 2个月前

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!