基于Hyperf开发的任务调度系统.支持任务投递,DAG任务编排(多个任务使用同一个事务).

Task-Schedule

Task-Schedule

Hyperf开发的任务调度系统.

基于 Hyperf + Nsq 的一个异步队列库.支持投递任务,DAG任务编排.多个任务使用同一个事务。

特性

  • 默认 Nsq 驱动
  • 秒级延时任务
  • 自定义重试次数和时间
  • 自定义错误回调
  • 支持任务执行中间件
  • 自定义队列快照事件
  • 弹性多进程消费
  • 协程支持
  • 漂亮的仪表盘
  • 任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接)
  • dag任务编排

环境

  • PHP 7.4+
  • Swoole 4.6+
  • Redis 5.0+ (redis 驱动)
  • Nsq 1.2.0

TODO

  • 分布式支持

案例

1.投递任务

use App\Model\Task;
use App\Job\SimpleJob;
use App\Kernel\Nsq\Queue;
class Example{
     /**
     * @desc 测试job队列功能
     */
    public function queue() : void
    {
        $task = Task::find(1);

        $job = new SimpleJob($task);

        $queue = new Queue('queue');
        $queue->push($job);
    }
}

2.任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接)

use App\Kernel\Concurrent\ConcurrentMySQLPattern;
use App\Dag\Task\Task1;
use App\Dag\Task\Task2;
use App\Dag\Task\Task3;
class Example{
public function conCurrentMySQL() : void
    {
        $dsn      = 'DSN';
        $user     = 'USER';
        $password = 'PWD';
        try {
            $pdo = new \PDO($dsn, $user, $password);
            $pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true);
            $c = new ConcurrentMySQLPattern($pdo, $this->logger);
            $c->beginTransaction();
            $dag     = new \Hyperf\Dag\Dag();
            $a       = \Hyperf\Dag\Vertex::make(function () use ($c)
            {
                $task = new Task1();
                return $task->Run($c);
            }, 'a');
            $b       = \Hyperf\Dag\Vertex::make(function ($results) use ($c)
            {
                $task = new Task2();
                return $task->Run($c);
            }, 'b');
            $d       = \Hyperf\Dag\Vertex::make(function ($results) use ($c, $a, $b)
            {
                if ($results[$a->key] && $results[$b->key]) {
                    return $c->commit();
                }
                return $c->rollback();
            }, 'd');
            $e       = \Hyperf\Dag\Vertex::make(function ($results) use ($c)
            {
                $c->close();
            }, 'e');
            $results = $dag
                ->addVertex($a)
                ->addVertex($b)
                ->addVertex($d)
                ->addVertex($e)
                ->addEdge($a, $b)
                ->addEdge($b, $d)
                ->addEdge($d, $e)
                ->run();
        } catch (\PDOException $exception) {
            echo 'Connection failed: ' . $exception->getMessage();
        }
    }
}

仪表盘

github.com/Hyperf-Glory/Task-Sched...

参考:github.com/Littlesqx/aint-queue/

本作品采用《CC 协议》,转载必须注明作者和本文链接
CodingHePing
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 1
CodingHePing

谢谢各位的star

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!