开源轻量级 PHP 数据库 ORM 框架 ycdb (高级) : 构建稳定的数据库 / 缓存连接池

目录:

  • Instruction
  • Requirement
  • 创建测试表
  • 在linux中编译ycdb
  • Start ycdatabase
  • 初始化ycdb连接
  • 原生SQL执行
  • 错误处理
  • Where 语句
  • Select 语句
  • Insert 语句
  • Replace 语句
  • Update 语句
  • Delete 语句
  • 完整例句
  • 数据库事务
  • 数据缓存
  • MySQL数据库连接池
  • Redis 连接池方案

MySQL数据库连接池

php数据库连接池的缺陷

    短连接性能普遍上不去,CPU 大量资源被系统消耗,网络一旦抖动,会有大量 TIME_WAIT 产生,不得不定期重启服务或定期重启机器,服务器工作不稳定,QPS 忽高忽低,稳定高效的连接池可以有效的解决上述问题,它是高并发的基础。

    ycdb通过一种特殊的方式来建立一个稳定的与MySQL之间的连接池,性能至少提升30%,按照 PHP 的运行机制,长连接在建立之后只能寄居在工作进程之上,也就是说有多少个工作进程,就有多少个长连接,打个比方,我们有 10 台 PHP 服务器,每台启动 1000 个 PHP-FPM 工作进程,它们连接同一个 MySQL 实例,那么此 MySQL 实例上最多将存在 10000 个长连接,数量完全失控了!而且PHP的连接池心跳机制不完善。

解决方案

源码 github 地址:  https://github.com/caohao-php/ycdatabase

我们不妨绕着走。让我们把目光聚焦到 Nginx 的身上,其 stream 模块实现了 TCP/UDP 服务的负载均衡,同时借助 stream-lua 模块,我们就可以实现可编程的 stream 服务,也就是用 Nginx 实现自定义的 TCP/UDP 服务!当然你可以自己从头写 TCP/UDP 服务,不过站在 Nginx 肩膀上无疑是更省时省力的选择。我们可以选择 OpenResty 库来完成MySQL的连接池功能,OpenResty是一个非常强大,而且功能完善的Nginx Lua框架,他封装了Socket、MySQL, Redis, Memcache 等操作,可是 Nginx 和 PHP 连接池有什么关系?且听我慢慢道来:通常大部分 PHP 是搭配 Nginx 来使用的,而且 PHP 和 Nginx 多半是在同一台服务器上。有了这个客观条件,我们就可以利用 Nginx 来实现一个连接池,在 Nginx 上完成连接 MySQL 等服务的工作,然后 PHP 通过本地的 Unix Domain Socket 来连接 Nginx,如此一来既规避了短链接的种种弊端,也享受到了连接池带来的种种好处。

OpenResty 安装
OpenResty 文档: https://moonbingbing.gitbooks.io/openresty-best-practices/content/openresty/install_on_centos.html

CentOS 6.8 下的安装:

安装必要的库

$ yum install readline-devel pcre-devel openssl-devel perl

安装 OpenResty
$ cd ~/ycdatabase/openresty
$ tar -xzvf openresty-1.13.6.1.tar.gz
$ cd openresty-1.13.6.1
$ ./configure --prefix=/usr/local/openresty.1.13 --with-luajit --without-http_redis2_module --with-http_iconv_module
$ gmake 
$ gmake install
打开数据库连接池
$ cp -rf ~/ycdatabase/openresty/openresty-pool ~/
$ /usr/local/openresty.1.13/nginx/sbin/nginx -p ~/openresty-pool

MySQL数据库连接池配置

~/openresty-pool/conf/nginx.conf , 
如果你有多个 MySQL, 你可以另起一个 server , 并在listen unix 之后新增一个新的unix domain socket监听。

worker_processes  1;        #nginx worker 数量

error_log logs/error.log;   #指定错误日志文件路径

events {
    worker_connections 1024;
}

stream {
  lua_code_cache on;

  lua_check_client_abort on;

  server {
    listen unix:/tmp/mysql_pool.sock;

    content_by_lua_block {
      local mysql_pool = require "mysql_pool"

      local config = {host = "127.0.0.1", 
                      user = "root", 
                      password = "test", 
                      database = "collect", 
                      timeout = 2000, 
                      max_idle_timeout = 10000, 
                      pool_size = 200}

      pool = mysql_pool:new(config)

      pool:run()
    }
  }
}

PHP代码

  • 除了option 配置为 array("unix_socket" => "/tmp/mysql_pool.sock") 之外,php的mysql连接池使用方法和之前一模一样,另外, unix_socket 方式的 MySQL 不支持事务。
$option = array("unix_socket" => "/tmp/mysql_pool.sock");
$ycdb = new ycdb($option);
$ret = $ycdb->select("user_info_test", "*", ["sexuality" => "male"]);

