我坎坷的 swoole 协程之旅

Swoole 协程

协程可以理解为纯用户态的线程,其通过协作而不是抢占来进行切换。相对于进程或者线程,协程所有的操作都可以在用户态完成,创建和切换的消耗更低。协程主要用于优化IO操作频繁的任务,当然这个IO需要使用异步IO,能够yeild的异步IO。

yield 实现协程多任务调度

这里有两篇分享很好讲诉了使用yeild来实现生成器,从而实现协程多任务调度,PHP 多任务协程处理PHP 协程实现,借花献佛哈哈。主要分以下两步。
这个和Python的asyncio协程实现很像。asyncio.event_loop:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。asyncio.task:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态。

Task

Task 是普通生成器的装饰器。我们将生成器赋值给它的成员变量以供后续使用,然后实现一个简单的 run() 和 finished() 方法。run() 方法用于执行任务,finished() 方法用于让调度程序知道何时终止运行。

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run() 
    {
        if ($this->run) { //判断是否是第一次run,第一次用next那直接会跑到第二个yield
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Scheduler

Scheduler 用于维护一个待执行的任务队列。run() 会弹出队列中的所有任务并执行它,直到运行完整个队列任务。如果某个任务没有执行完毕,当这个任务本次运行完成后,我们将再次入列。

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue(); //FIFO 队列
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    public function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}

使用

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task1: " . $i . "\n";
        yield sleep(1); //挂起IO操作
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task2: " . $i . "\n";
        yield sleep(1); //挂起IO操作
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);
$startTime = microtime(true);
$scheduler->run();
print "用时: ".(microtime(true) - $startTime);

执行结果

交替执行,task1执行到yeild交出控制权,轮到task2执行到yeild再交出控制权,再一次轮到task1,直到task1执行完,队列里只剩下task2自我陶醉了。
虽然执行结果是这样的,但是效果并不是我们想要的,执行了9秒那和我们同步执行有什么区别,因为sleep()是同步阻塞的,接下来我们把sleep换一下。

task1: 0
task1: 1
task2: 0
task2: 1
task1: 2
task2: 2
task2: 3
task2: 4
task2: 5
用时: 9.0115599632263

异步sleep

需要用到swoole,co::sleep()是swoole自带的异步sleep,go()是 swoole协程 的创建命令

function async_sleep($s){
    return  go(function ()use($s)  {
                co::sleep($s); // 模拟请求接口、读写文件等I/O
            }); 
}

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task1: " . $i . "\n";
        yield async_sleep(1);
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task2: " . $i . "\n";
        yield async_sleep(1);
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);
$startTime = microtime(true);
$scheduler->run();
print "用时: ".(microtime(true) - $startTime);

执行结果,这应该就我们想要的IO操作异步并发,一共9个IO实际时间=1个IO,如果这个异步IO是异步mysql,异步http等就大大提升了我们脚本的并发能力

task1: 0
task2: 0
task1: 1
task2: 1
task1: 2
task2: 2
task2: 3
task2: 4
task2: 5
用时: 1.0025930404663

Swoole 协程

从4.0版本开始Swoole提供了完整的协程(Coroutine)+通道(Channel)特性。应用层可使用完全同步的编程方式,底层自动实现异步IO。这句话是swoole说的。

for ($i = 0; $i < 10; ++$i) {
    // swoole 创建协程
    go(function () use ($i) {
        co::sleep(1.0); // 模拟异步请求接口、读写文件等I/O
        var_dump($i);
    });
}
swoole_event_wait(); //阻塞等所有协程完成任务
print "协程用时: ".(microtime(true) - $time);

运行时间是1秒这里就不多说了。协程之所以快是因异步IO可以yield,但是我们平常使用的mysql请求,http请求等都是同步的,就算使用协程调度也提升不了并发,这不swoole提供了我们想要的东东。

Swoole 协程MySQL客户端

swoole的Coroutine\MySQL具体操作可以看这里,代码中举了异步和同步的mysql请求和并发试一下, dump需要引入symfony,方便打印对象的结构。

