Laravel Command 让脚本业务消费使用多协程而不是多进程!

AI摘要
本文是一篇技术知识分享,详细介绍了如何利用Swoole协程技术对Laravel框架的Command命令进行高性能改造。文章对比了传统的多进程模式与多协程模式在内存占用、并发能力和切换开销上的差异,并提供了完整的架构设计、核心代码实现、使用示例及性能优化建议。核心要点包括使用连接池管理数据库连接、通过协程通道控制并发、避免使用Eloquent ORM以防止数据串扰,以及如何实现优雅退出和错误重试机制。该方案旨在显著提升IO密集型队列消费任务的性能,降低资源消耗。

通过 Swoole 多协程改造 Laravel 的 Command 命令

前言

在传统的 Laravel 队列消费场景中,我们通常使用多进程模式来提高并发处理能力。然而,多进程模式存在以下问题:

  1. 内存占用高:每个进程都需要加载完整的 Laravel 框架,内存占用通常在 50-100MB
  2. 进程切换开销:操作系统需要频繁进行进程上下文切换
  3. 扩展性受限:进程数量受限于服务器 CPU 核心数和内存大小

本文将介绍如何使用 Swoole 协程 改造 Laravel Command,实现单进程多协程的高性能队列消费。

技术对比

多进程模式 vs 多协程模式

对比项 多进程模式 多协程模式
内存占用 50-100MB/进程 10-20MB (总计)
并发能力 受限于 CPU 核心数 可达数千甚至上万
切换开销 较高(毫秒级) 极低(微秒级)
数据库连接 每进程独立连接 使用连接池共享
适用场景 CPU 密集型任务 IO 密集型任务

性能提升数据

在我们的实际项目中,使用协程改造后:

  • 内存占用降低 80%+:从 1GB(20进程)降至 200MB(单进程50协程)
  • 并发能力提升 10 倍+:从 20 并发提升至 200+ 并发
  • 响应速度提升 3-5 倍:得益于更低的切换开销

核心设计

架构设计

┌─────────────────────────────────────────┐
│       SwooleCoroutineCommand            │
  (协程命令基类 - 继承 Laravel Command)   │
└─────────────────────────────────────────┘
                    │
        ┌───────────┴───────────┐
        │                       │
┌───────▼──────┐      ┌─────────▼─────────┐
│  Redis 连接池 │      │  MySQL 连接池      │
  (共享)  (多库支持)        │
└───────┬──────┘      └─────────┬─────────┘
        │                       │
        └───────────┬───────────┘
                    │
        ┌───────────▼───────────┐
        │  50 个协程并发消费     │
          (协程通道控制并发)    │
        └───────────────────────┘

关键技术点

  1. 连接池管理:使用 MySQLPoolManagerRedisPool 管理数据库连接
  2. 协程通道:使用 Swoole\Coroutine\Channel 控制并发数量
  3. 优雅退出:通过信号处理实现优雅关闭
  4. 错误重试:内置重试机制,提高容错能力

核心代码实现

1. 业务处理接口

定义统一的业务处理接口,所有业务 Service 必须实现此接口:

<?php

declare(strict_types=1);

namespace App\Console;

interface SwooleCommandHandleInterface
{
    /**
     * 处理队列数据
     *
     * @param array $data 队列数据
     * @return void
     */
    public function handle(array $data);
}

2. 协程命令基类

SwooleCoroutineCommand 是所有协程命令的基类,提供了完整的协程管理和连接池功能:

<?php

declare(strict_types=1);

namespace App\Console;

use App\Support\MySQLPoolManager;
use App\Support\RedisPool;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Throwable;

/**
 * Swoole 协程化命令基类(使用连接池)
 *
 * 特性:
 * 1. 单进程多协程,大幅降低内存占用
 * 2. 使用 Swoole 原生连接池,支持多数据库连接
 * 3. 自动管理数据库和 Redis 连接,支持表前缀
 * 4. 优雅退出机制
 */
