think-swoole协程部分实现原理

连接池以及如何保证每个协程使用独立的连接

  • 准备连接池

      $pool = new Smf\ConnectionPool\ConnectionPool(
          think\swoole\Pool::pullPoolConfig($config),
          $this->app->make($type),
          $config
      );
      $pools->add($name, $pool);
      //注入到app
      $this->app->instance("swoole.pool.{$name}", $pool);
  • 绑定连接池

      if ($this->getConfig('pool.db.enable', true)) {
          $this->app->bind('db', think\swoole\pool\Db::class);
      }
      if ($this->getConfig('pool.cache.enable', true)) {
          $this->app->bind('cache', think\swoole\pool\Cache::class);
      }
  • 使用数据库代理实现从连接池借用连接以及归还
    think\swoole\pool\Db 继承 think\Db 并重写了 createConnection方法

      use think\db\ConnectionInterface;
      use think\swoole\pool\proxy\Connection;
      use think\swoole\pool\Connector;
    
      protected function createConnection(string $name): Connection
      {
          return new Connection(new class(function () use ($name) {
              return parent::createConnection($name);
          }) extends Connector {
              public function disconnect($connection)
              {
                  if ($connection instanceof ConnectionInterface) {
                      $connection->close();
                  }
              }
          }, $this->config->get('swoole.pool.db', []));
      }

    Connection 是个继承了 ProxyDb 代理类,原本执行 Db 的地方改为执行 Proxy
    Proxy 的所有调用都会走 __call()->getPoolConnection()->method() ,比如Db::find(),变成了 Proxy::__call()->getPoolConnection()->find()getPoolConnection 在每个协程下 borrow 一次连接并记录到 Context,当协程结束的时候自动归还。
    借还代码如下

      protected function getPoolConnection()
      {
          return Context::rememberData('connection.' . spl_object_id($this), function () {
              $connection = $this->pool->borrow();
    
              $connection->{static::KEY_RELEASED} = false;
    
              Coroutine::defer(function () use ($connection) {
                  //自动释放
                  $this->releaseConnection($connection);
              });
    
              return $connection;
          });
      }

    DbModel 走连接池

  • think\facade\Db
    think\facade\Db创建的是一个think\DbManager 对象,对 DbManager 的任何调用都会先走到 connect 方法,connect 通过调用 createConnection 创建数据库连接。让 Db 走连接池的办法就是重写 createConnection 方法,把 createPdo 变成从连接池 borrow connection

      public function __call($method, $args)
      {
          return call_user_func_array([$this->connect(), $method], $args);
      }
      public function connect(string $name = null, bool $force = false)
      {
          return $this->instance($name, $force);
      }
      protected function instance(string $name = null, bool $force = false): ConnectionInterface
      {
          if (empty($name)) {
              $name = $this->getConfig('default', 'mysql');
          }
    
          if ($force || !isset($this->instance[$name])) {
              $this->instance[$name] = $this->createConnection($name);
          }
    
          return $this->instance[$name];
      }
  • think\Model
    think\Model 的任何调用都会先走到 db 方法,db() 代码如下

      $query = self::$db->connect($this->connection)
          ->name($this->name . $this->suffix)
          ->pk($this->pk);

    这里的 self::$db 是个 DbManager 对象,DbManager对数据库的任何操作都会走连接池,等于 Model 的任意操作也会走连接池。

      public function __call($method, $args)
      {
          if (isset(static::$macro[static::class][$method])) {
              return call_user_func_array(static::$macro[static::class][$method]->bindTo($this, static::class), $args);
          }
    
          return call_user_func_array([$this->db(), $method], $args);
      }
    
      public static function __callStatic($method, $args)
      {
          if (isset(static::$macro[static::class][$method])) {
              return call_user_func_array(static::$macro[static::class][$method]->bindTo(null, static::class), $args);
          }
    
          $model = new static();
    
          return call_user_func_array([$model->db(), $method], $args);
      }

    App-snapshot 让每个会话拥有一个独立的环境

  • InteractsWithHttp
    对http请求的处理放在 InteractsWithHttp,每接收到一个http请求,就会创建一个协程,然后使用 runInSandbox 处理业务代码。

      public function onRequest($req, $con)
      {
          Coroutine::create(function () use ($req, $res) {
              $this->runInSandbox(function (Http $http, Event $event, SwooleApp $app) use ($req, $res) {
                  $app->setInConsole(false);
    
                  $request = $this->prepareRequest($req);
    
                  try {
                      $response = $this->handleRequest($http, $request);
                  } catch (Throwable $e) {
                      $response = $this->app
                          ->make(Handle::class)
                          ->render($request, $e);
                  }
    
                  $this->setCookie($res, $app->cookie);
                  $this->sendResponse($res, $request, $response);
              });
          });
      }
  • Sandbox
    Sandbox 会找到当前http请求的根协程,如果没有手动创建协程的话,就是业务代码所在的协程。
    Sandbox 在根协程下 clone 一份 swooleApp,然后用克隆的App来执行业务代码。
    thinkphp 里容器相关的方法都是先走Container::getInstance(),包括facade,在think-swoole下, getInstanceSandbox 设置为匿名方法,代码如下

      Container::setInstance(function () {
          return $this->getApplication();
      });

    getApplication() 返回的是当前会话下根协程克隆的 App 对象。

基于上述原理,think-swoole下除了全局变量、静态变量,不用考虑变量污染的问题,也不必担心协程下数据库操作会混乱。

下面做一个简单的测试

<?php
namespace app\controller;

use app\BaseController;

class Index extends BaseController
{
    private int $index = 0;

    public function index(\app\Test $test)
    {
        return json([++$this->index, ++$test->value]);
    }
}

返回的永远是[1, 1]。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!