if($ret == -1) {
  $code = $ycdb->errorCode();
  $info = $ycdb->errorInfo();
  echo "code:" . $code . "\n";
  echo "info:" . $info[2] . "\n";
} else {
  print_r($ret);
}

Lua数据库连接池代码

 ~/openresty-pool/mysql_pool.lua

local mysql = require "resty.mysql"
local cjson = require "cjson"

local assert = assert
local setmetatable = setmetatable
local tonumber = tonumber

-- 解析请求
local function parse_request(sock)
    --获取 sql 语句
    local sql_size, err = sock:receive()

    if not sql_size then
        if err == "timeout" then
            sock:close()
        end
        return nil, err
    end

    local size = tonumber(sql_size)
    if size <= 0 then
        return nil, "SQL size is zero"
    end

    local sql_str, err = sock:receive(size)
    if not sql_str then
        if err == "timeout" then
            sock:close()
        end
        return nil, err
    end

    --获取 map
    local map_size, err = sock:receive()

    if not map_size then
        if err == "timeout" then
            sock:close()
        end
        return nil, err
    end

    size = tonumber(map_size);
    if size <= 0 then
        -- 没有 map
        return sql_str
    end

    local map_res, err = sock:receive(map_size)
    if not map_res then
        if err == "timeout" then
            sock:close()
        end
        return nil, err
    end

    -- 解析 map ,创建 SQL 语句,防止SQL注入
    local maps = cjson.decode(map_res)

    for k, v in pairs(maps) do
        if v == true then
            v = 1
        elseif v == false then
            v = 0
        end

        sql_str = ngx.re.gsub(sql_str, k, ngx.quote_sql_str(v))
    end

    return sql_str
end

-- 返回请求
local function response_success(sock, result)
    local ret = {
        errno = 0,
        data = result
    }

    local send_str = cjson.encode(ret)

    local ret, err = sock:send(string.len(send_str) ..  "\n" .. send_str)

    if not ret then
        ngx.log(ngx.ERR, "response success failed : [", err, "], send_str=[", send_str, "]")
        return nil, err
    end
end

-- 返回请求
local function response_error(sock, errno, errmsg, sqlstate)
    local ret = {
        errno = errno,
        errorCode = sqlstate,
        errorInfo = {sqlstate, errno, errmsg}
    }

    local send_str = cjson.encode(ret)
    local ret, err = sock:send(string.len(send_str) ..  "\n" .. send_str)

    if not ret then
        ngx.log(ngx.ERR, "response error failed : [", err, "], send_str=[", send_str, "]")
        return nil, err
    end
end

-- 关闭数据库
local function close_db(db)
    if not db then
        return
    end
    db:close()
end

-- 异常退出
local function exit(err)
    ngx.log(ngx.ERR, "ERROR EXIT: [", err, "]")
    return ngx.exit(ngx.ERROR)
end

----------------------------------------
local _M = {}
_M._VERSION = "1.0"

function _M.new(self, config)
    local t = {
        _host = config.host,
        _port = config.port or 3306,
        _user = config.user,
        _password = config.password,
        _database = config.database,
        _timeout = config.timeout or 2000,  -- default 2 sec
        _pool_size = config.pool_size or 100,
        _max_idle_timeout = config.max_idle_timeout or 10000
    }

    return setmetatable(t, { __index = _M })
end

function _M.run(self)
    local downstream_sock = assert(ngx.req.socket(true))

    local query_sql, err = parse_request(downstream_sock)

    if not query_sql then
        return exit("parse_request failed : " .. err)
    end

    --数据库连接
    local db, err = mysql:new()

    db:set_timeout(self._timeout)

    local ok, err, errno, sqlstate = db:connect{
        host = self._host,
        port = self._port,
        database = self._database,
        user = self._user,
        password = self._password,
        max_packet_size = 1024 * 1024}

    if not ok then
        response_error(downstream_sock, -1, err, "E0001")
        return exit("connect mysql error : " .. err)
    end

    local result, err, errno, sqlstate = db:query(query_sql)

    -- 返回结果
    if result then
        response_success(downstream_sock, result)
    else
        ngx.log(ngx.ERR, "query failed: [", errno, "][", sqlstate, "][",err , "]")
        response_error(downstream_sock, errno, err, sqlstate)
    end

    -- 设置 mysql 连接池
    local ok, err = db:set_keepalive(self._max_idle_timeout, self._pool_size)
    if not ok then
        ngx.log(ngx.ERR, "set_keepalive failed: [", err, "]")
    end
end

return _M

Redis连接池方案
同理,Redis也可以采用相同的方法解决连接池问题。

Redis连接池配置