class SwooleCoroutineCommand extends Command
{
    /** @var string 默认队列名称 */
    public string $queueName = "default:list";

    /** @var int 协程并发数(相当于工作协程数量) */
    public int $coroutineNum = 50;

    /** @var bool 是否正在运行 */
    private bool $running = true;

    /** @var Channel 协程通道,用于控制并发 */
    private Channel $channel;

    /**
     * 使用协程池消费 Redis 队列
     *
     * @param SwooleCommandHandleInterface $handle
     * @return void
     */
    public function runCoroutine(SwooleCommandHandleInterface $handle): void
    {
        // 启用协程
        Coroutine::set([
            'hook_flags'    => SWOOLE_HOOK_ALL, // 一键协程化
            'max_coroutine' => 100000,        // 最大协程数
        ]);

        // 创建协程通道,用于控制并发数
        $this->channel = new Channel($this->coroutineNum);

        // 注册信号处理
        $this->registerSignal();

        $this->info("协程消费者启动,并发数: $this->coroutineNum, 队列: $this->queueName");

        // 启动消费者协程
        Coroutine\run(function () use ($handle) {
            // 初始化连接池
            $this->initPools();

            // 启动多个消费者协程
            for ($i = 0; $i < $this->coroutineNum; $i++) {
                Coroutine::create(function () use ($handle, $i) {
                    $this->consumerWorker($handle, $i);
                });
            }

            // 主协程等待所有工作协程结束
            while ($this->running) {
                Coroutine::sleep(1);
            }

            $this->info("正在等待所有协程结束...");
            // 等待通道清空
            while ($this->channel->length() > 0) {
                Coroutine::sleep(0.1);
            }

            // 关闭连接池
            $this->closePools();
            $this->info("所有协程已结束");
        });
    }

    /** @var array 需要初始化的数据库连接列表,子类可覆盖 */
    protected array $dbConnections = [];

    /**
     * 初始化连接池
     */
    private function initPools(): void
    {
        try {
            RedisPool::init();
            $this->info("Redis 连接池初始化成功");

            // 获取需要初始化的数据库连接列表
            $connections = $this->getDbConnections();

            foreach ($connections as $connection) {
                try {
                    MySQLPoolManager::init($connection);
                    $this->info("MySQL 连接池 [$connection] 初始化成功");
                } catch (Throwable $e) {
                    // 连接配置可能不完整,跳过
                    $this->warn("MySQL 连接池 [$connection] 初始化跳过: {$e->getMessage()}");
                }
            }

            $this->info("所有连接池初始化完成");
        } catch (Throwable $e) {
            $this->error("连接池初始化失败: {$e->getMessage()}");
            Log::notice(__METHOD__, [
                'file'    => $e->getFile(),
                'line'    => $e->getLine(),
                'message' => $e->getMessage(),
            ]);
            throw $e;
        }
    }

    /**
     * 获取需要初始化的数据库连接列表
     *
     * 优先级:
     * 1. 子类定义的 $dbConnections 属性
     * 2. 自动获取所有 MySQL 类型的连接
     *
     * @return array
     */
    protected function getDbConnections(): array
    {
        // 如果子类指定了连接列表,使用子类的
        if (!empty($this->dbConnections)) {
            return $this->dbConnections;
        }

        // 否则自动获取所有 MySQL 类型的连接
        $allConnections = config('database.connections', []);
        $mysqlConnections = [];

        foreach ($allConnections as $name => $config) {
            if ($name === 'mysql') {
                continue;
            }

            // 只初始化 MySQL 驱动的连接
            if (isset($config['driver']) && $config['driver'] === 'mysql') {
                $mysqlConnections[] = $name;
            }
        }

        return $mysqlConnections;
    }

    /**
     * 关闭连接池
     */
    private function closePools(): void
    {
        try {
            RedisPool::close();
            MySQLPoolManager::closeAll();
            $this->info("所有连接池已关闭");
        } catch (Throwable $e) {
            $this->error("关闭连接池失败: {$e->getMessage()}");
        }
    }

