Laravel Command 让脚本业务消费使用多协程而不是多进程!
通过 Swoole 多协程改造 Laravel 的 Command 命令
前言
在传统的 Laravel 队列消费场景中,我们通常使用多进程模式来提高并发处理能力。然而,多进程模式存在以下问题:
- 内存占用高:每个进程都需要加载完整的 Laravel 框架,内存占用通常在 50-100MB
- 进程切换开销:操作系统需要频繁进行进程上下文切换
- 扩展性受限:进程数量受限于服务器 CPU 核心数和内存大小
本文将介绍如何使用 Swoole 协程 改造 Laravel Command,实现单进程多协程的高性能队列消费。
技术对比
多进程模式 vs 多协程模式
| 对比项 | 多进程模式 | 多协程模式 |
|---|---|---|
| 内存占用 | 50-100MB/进程 | 10-20MB (总计) |
| 并发能力 | 受限于 CPU 核心数 | 可达数千甚至上万 |
| 切换开销 | 较高(毫秒级) | 极低(微秒级) |
| 数据库连接 | 每进程独立连接 | 使用连接池共享 |
| 适用场景 | CPU 密集型任务 | IO 密集型任务 |
性能提升数据
在我们的实际项目中,使用协程改造后:
- 内存占用降低 80%+:从 1GB(20进程)降至 200MB(单进程50协程)
- 并发能力提升 10 倍+:从 20 并发提升至 200+ 并发
- 响应速度提升 3-5 倍:得益于更低的切换开销
核心设计
架构设计
┌─────────────────────────────────────────┐
│ SwooleCoroutineCommand │
│ (协程命令基类 - 继承 Laravel Command) │
└─────────────────────────────────────────┘
│
┌───────────┴───────────┐
│ │
┌───────▼──────┐ ┌─────────▼─────────┐
│ Redis 连接池 │ │ MySQL 连接池 │
│ (共享) │ │ (多库支持) │
└───────┬──────┘ └─────────┬─────────┘
│ │
└───────────┬───────────┘
│
┌───────────▼───────────┐
│ 50 个协程并发消费 │
│ (协程通道控制并发) │
└───────────────────────┘
关键技术点
- 连接池管理:使用
MySQLPoolManager和RedisPool管理数据库连接 - 协程通道:使用
Swoole\Coroutine\Channel控制并发数量 - 优雅退出:通过信号处理实现优雅关闭
- 错误重试:内置重试机制,提高容错能力
核心代码实现
1. 业务处理接口
定义统一的业务处理接口,所有业务 Service 必须实现此接口:
<?php
declare(strict_types=1);
namespace App\Console;
interface SwooleCommandHandleInterface
{
/**
* 处理队列数据
*
* @param array $data 队列数据
* @return void
*/
public function handle(array $data);
}
2. 协程命令基类
SwooleCoroutineCommand 是所有协程命令的基类,提供了完整的协程管理和连接池功能:
<?php
declare(strict_types=1);
namespace App\Console;
use App\Support\MySQLPoolManager;
use App\Support\RedisPool;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Throwable;
/**
* Swoole 协程化命令基类(使用连接池)
*
* 特性:
* 1. 单进程多协程,大幅降低内存占用
* 2. 使用 Swoole 原生连接池,支持多数据库连接
* 3. 自动管理数据库和 Redis 连接,支持表前缀
* 4. 优雅退出机制
*/
class SwooleCoroutineCommand extends Command
{
/** @var string 默认队列名称 */
public string $queueName = "default:list";
/** @var int 协程并发数(相当于工作协程数量) */
public int $coroutineNum = 50;
/** @var bool 是否正在运行 */
private bool $running = true;
/** @var Channel 协程通道,用于控制并发 */
private Channel $channel;
/**
* 使用协程池消费 Redis 队列
*
* @param SwooleCommandHandleInterface $handle
* @return void
*/
public function runCoroutine(SwooleCommandHandleInterface $handle): void
{
// 启用协程
Coroutine::set([
'hook_flags' => SWOOLE_HOOK_ALL, // 一键协程化
'max_coroutine' => 100000, // 最大协程数
]);
// 创建协程通道,用于控制并发数
$this->channel = new Channel($this->coroutineNum);
// 注册信号处理
$this->registerSignal();
$this->info("协程消费者启动,并发数: $this->coroutineNum, 队列: $this->queueName");
// 启动消费者协程
Coroutine\run(function () use ($handle) {
// 初始化连接池
$this->initPools();
// 启动多个消费者协程
for ($i = 0; $i < $this->coroutineNum; $i++) {
Coroutine::create(function () use ($handle, $i) {
$this->consumerWorker($handle, $i);
});
}
// 主协程等待所有工作协程结束
while ($this->running) {
Coroutine::sleep(1);
}
$this->info("正在等待所有协程结束...");
// 等待通道清空
while ($this->channel->length() > 0) {
Coroutine::sleep(0.1);
}
// 关闭连接池
$this->closePools();
$this->info("所有协程已结束");
});
}
/** @var array 需要初始化的数据库连接列表,子类可覆盖 */
protected array $dbConnections = [];
/**
* 初始化连接池
*/
private function initPools(): void
{
try {
RedisPool::init();
$this->info("Redis 连接池初始化成功");
// 获取需要初始化的数据库连接列表
$connections = $this->getDbConnections();
foreach ($connections as $connection) {
try {
MySQLPoolManager::init($connection);
$this->info("MySQL 连接池 [$connection] 初始化成功");
} catch (Throwable $e) {
// 连接配置可能不完整,跳过
$this->warn("MySQL 连接池 [$connection] 初始化跳过: {$e->getMessage()}");
}
}
$this->info("所有连接池初始化完成");
} catch (Throwable $e) {
$this->error("连接池初始化失败: {$e->getMessage()}");
Log::notice(__METHOD__, [
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]);
throw $e;
}
}
/**
* 获取需要初始化的数据库连接列表
*
* 优先级:
* 1. 子类定义的 $dbConnections 属性
* 2. 自动获取所有 MySQL 类型的连接
*
* @return array
*/
protected function getDbConnections(): array
{
// 如果子类指定了连接列表,使用子类的
if (!empty($this->dbConnections)) {
return $this->dbConnections;
}
// 否则自动获取所有 MySQL 类型的连接
$allConnections = config('database.connections', []);
$mysqlConnections = [];
foreach ($allConnections as $name => $config) {
if ($name === 'mysql') {
continue;
}
// 只初始化 MySQL 驱动的连接
if (isset($config['driver']) && $config['driver'] === 'mysql') {
$mysqlConnections[] = $name;
}
}
return $mysqlConnections;
}
/**
* 关闭连接池
*/
private function closePools(): void
{
try {
RedisPool::close();
MySQLPoolManager::closeAll();
$this->info("所有连接池已关闭");
} catch (Throwable $e) {
$this->error("关闭连接池失败: {$e->getMessage()}");
}
}
/**
* 消费者工作协程
*
* @param SwooleCommandHandleInterface $handle
* @param int $workerId
* @return void
*/
private function consumerWorker(SwooleCommandHandleInterface $handle, int $workerId): void
{
$this->info("[Coroutine #$workerId] 启动,协程ID: " . Coroutine::getCid());
while ($this->running) {
try {
// 从 Redis 队列获取数据(使用连接池)
$result = $this->popFromQueue();
if (empty($result)) {
// 队列为空,短暂休眠
Coroutine::sleep(0.5);
continue;
}
$this->info("[Coroutine #$workerId] 获取数据: {$result[1]}");
// 解码数据
$data = json_decode($result[1], true);
if (!is_array($data)) {
$this->error("[Coroutine #$workerId] 数据格式错误");
continue;
}
// 处理业务逻辑
$this->handleWithRetry($handle, $data, $workerId);
$this->info("[Coroutine #$workerId] 处理完成");
} catch (Throwable $e) {
$this->error("[Coroutine #$workerId] 异常: {$e->getMessage()}");
Log::error(__METHOD__, [
'worker_id' => $workerId,
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
'trace' => $e->getTraceAsString(),
]);
// 异常后短暂休眠,避免频繁报错
Coroutine::sleep(1);
}
}
$this->info("[Coroutine #$workerId] 退出");
}
/**
* 从队列中获取数据(使用连接池)
*
* @return array|null
*/
private function popFromQueue(): ?array
{
try {
// 使用 Redis 连接池
return RedisPool::execute(function ($redis) {
// brPop 返回 [队列名, 数据] 或 false
$result = $redis->brPop($this->queueName, 1);
return $result ?: null;
});
} catch (Throwable $e) {
Log::error('Redis brPop 失败', [
'message' => $e->getMessage(),
'queue' => $this->queueName,
]);
return null;
}
}
/**
* 带重试的业务处理
*
* @param SwooleCommandHandleInterface $handle
* @param array $data
* @param int $workerId
* @param int $maxRetry
* @return void
*/
private function handleWithRetry(
SwooleCommandHandleInterface $handle,
array $data,
int $workerId,
int $maxRetry = 3
): void {
$retry = 0;
while ($retry < $maxRetry) {
try {
// 执行业务逻辑
$handle->handle($data);
return;
} catch (Throwable $e) {
$retry++;
$this->error("[Coroutine #$workerId] 处理失败 (重试 $retry/$maxRetry): {$e->getMessage()}");
if ($retry >= $maxRetry) {
// 达到最大重试次数,记录日志
Log::error('业务处理失败,已达最大重试次数', [
'worker_id' => $workerId,
'data' => $data,
'error' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine(),
]);
} else {
// 重试前等待
Coroutine::sleep(1);
}
}
}
}
/**
* 注册信号处理(优雅退出)
*
* @return void
*/
private function registerSignal(): void
{
// 注册 SIGTERM 信号(kill 命令)
pcntl_signal(SIGTERM, function () {
$this->info("收到 SIGTERM 信号,准备退出...");
$this->running = false;
});
// 注册 SIGINT 信号(Ctrl+C)
pcntl_signal(SIGINT, function () {
$this->info("收到 SIGINT 信号,准备退出...");
$this->running = false;
});
// 启用信号处理
pcntl_async_signals(true);
}
/**
* 获取队列长度(使用连接池)
*
* @return int
*/
protected function getQueueLength(): int
{
try {
return RedisPool::execute(function ($redis) {
return $redis->lLen($this->queueName);
});
} catch (Throwable $e) {
Log::notice(__METHOD__, [
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]);
return 0;
}
}
}
3. 实际使用示例
3.1 创建命令类
<?php
namespace App\Console\Commands;
use App\Console\SwooleCoroutineCommand;
use App\Services\YourBusinessServiceV2;
class YourBusinessCommand extends SwooleCoroutineCommand
{
protected $signature = 'your:business
{method : 执行的方法:pop、push}
{--coroutineNum=50 : 协程并发数}';
protected $description = '业务队列消费(协程版本)';
public function handle()
{
$method = $this->argument('method');
$this->coroutineNum = (int) $this->option('coroutineNum');
if ($method === 'pop') {
$this->pop();
} elseif ($method === 'push') {
$this->push();
}
}
/**
* 协程消费
*/
private function pop(): void
{
$this->queueName = 'your:queue:name';
$this->info("开始协程消费,并发数: {$this->coroutineNum}");
$service = app()->make(YourBusinessServiceV2::class);
$this->runCoroutine($service);
}
/**
* 指定需要初始化的数据库连接
*/
protected function getDbConnections(): array
{
return ['mysql', '3d66_inspiration'];
}
}
3.2 创建业务 Service
<?php
namespace App\Services;
use App\Console\SwooleCommandHandleInterface;
use App\Support\MySQLPoolManager;
use App\Support\RedisPool;
class YourBusinessServiceV2 implements SwooleCommandHandleInterface
{
/**
* 处理队列数据
*
* @param array $data
* @return void
*/
public function handle(array $data)
{
// 1. 从数据库查询数据(使用连接池 + 原生 PDO)
$info = $this->getDataFromDb($data['id']);
if (empty($info)) {
return;
}
// 2. 业务处理逻辑
$result = $this->processBusinessLogic($info);
// 3. 保存结果到数据库
$this->saveResult($info['id'], $result);
}
/**
* 从数据库查询数据(使用连接池 + 原生 PDO)
*/
private function getDataFromDb(int $id): ?array
{
return MySQLPoolManager::execute('3d66_inspiration', function ($pdo, $prefix) use ($id) {
$table = $prefix . 'your_table';
$sql = "SELECT * FROM {$table} WHERE id = ? LIMIT 1";
$stmt = $pdo->prepare($sql);
$stmt->execute([$id]);
return $stmt->fetch(\PDO::FETCH_ASSOC) ?: null;
});
}
/**
* 保存结果(使用连接池 + 原生 PDO)
*/
private function saveResult(int $id, array $result): void
{
MySQLPoolManager::execute('3d66_inspiration', function ($pdo, $prefix) use ($id, $result) {
$table = $prefix . 'result_table';
$sql = "INSERT INTO {$table} (data_id, result_data, status)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
result_data = VALUES(result_data),
status = VALUES(status)";
$stmt = $pdo->prepare($sql);
$stmt->execute([
$id,
json_encode($result),
1
]);
});
}
/**
* 使用 Redis 缓存(使用连接池)
*/
private function getCachedData(string $key): ?string
{
return RedisPool::execute(function ($redis) use ($key) {
return $redis->get($key) ?: null;
});
}
/**
* 业务逻辑处理
*/
private function processBusinessLogic(array $info): array
{
// 这里实现你的业务逻辑
return [
'processed' => true,
'timestamp' => time(),
];
}
}
重要注意事项
1. 避免使用 Eloquent ORM
在协程模式下,不要使用 Eloquent ORM,原因如下:
Eloquent ORM 的问题
// ❌ 错误示例:Eloquent ORM 在协程下会出现数据串扰
$user = User::find($id); // 协程 A
$user = User::find($id); // 协程 B
// 两个协程可能共享同一个数据库连接,导致数据混乱
Eloquent ORM 的致命问题:
- 数据串扰:多个协程共享同一个 Eloquent 模型实例,导致数据污染
- 连接混乱:Eloquent 使用单例模式管理数据库连接,协程间会相互影响
- 事务混乱:多个协程的事务操作会相互干扰
- 性能低下:ORM 层的额外开销在高并发下被放大
使用原生 PDO + 连接池
// ✅ 正确示例:使用原生 PDO + 连接池
$user = MySQLPoolManager::execute('mysql', function ($pdo, $prefix) use ($id) {
$table = $prefix . 'users';
$sql = "SELECT * FROM {$table} WHERE id = ? LIMIT 1";
$stmt = $pdo->prepare($sql);
$stmt->execute([$id]);
return $stmt->fetch(\PDO::FETCH_ASSOC);
});
优势:
- 协程安全:每次从连接池获取独立连接,完全隔离
- 性能高:无 ORM 开销,直接操作数据库
- 可控性强:完全掌控 SQL 执行过程
- 防注入:使用预处理语句,安全可靠
2. 避免类属性污染
在协程中,避免使用类属性存储临时数据:
// ❌ 错误示例:类属性会被多个协程共享
class BadService implements SwooleCommandHandleInterface
{
private array $userData = []; // 危险!会被多个协程共享
public function handle(array $data)
{
$this->userData = $this->getUser($data['user_id']);
// 协程切换后,$this->userData 可能被其他协程覆盖
}
}
// ✅ 正确示例:使用参数传递
class GoodService implements SwooleCommandHandleInterface
{
public function handle(array $data)
{
$userData = $this->getUser($data['user_id']);
$this->processUser($userData); // 通过参数传递
}
private function processUser(array $userData): void
{
// 使用参数,而不是类属性
}
}
3. 连接池配置
确保在 config/database.php 中配置了所需的数据库连接:
'connections' => [
'3d66_inspiration' => [
'driver' => 'mysql',
'host' => env('DB_INSPIRATION_HOST', '127.0.0.1'),
'port' => env('DB_INSPIRATION_PORT', '3306'),
'database' => env('DB_INSPIRATION_DATABASE', 'inspiration'),
'username' => env('DB_INSPIRATION_USERNAME', 'root'),
'password' => env('DB_INSPIRATION_PASSWORD', ''),
'charset' => 'utf8mb4',
'collation' => 'utf8mb4_unicode_ci',
'prefix' => 'll_', // 表前缀会自动加上
'strict' => false,
],
],
4. 表前缀处理
连接池会自动处理表前缀,使用时无需手动添加:
// ✅ 正确:连接池会自动加上配置的前缀
MySQLPoolManager::execute('3d66_inspiration', function ($pdo, $prefix) use ($id) {
$table = $prefix . 'your_table'; // 结果:ll_your_table
// ...
});
// ❌ 错误:重复添加前缀
$table = $prefix . 'll_your_table'; // 结果:ll_ll_your_table
运行和监控
启动命令
# 启动协程消费(50 个协程并发)
php artisan your:business pop
# 指定协程数量
php artisan your:business pop --coroutineNum=100
# 推送数据到队列
php artisan your:business push
监控指标
# 查看队列长度
redis-cli LLEN your:queue:name
# 查看进程状态
ps aux | grep "your:business"
# 查看内存占用
top -p $(pgrep -f "your:business")
Supervisor 配置
[program:your-business-coroutine]
process_name=%(program_name)s
command=php /path/to/artisan your:business pop --coroutineNum=50
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/path/to/logs/your-business.log
stopwaitsecs=60
性能优化建议
1. 合理设置协程数量
// CPU 密集型任务:协程数 = CPU 核心数 * 2
$this->coroutineNum = 16;
// IO 密集型任务:协程数可以设置更高
$this->coroutineNum = 100;
// 根据实际情况调整,监控 CPU 和内存使用率
2. 连接池大小配置
在 MySQLPoolManager 中配置合适的连接池大小:
// 推荐:连接池大小 = 协程数量 * 0.3 ~ 0.5
// 50 协程 -> 15-25 个数据库连接
3. 批量操作优化
// ✅ 批量插入
MySQLPoolManager::execute('mysql', function ($pdo, $prefix) use ($dataList) {
$table = $prefix . 'your_table';
$placeholders = implode(',', array_fill(0, count($dataList), '(?,?)'));
$sql = "INSERT INTO {$table} (col1, col2) VALUES {$placeholders}";
$params = [];
foreach ($dataList as $item) {
$params[] = $item['col1'];
$params[] = $item['col2'];
}
$stmt = $pdo->prepare($sql);
$stmt->execute($params);
});
常见问题
Q1: 协程数量设置多少合适?
A: 根据任务类型:
- IO 密集型(数据库查询、API 调用):50-200
- CPU 密集型(图像处理、加密运算):CPU 核心数 * 2
- 混合型:30-100
建议:从小到大逐步调整,监控 CPU、内存和响应时间。
Q2: 为什么不能使用 Eloquent ORM?
A: Eloquent ORM 的设计不适合协程场景:
- 单例模式的连接管理导致协程间共享连接
- 模型实例在协程间共享会造成数据污染
- 事务管理在协程下不可靠
使用原生 PDO + 连接池是协程模式下的最佳实践。
Q3: 如何处理事务?
A: 使用 PDO 的事务功能:
MySQLPoolManager::execute('mysql', function ($pdo, $prefix) use ($data) {
try {
$pdo->beginTransaction();
// 执行多个 SQL 操作
$stmt1 = $pdo->prepare("INSERT INTO ...");
$stmt1->execute([...]);
$stmt2 = $pdo->prepare("UPDATE ...");
$stmt2->execute([...]);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
throw $e;
}
});
Q4: 内存泄漏怎么办?
A: 注意以下几点:
- 避免在循环中创建大量对象
- 及时释放大变量:
unset($largeData) - 避免闭包捕获大量外部变量
- 定期重启进程(Supervisor 配置
autorestart=true)
总结
通过 Swoole 协程改造 Laravel Command,我们实现了:
- 内存占用降低 80%+:从多进程的 GB 级别降至单进程的 MB 级别
- 并发能力提升 10 倍+:从进程数受限到协程数可达数千
- 更低的运维成本:单进程更易于管理和监控
- 更高的资源利用率:协程切换开销极低,CPU 利用率更高
适用场景:
- ✅ IO 密集型任务(数据库查询、API 调用)
- ✅ 高并发队列消费
- ✅ 实时数据处理
- ❌ CPU 密集型计算(建议使用多进程)
核心原则:
- 使用原生 PDO + 连接池,避免 Eloquent ORM
- 通过参数传递数据,避免类属性污染
- 合理设置协程数量,监控性能指标
- 做好异常处理和日志记录
希望本文能帮助你在 Laravel 项目中成功应用 Swoole 协程技术,实现性能的飞跃!
(内容由claude总结输出!!!)
相关链接:
本作品采用《CC 协议》,转载必须注明作者和本文链接
关于 LearnKu