~/openresty-pool/conf/nginx.conf , 

worker_processes  1;        #nginx worker 数量

error_log logs/error.log;   #指定错误日志文件路径

events {
    worker_connections 1024;
}

stream {
    lua_code_cache on;

    lua_check_client_abort on;

    server {
        listen unix:/tmp/redis_pool.sock;

        content_by_lua_block {
            local redis_pool = require "redis_pool"

            pool = redis_pool:new({ip = "127.0.0.1", port = 6379, auth = "password"})

            pool:run()
        }
    }

    server {

        listen unix:/tmp/mysql_pool.sock;

        content_by_lua_block {
            local mysql_pool = require "mysql_pool"

            local config = {host = "127.0.0.1", 
                            user = "root", 
                            password = "test", 
                            database = "collect", 
                            timeout = 2000, 
                            max_idle_timeout = 10000, 
                            pool_size = 200}

            pool = mysql_pool:new(config)

            pool:run()
        }
    }
}

PHP代码

$redis = new Redis();
$redis->pconnect('/tmp/redis_pool.sock');
var_dump($redis->hSet("foo1", "vvvvv42", 2));
var_dump($redis->hSet("foo1", "vvvv", 33));
var_dump($redis->expire("foo1", 111));
var_dump($redis->hGetAll("foo1"));

Redis连接池Lua代码

 ~/openresty-pool/redis_pool.lua

local redis = require "resty.redis"

local assert = assert
local rawget = rawget
local setmetatable = setmetatable
local tonumber = tonumber
local byte = string.byte
local sub = string.sub

-- 解析请求
local function parse_request(sock)
    local line, err = sock:receive()

    if not line then
        if err == "timeout" then
            sock:close()
        end
        return nil, err
    end

    local result = line .. "\r\n"
    local prefix = byte(line)

    if prefix == 42 then -- char '*'
        local num = tonumber(sub(line, 2))
        if num <= 0 then
            return result
        end

        for i = 1, num do
            local res, err = parse_request(sock)
            if res == nil then
                return nil, err
            end
            result = result .. res
        end

    elseif prefix == 36 then -- char '$'
        local size = tonumber(sub(line, 2))
        if size <= 0 then
            return result
        end

        local res, err = sock:receive(size)
        if not res then
            return nil, err
        end

        local crlf, err = sock:receive(2)
        if not crlf then
            return nil, err
        end

        result = result .. res .. crlf
    end

    return result
end

-- 异常退出
local function exit(err)
    ngx.log(ngx.ERR, "Redis ERROR EXIT: [", err, "]")
    return ngx.exit(ngx.ERROR)
end

----------------------------------------
local _M = {}
_M._VERSION = "1.0"

function _M.new(self, config)
    local t = {
        _ip = config.ip or "127.0.0.1",
        _port = config.port or 6379,
        _auth = config.auth,
        _timeout = config.timeout or 1000,  -- default 1 sec
        _pool_size = config.pool_size or 100,
        _max_idle_timeout = config.max_idle_timeout or 10000
    }
    return setmetatable(t, { __index = _M })
end

function _M.run(self)
    local downstream_sock = assert(ngx.req.socket(true))

    -- 解析客户端请求
    local res, err = parse_request(downstream_sock)
    if not res then
        return exit("parse_request failed : " .. err)
    end

    -- 创建 redis 连接
    local red = redis:new()

    red:set_timeout(self._timeout)

    local ok, err = red:connect(self._ip, self._port)
    if not ok then
        return exit(err)
    end

    -- redis auth 授权
    if self._auth then
        local times = assert(red:get_reused_times())
        if times == 0 then
            local ok, err = red:auth(self._auth)
            if not ok then
                return exit("auth failed : " .. err)
            end
        end
    end

    -- 发送请求到 redis 
    local upstream_sock = rawget(red, "_sock")
    upstream_sock:send(res)

    -- 接收 redis 应答,并解析
    local res, err = parse_request(upstream_sock)
    if not res then
        return exit("receive from redis server error: " .. err)
    end

    -- 发送应答给客户端
    downstream_sock:send(res)

    -- 设置 redis 连接池
    local ok, err = red:set_keepalive(self._max_idle_timeout, self._pool_size)
    if not ok then
        ngx.log(ngx.ERR, "redis set_keepalive failed: [", err, "]")
    end
end

return _M
本作品采用《CC 协议》,转载必须注明作者和本文链接
lizhiqiang666
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 1

这个方案也是个不错的解决方案,不过我还是更喜欢 mysql 中间件这种解决方案,db层干啥我不想多余关注,服务之间全无干扰

5年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
技术负责人 @ 某某
文章
91
粉丝
209
喜欢
906
收藏
1029
排名:25
访问:24.1 万
私信
所有博文
社区赞助商