基于 Hyperf ,进行便捷的上下文和协程管理,实现 伪事务 般的defer应用和请求级上下文

hyperf Hyperf go Coroutine defer

场景

一个API项目,日常写代码过程中,在需要进行上下文设置时,当然是字面意思,但有时候,一个API动作的完成,可能需要有一些主动发起的协程调度,如 CSP 之类,或者 第三方耗时API 需要在事务最后进行,这些实践起来,虽然原生方法能够解决,但并不优雅,我对其进行了一定的改造,以实现:

  • Db事务中,将第三方耗时API请求、MQ投递 放在 EasyCo::defer 里,在Db事务结束时,commit就正常commit,但 rollBack 时,可以通过 EasyCo::deferRollBack 取消本次动作中注册的defer
  • 在一次 API动作 实现的过程中,通过 EasyCtx::set 设定的上下文变量,可以在本次请求里,通过任何 EasyCo::create 创建的协程里,EasyCtx::get 拿到,不需要进行主动的协程use和赋值

应用Demo

defer 的伪事务

//伪代码
Db::beginTransaction();
try {

    //TODO 业务操作
    make(UserService::class)->bsAction();

    //耗时操作
    // - 第三方API 联动业务
    // - 投递MQ
    EasyCoroutine::easyDeferTrans(function (){
        //你的耗时操作
    });


    Db::commit();
}catch (\Throwable $exception){
    Db::rollBack();
    EasyCoroutine::easyDeferRollBack();
}        

通过这样的姿势,在一些你本来就封装了事务操作A,被嵌套在别人的复合事务操作B时,你的A就不是最终执行事务了,PHP本身没有嵌套事务,这样的嵌套只会合并为一个大事务进行执行,只需要在任意的 Db::rollBack() 时,同步执行 EasyCoroutine::easyDeferRollBack() ,即可取消掉本次注册的defer

请求级上下文

EasyContext::easySet('testData',1123123);

EasyCoroutine::easyCreate(function (){
    EasyContext::easyGet('testData');
});

通过这样的姿势,只要是 EasyCtx::set 的变量,必定可在 EasyCo::create 创建的协程中,通过 EasyCtx::get 进行获取,即可完成请求级的上下文管理,并且只是赋值,无地址干扰操作


源码

EasyCtx

<?php
/**
 * @contact  411906015@qq.com
 * @Author   liShun
 */
declare (strict_types=1);

namespace App\Utils;

use Hyperf\Context\Context;
use Hyperf\Utils\Str;
use Hyperf\WebSocketServer\Context as WsContext;

/**
 * 重载上下文管理 接入协程上下文 连接级上下文
 * 也可用作自己的上下文代码类似 EasyCoroutine 中的 deferTrans
 * @method static wsSet(string $id, $value)
 * @method static wsGet(string $id, $default = null, $fd = null)
 * @method static wsHas(string $id, $fd = null)
 * @method static wsDestroy(string $id)
 * @method static wsRelease(?int $fd = null)
 * @method static wsCopy(int $fromFd, array $keys = []): void
 * @method static wsOverride(string $id, \Closure $closure)
 * @method static wsGetOrSet(string $id, $value)
 * @method static wsGetContainer()
 * @mixin Context
 */
class EasyContext
{

    /**
     * 重载框架的协程上下文调度 综合了协程与ws 方便操作
     * @param $name
     * @param $arguments
     * @return mixed
     */
    public static function __callStatic($name, $arguments)
    {
        if (Str::startsWith(strtolower($name), 'ws')) {

            //非ws环境中,使用ws上下文会导致内存泄漏,请谨慎使用
            $wsMethodMap = [
                'wsSet'          => 'set',
                'wsGet'          => 'get',
                'wsHas'          => 'has',
                'wsDestroy'      => 'destroy',
                'wsRelease'      => 'release',
                'wsCopy'         => 'copy',
                'wsOverride'     => 'override',
                'wsGetOrSet'     => 'getOrSet',
                'wsGetContainer' => 'getContainer',
            ];
            $method      = $wsMethodMap[$name] ?? false;

            return WsContext::{$method}(...$arguments);
        }
        return Context::{$name}(...$arguments);
    }

    /**
     * 业务级 协程上下文 通过本方法设置的上下文变量,将会在 EasyCo 创建的协程中进行传递
     * 仅为值传递 不涉及地址
     * 一旦通过本方法 set,则只能通过对应的 easyGet 来获取,因为已经被存放到了指定的key中
     * @param mixed $id
     * @param mixed $value
     * @return mixed
     */
    public static function easySet(mixed $id, mixed $value): mixed
    {
        $key          = self::ctxKey();
        $current      = Context::getOrSet($key, []);
        $current[$id] = $value;
        return Context::set($key, $current);
    }

    /**
     * 通过本方法 获取业务级上下文中的指定值
     * @param string $id
     * @param $default
     * @return mixed|null
     */
    public static function easyGet(string $id, $default = null): mixed
    {
        $current = Context::get(self::ctxKey(), []);
        if (!$current) {
            return $default;
        } else {
            return $current[$id] ?? $default;
        }
    }

