基于 swoole 的连接池模型和异步任务小框架

前言

维护着一个很老的项目,基于CodeIgniter写的,几经易手之后,CI框架底层源码都被改得不能按照CI官方文档来操作了,非常头疼,其中某些部分的代码慢的要命,靠优化SQL语句,根据慢查询给旧表加索引之类的,虽然有一些成效,但是不能解决所有问题。几个月前就想自己撸一个异步任务处理的框架练练手,同时也希望能解决一下工作中的当务之急,于是就陆陆续续的写了一个,用来写入一些不是很重要的数据和不是很需要及时反馈给客户端的功能,类似日志类的数据和粗略统计用的数据。项目地址在这里

原理

使用 Swoole Server 作为处理任务的服务端,它是一个TCP Server,因此任何TCP Client 都可以向它投递任务,目前项目中只做了两种 Client 的实现。
整个任务的处理流程比较简单:Client--->send序列化之后的任务对象--->Server--->Receive任务--->反序列化--->对象运行任务

Client

一个 client 实例一次性可以 send 多个任务:

namespace TaskClient;
use Task\AbstractTask;
class CommonSender
{
    private $fp;
    public function __construct()
    {
        $config = require dirname(dirname(__DIR__)) . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'swoole.php';
        $this->fp = stream_socket_client("tcp://" . $config['ip'] . ":" . $config['swoole_task_server_port'], $errno, $errstr);
        if (!$this->fp) {
            echo "ERROR: $errno - $errstr\n";
        }
    }
    public function sendTask(AbstractTask $task)
    {
        $serializedTask = serialize($task);
        //Server在接收的时候如果Client没有释放连接,接收到的的数据将会作为一整块数据,这里拼接PHP_EOL
        //是为了隔开一个client实例多次发送的数据
        return fwrite($this->fp, $serializedTask . PHP_EOL, strlen($serializedTask) + 1);
    }
    public function sendCommand($command)
    {
        $serializedCommand = serialize($command);
        return fwrite($this->fp, $serializedCommand . PHP_EOL, strlen($serializedCommand) + 1);
    }
    public function __destruct()
    {
        fclose($this->fp);
    }
}

Task任务对象

所有的任务对象都继承自一个抽象类,在需要数据库连接资源时,由Server 注入即可(在序列化之前是不可以出现资源类型的,因为资源类型是不可以被序列化的),Server是常驻进程的所以其中的数据库连接能够做到有意义的“池”化,例程中使用了Medoo作为数据库包装类(本质上还是基于 PDO 的):

<?php

namespace Task;

use DB\MedooPDO;

abstract class AbstractTask
{
    protected $DB;//此处应该由server 来分配资源

    /**
     * 在server中可以注入资源
     */
    public function setDB(MedooPDO $db = null)
    {
        $this->DB = $db;
        return;
    }

    /**
     * 业务代码中应实现的具体方法
     * @return mixed
     */
    public abstract function handler();

    /**
     * DB 是否断线
     * @return bool
     */
    public function isDBGone()
    {
        //这里是基于Meddo框架的错误码,如果有其他实现,需要重写此方法
        return $this->DB->error()[1] == 2006;
    }
}

Server

代码较多就不贴了。
其中使用了一个静态数组作为“池”容器,在其中放入数据库连接对象,如果有可用对象则取出(pop)对象使用,使用完毕之后归还到“池”中(push);
Task 如果使用了数据库连接对象,失败时会先尝试从池中继续取其他可用对象,继续注入运行,如果对象全部都取完了,池子空了还仍然没有可用的数据库连接对象,那么就尝试重新创建,然后注入池中使用。

反思

数据库连接池在每个进程都有一个,假如进程数是m,池容量为n,那么将会创建m*n个mysql数据库连接,因为MySQL连接资源非常有限,这就造成了极大的浪费,应考虑减少这种浪费,例如将使用数据库放在某些进程中单独执行。

本作品采用《CC 协议》,转载必须注明作者和本文链接
每天进步一点点
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!