    /**
     * 消费者工作协程
     *
     * @param SwooleCommandHandleInterface $handle
     * @param int $workerId
     * @return void
     */
    private function consumerWorker(SwooleCommandHandleInterface $handle, int $workerId): void
    {
        $this->info("[Coroutine #$workerId] 启动,协程ID: " . Coroutine::getCid());

        while ($this->running) {
            try {
                // 从 Redis 队列获取数据(使用连接池)
                $result = $this->popFromQueue();

                if (empty($result)) {
                    // 队列为空,短暂休眠
                    Coroutine::sleep(0.5);
                    continue;
                }

                $this->info("[Coroutine #$workerId] 获取数据: {$result[1]}");

                // 解码数据
                $data = json_decode($result[1], true);
                if (!is_array($data)) {
                    $this->error("[Coroutine #$workerId] 数据格式错误");
                    continue;
                }

                // 处理业务逻辑
                $this->handleWithRetry($handle, $data, $workerId);

                $this->info("[Coroutine #$workerId] 处理完成");
            } catch (Throwable $e) {
                $this->error("[Coroutine #$workerId] 异常: {$e->getMessage()}");
                Log::error(__METHOD__, [
                    'worker_id' => $workerId,
                    'file'      => $e->getFile(),
                    'line'      => $e->getLine(),
                    'message'   => $e->getMessage(),
                    'trace'     => $e->getTraceAsString(),
                ]);

                // 异常后短暂休眠,避免频繁报错
                Coroutine::sleep(1);
            }
        }

        $this->info("[Coroutine #$workerId] 退出");
    }

    /**
     * 从队列中获取数据(使用连接池)
     *
     * @return array|null
     */
    private function popFromQueue(): ?array
    {
        try {
            // 使用 Redis 连接池
            return RedisPool::execute(function ($redis) {
                // brPop 返回 [队列名, 数据] 或 false
                $result = $redis->brPop($this->queueName, 1);
                return $result ?: null;
            });
        } catch (Throwable $e) {
            Log::error('Redis brPop 失败', [
                'message' => $e->getMessage(),
                'queue'   => $this->queueName,
            ]);
            return null;
        }
    }

    /**
     * 带重试的业务处理
     *
     * @param SwooleCommandHandleInterface $handle
     * @param array $data
     * @param int $workerId
     * @param int $maxRetry
     * @return void
     */
    private function handleWithRetry(
        SwooleCommandHandleInterface $handle,
        array $data,
        int $workerId,
        int $maxRetry = 3
    ): void {
        $retry = 0;
        while ($retry < $maxRetry) {
            try {
                // 执行业务逻辑
                $handle->handle($data);
                return;
            } catch (Throwable $e) {
                $retry++;
                $this->error("[Coroutine #$workerId] 处理失败 (重试 $retry/$maxRetry): {$e->getMessage()}");

                if ($retry >= $maxRetry) {
                    // 达到最大重试次数,记录日志
                    Log::error('业务处理失败,已达最大重试次数', [
                        'worker_id' => $workerId,
                        'data'      => $data,
                        'error'     => $e->getMessage(),
                        'file'      => $e->getFile(),
                        'line'      => $e->getLine(),
                    ]);
                } else {
                    // 重试前等待
                    Coroutine::sleep(1);
                }
            }
        }
    }

    /**
     * 注册信号处理(优雅退出)
     *
     * @return void
     */
    private function registerSignal(): void
    {
        // 注册 SIGTERM 信号(kill 命令)
        pcntl_signal(SIGTERM, function () {
            $this->info("收到 SIGTERM 信号,准备退出...");
            $this->running = false;
        });

        // 注册 SIGINT 信号(Ctrl+C)
        pcntl_signal(SIGINT, function () {
            $this->info("收到 SIGINT 信号,准备退出...");
            $this->running = false;
        });

        // 启用信号处理
        pcntl_async_signals(true);
    }