//异步mysql
function asyncMysql(){
    go(function () {
        $db = new \Swoole\Coroutine\Mysql();
        $server = array(
            'host' => '127.0.0.1',
            'user' => 'root',
            'password' => '123456',
            'database' => 'test',
            'port' => '3306',
        );
        $db->connect($server); //异步
        $result = $db->query('select * from users limit 1');
        // dump( $result);
    });
}
//同步msql
function synMysql(){
    $servername = "127.0.0.1";
    $username = "root";
    $password = "123456";
    $dbname = "test";
    $conn = mysqli_connect($servername, $username, $password, $dbname);

    if (!$conn) {
        die("连接失败: " . mysqli_connect_error());
    }

    $sql = "select * from users limit 1";
    $result = mysqli_query($conn, $sql);

    if (mysqli_num_rows($result) > 0) {
        while($row = mysqli_fetch_assoc($result)) {
            // dump($row);
        }
    } else {
        echo "0 结果";
    }

    mysqli_close($conn);
}

$startTime = microtime(true);

for($i=0;$i<100;$i++){
    asyncMysql();
}
swoole_event_wait();
$endTime = microtime(true);

dump($endTime-$startTime);

异步所花时间
0.029722929000854
0.017247200012207
0.029895067214966
0.024247884750366
同步所花时间
0.086297988891602
0.083254814147949
0.0831139087677
0.083254814147949

看运行时间不太对哈,这个怎么差了这么一点。我想的是这样的哈,Coroutine\MySQL 上面的例子异步IO操作应该是 connect 和 query,其他的例如创建客户端那就是同步操作了,这个消耗是同步阻塞的,而且占了比例不小,所以才出现这样的情况。
那想一下我们是不是可以这样写,把mysql异步客服端直接拿出来让协程共享。

function asyncMysql(){
    go(function(){
        $db = new \Swoole\Coroutine\Mysql();
        $server = array(
            'host' => '127.0.0.1',
            'user' => 'root',
            'password' => '4QqRbtNCc3LnHko4LQ9H',
            'database' => 'tracknumer_share',
            'port' => '3306',
        );   
        $db->connect($server); 
        $startTime = microtime(true);
        for($i=0;$i<10;$i++){
            go(function ()use($db) {
                $result = $db->query('select * from users limit 1');
            });
        }
        swoole_event_wait();
        $endTime = microtime(true);
        dump($endTime-$startTime);
    });
}
[2019-04-30 11:23:36 @4769.0]   ERROR   check_bind (ERROR 10002): mysql client has already been bound to another coroutine#2, reading or writing of the same socket in multiple coroutines at the same time is not allowed.
Stack trace:
#0  Swoole\Coroutine\MySQL->query() called at [/data/web/dev/swoole-demo/src/Coroutine/mysql.php:44]

哦天哪发生了什么,报错了,它说这个mysql客户端已经有其他协程占用了。是我太天真的了。官网说swoole这样做是为了防止多个协程同一时刻使用同一个客户端导致数据错乱。
那我们就简单实现一个mysql的连接池,复用协程客户端,实现长连接。

Swoole 协程MySQL连接池

<?php 
require __DIR__ . '/../bootstrap.php';
class MysqlPool
{
    protected $available = true;
    public $pool;
    protected $config; //mysql服务的配置文件
    protected $max_connection = 50;//连接池最大连接 
    protected $min_connection = 20;
    protected $current_connection = 0;//当前连接数

    public function __construct($config)
    {
        $this->config = $config;
        $this->pool   = new SplQueue;
        $this->initPool();
    }
    public function initPool(){
        go(function () {
            for($i=1;$i<=$this->min_connection;$i++){
                $this->pool->push($this->newMysqlClient());
            }
        });
    }
    public function put($mysql)
    {
        $this->pool->push($mysql);
    }

    /**
     * @return bool|mixed|\Swoole\Coroutine\Mysql
     */
    public function get()
    {
        //有空闲连接且连接池处于可用状态
        if ($this->available && $this->pool->length > 0) {
            return $this->pool->pop();
        }

        //无空闲连接,创建新连接
        $mysql = $this->newMysqlClient();
        if ($mysql == false) {
            return false;
        } else {
            return $mysql;
        }
    }

    protected function newMysqlClient()
    {

        if($this->current_connection >= $this->max_connection){
            throw new Exception("链接池已经满了"); 
        }
        $this->current_connection++;
        $mysql = new Swoole\Coroutine\Mysql();
        $mysql->connect($this->config); 
        return $mysql;
    }

