Laravel mysql to Mongo 分享一些数据同步及分析的心得体会

mysql 作为业务库,不太适合去作为分析,要不可能扛不住。先将mysql的数据同步到mongodb中,然后再从mongodb中去分析。

数据同步

将mysql中的数据同步到mongodb中,这里分为两步,第一步全量同步,不去检查mongo是否存在这条数据,要不会耗时,直接插入。可以将id作为同步的校准,比如,同步过程中中断了,此时在进行全量同步数据的时候,这样不会导至重复插入。

$maxId = OrderMongo::max('id');//基准id

Order::oldest('id')->where('id','>',$maxId)->chunk(1000,function($items){
    OrderMongo::insert($items->toArray());
})

嫌插入慢的时候,可以放进队列运行。

$maxId = OrderMongo::max('id');

Order::oldest('id')->where('id','>',$maxId)->chunk(1000,function($items){
    $a = $lists->pluck('id');
    $data = [
        'max' =>$a->max(),
        'min' =>$a->min()
    ];
    dispatch( new InsertQueue($data))->onQueue('insert-queue');
})

InsertQueue

 public $data;

 public function __construct($data)
    {
        $this->data = $data;
    }
 public function handle()
{
    OrderModel::oldest('id')->where('id', '>=',$this->data['min'] ))->where('id','<=',$this->data['max'])->chunk(100,function(items){
        OrderMongo::insert($items->toArray());
    });
}

记得用一个进程去消费,要不插入数据的顺序会乱掉。

全量同步完后,为了保持数据的一致性,要考虑增量同步,分为插入和更新。此时根据,update_time 的时间,来对数据进行更新或插入,增量同步的时间间隔,要比定时任务的时间间隔大一分钟。这样可以避免在当前时间点丢单的情况,像这样

    $schedule->command('sync:zengLiang')->everyThirtyMinutes();//30分钟
//这里 31分钟
Order::where('update_time','>',now()->subMinutes(31))->each(function($item){
    OrderMongo::updateOrInsert(['id'=>$item->id,$titem->toArray());
})

数据分析

使用惯了mysql的聚合语句后,使用mongo来查会一时适应不过来,mysql 使用 join 比较多,基本上 mongo 弄明白来”join”后,使用起来就舒服多了。这里以orders和order_items 作为例子。上代码

$data = OrderMongo::query()->raw(function ($collection) {
    $aggregate = [];
    $aggregate[] = [
        '$match'=>[//相当于mysql 的 where 和orWhere
            '$and'=>[
                ['date' => ['$gte' => $start, '$lt' => $end]]
            ],
            '$or'=>[
                ['store_id' => 1]
            ],
        ]
    ];
    $aggregate[] = [
        '$project' => [
            'id' =>1,//字段展示 相当于 mysql 的 select id ,total from orders
            'store_id'=>1,
            'total'=>1,//字段展示
        ]
    ];

    //$lookup 相当于 join 
    $aggregate [] = [
        '$lookup'=>[
            'from' =>'order_items',
            'localField'=>'order_id',
            'foreignField'=>'id',
            'as'=>'items',
        ]
    ];

    //不过这里 查出的数据是 [[id,store_id,total,items]] items 是一个数组,现在把它展开

    $aggregate[] = [
        '$unwind'=>'$items'
    ];

    //假如 一个order 有三条 items 这里会展开三条数据 [[id,store_id,total,items],[id,store_id,total,items],[id,store_id,total,items]] //这里的items 是一个对象

    //聚合
    '$group'=>[
        '_id' => [//这里相当于 mysql 的groupBy field1,field2
            'store_id'=>'$store_id'
        ],
        'store_total' => [
            '$sum' => '$total',
        ],//相当于 select sum(total) as store_total //这里并不是门店的数据,因为 oreder_items 展开了,只是作为示例

    ];
    return $collection->aggregate($aggregate);


})

这只是最基础的分析数据,分析的时候,可以将各个维度的模块分析数据,分开,模拟数据弄的多一点,放进队列跑,跑得快点,跑错了没关系,删除掉数据重新跑。可能最后总数据只差了10 几款钱,这时不要慌,先确定范围,再将范围一步步的缩小,最后找到造成数据错误的一条数据和几条数据,修复,在和基准数据确认,再跑~总之就是不断跑~~因为这一条数据错误,假如后面的数据依赖这条数据的话,那么数据多多少少会有一点误差

以上

待定

可能会写一个数据同步的扩展包,因为写的过程中,发现可以结构化,这样在后台点点点就可以同步了

本作品采用《CC 协议》,转载必须注明作者和本文链接
Make everything simple instead of making difficulties as simple as possible
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 2

OrderMongo::min('id') 这里不是 OrderMongo::max('id') 吗???

3年前 评论
jcc123

@振翅飞翔 是的,多谢指出,已修改

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!