    /**
     * 获取队列长度(使用连接池)
     *
     * @return int
     */
    protected function getQueueLength(): int
    {
        try {
            return RedisPool::execute(function ($redis) {
                return $redis->lLen($this->queueName);
            });
        } catch (Throwable $e) {
            Log::notice(__METHOD__, [
                'file'    => $e->getFile(),
                'line'    => $e->getLine(),
                'message' => $e->getMessage(),
            ]);
            return 0;
        }
    }
}

3. 实际使用示例

3.1 创建命令类

<?php

namespace App\Console\Commands;

use App\Console\SwooleCoroutineCommand;
use App\Services\YourBusinessServiceV2;

class YourBusinessCommand extends SwooleCoroutineCommand
{
    protected $signature = 'your:business
                           {method : 执行的方法:pop、push}
                           {--coroutineNum=50 : 协程并发数}';

    protected $description = '业务队列消费(协程版本)';

    public function handle()
    {
        $method = $this->argument('method');
        $this->coroutineNum = (int) $this->option('coroutineNum');

        if ($method === 'pop') {
            $this->pop();
        } elseif ($method === 'push') {
            $this->push();
        }
    }

    /**
     * 协程消费
     */
    private function pop(): void
    {
        $this->queueName = 'your:queue:name';
        $this->info("开始协程消费,并发数: {$this->coroutineNum}");

        $service = app()->make(YourBusinessServiceV2::class);
        $this->runCoroutine($service);
    }

    /**
     * 指定需要初始化的数据库连接
     */
    protected function getDbConnections(): array
    {
        return ['mysql', '3d66_inspiration'];
    }
}

3.2 创建业务 Service

<?php

namespace App\Services;

use App\Console\SwooleCommandHandleInterface;
use App\Support\MySQLPoolManager;
use App\Support\RedisPool;

class YourBusinessServiceV2 implements SwooleCommandHandleInterface
{
    /**
     * 处理队列数据
     *
     * @param array $data
     * @return void
     */
    public function handle(array $data)
    {
        // 1. 从数据库查询数据(使用连接池 + 原生 PDO)
        $info = $this->getDataFromDb($data['id']);

        if (empty($info)) {
            return;
        }

        // 2. 业务处理逻辑
        $result = $this->processBusinessLogic($info);

        // 3. 保存结果到数据库
        $this->saveResult($info['id'], $result);
    }

    /**
     * 从数据库查询数据(使用连接池 + 原生 PDO)
     */
    private function getDataFromDb(int $id): ?array
    {
        return MySQLPoolManager::execute('3d66_inspiration', function ($pdo, $prefix) use ($id) {
            $table = $prefix . 'your_table';
            $sql = "SELECT * FROM {$table} WHERE id = ? LIMIT 1";
            $stmt = $pdo->prepare($sql);
            $stmt->execute([$id]);
            return $stmt->fetch(\PDO::FETCH_ASSOC) ?: null;
        });
    }

    /**
     * 保存结果(使用连接池 + 原生 PDO)
     */
    private function saveResult(int $id, array $result): void
    {
        MySQLPoolManager::execute('3d66_inspiration', function ($pdo, $prefix) use ($id, $result) {
            $table = $prefix . 'result_table';
            $sql = "INSERT INTO {$table} (data_id, result_data, status)
                    VALUES (?, ?, ?)
                    ON DUPLICATE KEY UPDATE
                        result_data = VALUES(result_data),
                        status = VALUES(status)";
            $stmt = $pdo->prepare($sql);
            $stmt->execute([
                $id,
                json_encode($result),
                1
            ]);
        });
    }

    /**
     * 使用 Redis 缓存(使用连接池)
     */
    private function getCachedData(string $key): ?string
    {
        return RedisPool::execute(function ($redis) use ($key) {
            return $redis->get($key) ?: null;
        });
    }

    /**
     * 业务逻辑处理
     */
    private function processBusinessLogic(array $info): array
    {
        // 这里实现你的业务逻辑
        return [
            'processed' => true,
            'timestamp' => time(),
        ];
    }
}