    public function destruct()
    {
        // 连接池销毁, 置不可用状态, 防止新的客户端进入常驻连接池, 导致服务器无法平滑退出
        $this->available = false;
        while (!$this->pool->isEmpty()) {
            go(function(){
                $mysql = $this->pool->pop();
                $mysql->close();
            });
        }
    }

    public function __destruct(){
        $this->destruct();
    }
}

$config = array(
            'host' => '127.0.0.1',
            'user' => 'root',
            'password' => '123456',
            'database' => 'test',
            'port' => '3306',
        );

$pool = new MysqlPool($config);

好了,一个简单的连接池已经搞好了,我先用一下


go(function()use($config){
    $pool = new MysqlPool($config);
    for($i=0;$i<2;$i++){
        go(function ()use($pool) {
            $mysql = $pool->get();
            $result = $mysql->query('select * from users limit 1');
            dump($result);
            $pool->put($mysql);
        });
    }
    dump($pool);

});

好了结果出来了,新增一个defer(),在协程推出之前释放连接池的资源。


go(function()use($pool){ 
    $pool = new MysqlPool($config);
    defer(function () use ($pool) { //用于资源的释放, 会在协程关闭之前(即协程函数执行完毕时)进行调用, 就算抛出了异常, 已注册的defer也会被执行.
        echo "Closing connection pool\n";
        $pool->destruct();
    });
    for($i=0;$i<2;$i++){
        go(function ()use($pool) {
            $mysql = $pool->get();
            $result = $mysql->query('select * from users limit 1');
            dump($result);
            $pool->put($mysql);
        });
    }
     dump($pool);
});

这个有一个比较完善的 协程客户端链接池包

Swoole 协程 Channel 实现并发数据收集

这里使用子协程+通道来并发收集数据,理想的情况是使用连接池。

//每个子进程创建一个mysql连接
go(function()use($pool,$config){
    $chan = new chan(10);
    for($i=0;$i<2;$i++){
        go(function()use($pool,$chan,$config){
            $mysql = new \Swoole\Coroutine\Mysql();
            $mysql->connect($config); 
            $result = $mysql->query('select * from users limit 1');
            $chan->push($result);
            $mysql->close();
        });
    }

    for($i=0;$i<2;$i++){
        dump($chan->pop());//这个pop()如果遇到空会yield,直到子协程的push()数据之后才会重新唤醒
    }

});
//使用连接池
go(function()use($config){
    $pool = new MysqlPool($config);
    defer(function () use ($pool) { //用于资源的释放, 会在协程关闭之前(即协程函数执行完毕时)进行调用, 就算抛出了异常, 已注册的defer也会被执行.
        echo "Closing connection pool\n";
        $pool->destruct();
    });
    $chan = new chan(10);
    for($i=0;$i<2;$i++){
        go(function()use($pool,$chan,$config){
            $mysql = $pool->get();
            $result = $mysql->query('select * from users limit 1');
            $chan->push($result);
            $pool->put($mysql);
        });
    }
    for($i=0;$i<2;$i++){
        dump($chan->pop());//这个pop()如果遇到空会yield,直到子协程的push()数据之后才会重新唤醒
    }
});

过了一圈swoole协程感觉还是没有Python的asyncio包好用,有些地方总是搞不明白,希望各位大佬不吝指教。原链接

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 4年前 自动加精
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 5
小滕

连接池的逻辑有点问题。连接池是预先创建好一定数量的连接,然后在这个数量下进行分配,如果已经分配完毕当再有进程从连接池读取连接的时候就会阻塞。这里,你在没有多余的连接资源的时候回重新创建,其实就有问题的。如果一个应用跑1w个协程都需要连接,那么就要创建1w个连接了。另外一点需要注意的是在用完连接之后必须push回去,这样那些陷入阻塞的中的进程能够复用连接。

4年前 评论

@小滕 是的,协程客户端链接池 这里有一个比较完善的。就按照您说的会定义minActive,maxActive 两个参数。minActive用于连接池创建的时候初始化的链接数,如果不够用则继续创建直到连接池满了

4年前 评论

我觉得我理解起来好难,不好理解,后面的例子都没怎么看懂

4年前 评论

@wojianduanfa_sxm_87 我讲的太粗了,可以多看几遍协程原理

4年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
1
粉丝
2
喜欢
21
收藏
21
排名:780
访问:8823
私信
所有博文
博客标签
社区赞助商