    static private function ctxKey(): string
    {
        return 'EasyContextCreate';
    }
}

defer 伪事务

<?php
/**
 * @contact  411906015@qq.com
 * @Author   liShun
 */
declare (strict_types=1);

namespace App\Utils;

use Hyperf\Context\Context;
use Hyperf\Utils\Coroutine;

/**
 * 便捷模拟事务投递 defer,在 DB::rollBack 时,必须同步清除
 * @mixin Coroutine
 */
class EasyCoroutine
{

    /**
     * 重载框架的协程调度
     * @param $name
     * @param $arguments
     * @return mixed
     */
    public static function __callStatic($name, $arguments)
    {
        return Coroutine::{$name}(...$arguments);
    }


    /**
     * 在业务流程中创建上下文,会将特定上下文传进去
     * @param callable $callable
     * @return int
     */
    static public function easyCreate(callable $callable): int
    {
        //创建时,获取当前特定需传输的上下文
        $current = Context::get(self::ctxKey(), []);

        return Coroutine::create(function () use ($callable, $current) {
            try {
                //也许copy进来,但感觉没必要,copy会直接覆盖且清空原上下文
                //此处进行强行覆盖操作 其实等同于copy
                Context::set(self::ctxKey(), $current);

                call($callable);
            } catch (\Throwable $exception) {
                //日志记录
                //通过 opis/closure 可将回调函数作为字符串保存日志 \Opis\Closure\serialize($callable)
            }
        });
    }

    /**
     * 注册 事务嵌套中的 defer
     * defer 归属同一个协程流程,只是在当前协程栈内 最终执行而已,不需要进行 上下文 传递
     * 返回本次注册 defer 的上下文ID
     * @param callable $callable
     * @return string
     */
    static public function easyDeferTrans(callable $callable): string
    {
        if (Coroutine::inCoroutine()) {
            $id = Coroutine::id() . '-' . session_create_id();
        } else {
            $id = uniqid() . '-' . session_create_id();
        }

        $deferCtx      = Context::getOrSet(self::deferCtxKey(), []);
        $deferCtx[$id] = 1;
        Context::set(self::deferCtxKey(), $deferCtx);

        $fn = function () use ($callable, $id) {
            try {
                $deferCtx = Context::get(self::deferCtxKey(), []);

                $defer = intval($deferCtx[$id] ?? 0);
                if ($defer) {
                    call($callable);
                }
            } catch (\Throwable $exception) {
                //日志记录
                //通过 opis/closure 可将回调函数作为字符串保存日志 \Opis\Closure\serialize($callable)
            }
        };

        Coroutine::defer($fn);

        return $id;
    }

    /**
     * 注销所有注册的 defer
     * @return void
     */
    static public function easyDeferRollBack(): void
    {
        try {
            if (Coroutine::inCoroutine()) {
                Context::set(self::deferCtxKey(), []);
            } else {
                Context::destroy(self::deferCtxKey());
            }
        } catch (\Throwable $exception) {
            //日志记录
            //通过 opis/closure 可将回调函数作为字符串保存日志 \Opis\Closure\serialize($callable)
        }
    }

    /**
     * 注销指定的某个defer
     * @param mixed $id
     * @return void
     */
    static public function easyDeferRollBackById(mixed $id): void
    {
        try {
            $deferCtx = Context::get(self::deferCtxKey(), []);
            if ($deferCtx) {
                $deferCtx[$id] = 0;
                Context::set(self::deferCtxKey(), $deferCtx);
            }
        } catch (\Throwable $exception) {
            //日志记录
            //通过 opis/closure 可将回调函数作为字符串保存日志 \Opis\Closure\serialize($callable)
        }
    }

    /**
     * defer上下文KEY
     * @return string
     */
    static private function deferCtxKey(): string
    {
        return 'EasyCoroutineDefer';
    }

    /**
     * 获取业务级上下文的KEY
     * 此处应与 EasyCtx 中保持一致
     * @return string
     */
    static private function ctxKey(): string
    {
        return 'EasyContextCreate';
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 7

当然了,各位也可以直接取用本方法的思想,直接通过classmap或devp去替换掉官方协程对象,这样也更优雅

1年前 评论

似懂非懂,是不是可以理解为业务级别的上下文? 这个封装是不是完成了从请求协程到请求协程内创建的其它所有协程的上下文传递?例如A请求进来,创建了N个协程处理逻辑,N个协程的上下文都是自动完成赋值的?

1年前 评论
Polaris2018 (楼主) 1年前
Polaris2018 (楼主) 1年前
zhangsansan957 (作者) 1年前
  • deferTrans 应用时应注意的问题:
    • 在自定义进程中,如果存在 while(1) 之类的不退出的进程,注册的defer是不会执行的
    • cli脚本中,会在脚本结束时执行
    • 所以建议在涉及 deferTrans 的任务里,要注意以上一些情况
      • 可以通过手动创建协程,再去执行代码,就会在当次创建的协程结束时执行defer,需要同步时,通过 csp waitGroup Parallel 之类的去执行,或者手搓一个 channel,也是很简单的事
1年前 评论
Polaris2018 (作者) (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!