重要注意事项

1. 避免使用 Eloquent ORM

在协程模式下,不要使用 Eloquent ORM,原因如下:

Eloquent ORM 的问题

// ❌ 错误示例:Eloquent ORM 在协程下会出现数据串扰
$user = User::find($id);  // 协程 A
$user = User::find($id);  // 协程 B
// 两个协程可能共享同一个数据库连接,导致数据混乱

Eloquent ORM 的致命问题

  1. 数据串扰:多个协程共享同一个 Eloquent 模型实例,导致数据污染
  2. 连接混乱:Eloquent 使用单例模式管理数据库连接,协程间会相互影响
  3. 事务混乱:多个协程的事务操作会相互干扰
  4. 性能低下:ORM 层的额外开销在高并发下被放大

使用原生 PDO + 连接池

// ✅ 正确示例:使用原生 PDO + 连接池
$user = MySQLPoolManager::execute('mysql', function ($pdo, $prefix) use ($id) {
    $table = $prefix . 'users';
    $sql = "SELECT * FROM {$table} WHERE id = ? LIMIT 1";
    $stmt = $pdo->prepare($sql);
    $stmt->execute([$id]);
    return $stmt->fetch(\PDO::FETCH_ASSOC);
});

优势

  1. 协程安全:每次从连接池获取独立连接,完全隔离
  2. 性能高:无 ORM 开销,直接操作数据库
  3. 可控性强:完全掌控 SQL 执行过程
  4. 防注入:使用预处理语句,安全可靠

2. 避免类属性污染

在协程中,避免使用类属性存储临时数据

// ❌ 错误示例:类属性会被多个协程共享
class BadService implements SwooleCommandHandleInterface
{
    private array $userData = [];  // 危险!会被多个协程共享

    public function handle(array $data)
    {
        $this->userData = $this->getUser($data['user_id']);
        // 协程切换后,$this->userData 可能被其他协程覆盖
    }
}

// ✅ 正确示例:使用参数传递
class GoodService implements SwooleCommandHandleInterface
{
    public function handle(array $data)
    {
        $userData = $this->getUser($data['user_id']);
        $this->processUser($userData);  // 通过参数传递
    }

    private function processUser(array $userData): void
    {
        // 使用参数,而不是类属性
    }
}

3. 连接池配置

确保在 config/database.php 中配置了所需的数据库连接:

'connections' => [
    '3d66_inspiration' => [
        'driver'    => 'mysql',
        'host'      => env('DB_INSPIRATION_HOST', '127.0.0.1'),
        'port'      => env('DB_INSPIRATION_PORT', '3306'),
        'database'  => env('DB_INSPIRATION_DATABASE', 'inspiration'),
        'username'  => env('DB_INSPIRATION_USERNAME', 'root'),
        'password'  => env('DB_INSPIRATION_PASSWORD', ''),
        'charset'   => 'utf8mb4',
        'collation' => 'utf8mb4_unicode_ci',
        'prefix'    => 'll_',  // 表前缀会自动加上
        'strict'    => false,
    ],
],

4. 表前缀处理

连接池会自动处理表前缀,使用时无需手动添加:

// ✅ 正确:连接池会自动加上配置的前缀
MySQLPoolManager::execute('3d66_inspiration', function ($pdo, $prefix) use ($id) {
    $table = $prefix . 'your_table';  // 结果:ll_your_table
    // ...
});

// ❌ 错误:重复添加前缀
$table = $prefix . 'll_your_table';  // 结果:ll_ll_your_table

运行和监控

启动命令

# 启动协程消费(50 个协程并发)
php artisan your:business pop

# 指定协程数量
php artisan your:business pop --coroutineNum=100

# 推送数据到队列
php artisan your:business push

监控指标

# 查看队列长度
redis-cli LLEN your:queue:name

# 查看进程状态
ps aux | grep "your:business"

# 查看内存占用
top -p $(pgrep -f "your:business")

