laravelS使用redis stream完成每日亿级别数据存储

环境背景

某广告系统日志上报服务,每天接受亿级别接受数据, 常用消息队列有 aws sqs kiness,rabbitmq,kafka,还有鸡肋版本的redis stream,个人事后建议使用(rabbitmq,kafka):joy:、但是由于对消息队列的不了解 为了快速开发用了redis 简单轻小

  1. 异步消息队列redis stream
  2. 数据库 tidb存储,tidb是个好东西 不用分表分库,但是对高可用这方面还是差点 需要对tidb捣鼓好长时间调优才可用
  3. laravelS 自定义进程
  4. laravelS HTTP服务
<?php

namespace App\Processes;

use Hhxsv5\LaravelS\Swoole\Process\CustomProcessInterface;
use Illuminate\Support\Facades\Redis;
use Swoole\Http\Server;
use Swoole\Process;

/**
 * 初始队列
 *
 * Class LoggerProcess
 * @package App\Processes
 */
class LoggerProcess implements CustomProcessInterface
{

    /**
     * @var bool 退出标记,用于Reload更新
     */
    private static $quit = false;

    /**
     * @var string 分组名称
     */
    public static $group_name = '';

    public static $consumer_name = '';

    /**
     * 执行回调
     *
     * @param Server $swoole
     * @param Process $process
     * @throws \Exception
     */
    public static function callback(Server $swoole, Process $process){
        self::$group_name = "logger";
        self::$consumer_name = "logger_".rand(1,100);
        // 初始化分组
        self::groupInit();
        // 数据落库
        self::handle();
    }

    /**
     * LaravelS >= v3.4.0 并且 callback() 必须是异步非阻塞程序。
     *
     * @param Server $swoole
     * @param Process $process
     */
    public static function onReload(Server $swoole, Process $process)
    {
        self::$quit = true;
    }

    /**
     * 要求:LaravelS >= v3.7.4 并且 callback() 必须是异步非阻塞程序。
     *
     * @param Server $swoole
     * @param Process $process
     */
    public static function onStop(Server $swoole, Process $process)
    {
        self::$quit = true;
    }

    /**
     * 数据处理
     */
    public static function handle(){
        $cache_key = 'LOGGER_STREAM';
        $cache_retry_key = 'LOGGER_RETRY_STREAM';
        while (!self::$quit){
            $data = [];
            try {
                // 获取队列里面的1000条信息
                $get_result = Redis::xReadGroup(self::$group_name,self::$consumer_name,[$cache_key=>">"],1000,1000);
                if(!$get_result || !isset($get_result[$cache_key])){
                    continue;
                }
                $data = $get_result[$cache_key];
                // 存库
                self::insertData($data);
                // 消费成功、删除消息
                Redis::xAck($cache_key,self::$group_name,array_keys($data));
                Redis::xDel($cache_key,array_keys($data));
            }catch (\Exception $exception){
                // 存入重试队列
                if ($data){
                    foreach ($data as $item){
                        Redis::xAdd($cache_retry_key,"*",$item);
                    }
                    Redis::xAck($cache_key,self::$group_name,array_keys($data));
                }
                // 钉钉报警通知
                $str = "[".date('Y-m-d H:i:s')."] ".class_basename(__CLASS__).":系统异常,".catchErr($exception);
            }
        }
    }



    /**
     * 初始化 group分组
     */
    public static function groupInit(){
        $cache_key = 'LOGGER_STREAM';
        $group_result = Redis::xInfo('GROUPS', $cache_key);
        $exist_group=false;
        if ($group_result){
            // 判断分组是否存在
            foreach($group_result as $group){
                if($group['name'] === self::$group_name) {
                    $exist_group = true;
                    break;
                }
            }
        }
        //判断组是否存在  如果不存在 则创建组  xGroup('CREATE',stream的名称,组的名称,0表示从头开始读)
        if(!$exist_group){
            //从0开始
            Redis::xGroup('CREATE', $cache_key,self::$group_name, "0");
        }
    }
}

php
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!