Laravel mysql to Mongo 分享一些数据同步及分析的心得体会

mysql 作为业务库,不太适合去作为分析,要不可能扛不住。先将mysql的数据同步到mongodb中,然后再从mongodb中去分析。

数据同步

将mysql中的数据同步到mongodb中,这里分为两步,第一步全量同步,不去检查mongo是否存在这条数据,要不会耗时,直接插入。可以将id作为同步的校准,比如,同步过程中中断了,此时在进行全量同步数据的时候,这样不会导至重复插入。

$maxId = OrderMongo::max('id');//基准id

Order::oldest('id')->where('id','>',$maxId)->chunk(1000,function($items){
    OrderMongo::insert($items->toArray());
})

嫌插入慢的时候,可以放进队列运行。

$maxId = OrderMongo::max('id');

Order::oldest('id')->where('id','>',$maxId)->chunk(1000,function($items){
    $a = $lists->pluck('id');
    $data = [
        'max' =>$a->max(),
        'min' =>$a->min()
    ];
    dispatch( new InsertQueue($data))->onQueue('insert-queue');
})

InsertQueue

 public $data;

 public function __construct($data)
    {
        $this->data = $data;
    }
 public function handle()
{
    OrderModel::oldest('id')->where('id', '>=',$this->data['min'] ))->where('id','<=',$this->data['max'])->chunk(100,function(items){
        OrderMongo::insert($items->toArray());
    });
}

记得用一个进程去消费,要不插入数据的顺序会乱掉。

全量同步完后,为了保持数据的一致性,要考虑增量同步,分为插入和更新。此时根据,update_time 的时间,来对数据进行更新或插入,增量同步的时间间隔,要比定时任务的时间间隔大一分钟。这样可以避免在当前时间点丢单的情况,像这样

    $schedule->command('sync:zengLiang')->everyThirtyMinutes();//30分钟
//这里 31分钟
Order::where('update_time','>',now()->subMinutes(31))->each(function($item){
    OrderMongo::updateOrInsert(['id'=>$item->id,$titem->toArray());
})

数据分析

使用惯了mysql的聚合语句后,使用mongo来查会一时适应不过来,mysql 使用 join 比较多,基本上 mongo 弄明白来”join”后,使用起来就舒服多了。这里以orders和order_items 作为例子。上代码

$data = OrderMongo::query()->raw(function ($collection) {
    $aggregate = [];
    $aggregate[] = [
        '$match'=>[//相当于mysql 的 where 和orWhere
            '$and'=>[
                ['date' => ['$gte' => $start, '$lt' => $end]]
            ],
            '$or'=>[
                ['store_id' => 1]
            ],
        ]
    ];
    $aggregate[] = [
        '$project' => [
            'id' =>1,//字段展示 相当于 mysql 的 select id ,total from orders
            'store_id'=>1,
            'total'=>1,//字段展示
        ]
    ];

    //$lookup 相当于 join 
    $aggregate [] = [
        '$lookup'=>[
            'from' =>'order_items',
            'localField'=>'order_id',
            'foreignField'=>'id',
            'as'=>'items',
        ]
    ];

    //不过这里 查出的数据是 [[id,store_id,total,items]] items 是一个数组,现在把它展开

    $aggregate[] = [
        '$unwind'=>'$items'
    ];

    //假如 一个order 有三条 items 这里会展开三条数据 [[id,store_id,total,items],[id,store_id,total,items],[id,store_id,total,items]] //这里的items 是一个对象

    //聚合
    '$group'=>[
        '_id' => [//这里相当于 mysql 的groupBy field1,field2
            'store_id'=>'$store_id'
        ],
        'store_total' => [
            '$sum' => '$total',
        ],//相当于 select sum(total) as store_total //这里并不是门店的数据,因为 oreder_items 展开了,只是作为示例

    ];
    return $collection->aggregate($aggregate);


})

这只是最基础的分析数据,分析的时候,可以将各个维度的模块分析数据,分开,模拟数据弄的多一点,放进队列跑,跑得快点,跑错了没关系,删除掉数据重新跑。可能最后总数据只差了10 几款钱,这时不要慌,先确定范围,再将范围一步步的缩小,最后找到造成数据错误的一条数据和几条数据,修复,在和基准数据确认,再跑~总之就是不断跑~~因为这一条数据错误,假如后面的数据依赖这条数据的话,那么数据多多少少会有一点误差

以上

待定

可能会写一个数据同步的扩展包,因为写的过程中,发现可以结构化,这样在后台点点点就可以同步了

本作品采用《CC 协议》,转载必须注明作者和本文链接
Make everything simple instead of making difficulties as simple as possible
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 2

OrderMongo::min('id') 这里不是 OrderMongo::max('id') 吗???

3年前 评论
jcc123

@振翅飞翔 是的,多谢指出,已修改

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!