Supervisor 配置

[program:your-business-coroutine]
process_name=%(program_name)s
command=php /path/to/artisan your:business pop --coroutineNum=50
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/path/to/logs/your-business.log
stopwaitsecs=60

性能优化建议

1. 合理设置协程数量

// CPU 密集型任务:协程数 = CPU 核心数 * 2
$this->coroutineNum = 16;

// IO 密集型任务:协程数可以设置更高
$this->coroutineNum = 100;

// 根据实际情况调整,监控 CPU 和内存使用率

2. 连接池大小配置

MySQLPoolManager 中配置合适的连接池大小:

// 推荐:连接池大小 = 协程数量 * 0.3 ~ 0.5
// 50 协程 -> 15-25 个数据库连接

3. 批量操作优化

// ✅ 批量插入
MySQLPoolManager::execute('mysql', function ($pdo, $prefix) use ($dataList) {
    $table = $prefix . 'your_table';
    $placeholders = implode(',', array_fill(0, count($dataList), '(?,?)'));
    $sql = "INSERT INTO {$table} (col1, col2) VALUES {$placeholders}";

    $params = [];
    foreach ($dataList as $item) {
        $params[] = $item['col1'];
        $params[] = $item['col2'];
    }

    $stmt = $pdo->prepare($sql);
    $stmt->execute($params);
});

常见问题

Q1: 协程数量设置多少合适?

A: 根据任务类型:

  • IO 密集型(数据库查询、API 调用):50-200
  • CPU 密集型(图像处理、加密运算):CPU 核心数 * 2
  • 混合型:30-100

建议:从小到大逐步调整,监控 CPU、内存和响应时间。

Q2: 为什么不能使用 Eloquent ORM?

A: Eloquent ORM 的设计不适合协程场景:

  • 单例模式的连接管理导致协程间共享连接
  • 模型实例在协程间共享会造成数据污染
  • 事务管理在协程下不可靠

使用原生 PDO + 连接池是协程模式下的最佳实践。

Q3: 如何处理事务?

A: 使用 PDO 的事务功能:

MySQLPoolManager::execute('mysql', function ($pdo, $prefix) use ($data) {
    try {
        $pdo->beginTransaction();

        // 执行多个 SQL 操作
        $stmt1 = $pdo->prepare("INSERT INTO ...");
        $stmt1->execute([...]);

        $stmt2 = $pdo->prepare("UPDATE ...");
        $stmt2->execute([...]);

        $pdo->commit();
    } catch (\Exception $e) {
        $pdo->rollBack();
        throw $e;
    }
});

Q4: 内存泄漏怎么办?

A: 注意以下几点:

  1. 避免在循环中创建大量对象
  2. 及时释放大变量:unset($largeData)
  3. 避免闭包捕获大量外部变量
  4. 定期重启进程(Supervisor 配置 autorestart=true

总结

通过 Swoole 协程改造 Laravel Command,我们实现了:

  1. 内存占用降低 80%+:从多进程的 GB 级别降至单进程的 MB 级别
  2. 并发能力提升 10 倍+:从进程数受限到协程数可达数千
  3. 更低的运维成本:单进程更易于管理和监控
  4. 更高的资源利用率:协程切换开销极低,CPU 利用率更高

适用场景

  • ✅ IO 密集型任务(数据库查询、API 调用)
  • ✅ 高并发队列消费
  • ✅ 实时数据处理
  • ❌ CPU 密集型计算(建议使用多进程)

核心原则

  1. 使用原生 PDO + 连接池,避免 Eloquent ORM
  2. 通过参数传递数据,避免类属性污染
  3. 合理设置协程数量,监控性能指标
  4. 做好异常处理和日志记录

希望本文能帮助你在 Laravel 项目中成功应用 Swoole 协程技术,实现性能的飞跃!

(内容由claude总结输出!!!)

相关链接

本作品采用《CC 协议》,转载必须注明作者和本文链接
不成大牛,不改個簽
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!