Laravel mysql to Mongo 分享一些数据同步及分析的心得体会
mysql 作为业务库,不太适合去作为分析,要不可能扛不住。先将mysql的数据同步到mongodb中,然后再从mongodb中去分析。
数据同步
将mysql中的数据同步到mongodb中,这里分为两步,第一步全量同步,不去检查mongo是否存在这条数据,要不会耗时,直接插入。可以将id作为同步的校准,比如,同步过程中中断了,此时在进行全量同步数据的时候,这样不会导至重复插入。
$maxId = OrderMongo::max('id');//基准id
Order::oldest('id')->where('id','>',$maxId)->chunk(1000,function($items){
OrderMongo::insert($items->toArray());
})
嫌插入慢的时候,可以放进队列运行。
$maxId = OrderMongo::max('id');
Order::oldest('id')->where('id','>',$maxId)->chunk(1000,function($items){
$a = $lists->pluck('id');
$data = [
'max' =>$a->max(),
'min' =>$a->min()
];
dispatch( new InsertQueue($data))->onQueue('insert-queue');
})
InsertQueue
public $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
OrderModel::oldest('id')->where('id', '>=',$this->data['min'] ))->where('id','<=',$this->data['max'])->chunk(100,function(items){
OrderMongo::insert($items->toArray());
});
}
记得用一个进程去消费,要不插入数据的顺序会乱掉。
全量同步完后,为了保持数据的一致性,要考虑增量同步,分为插入和更新。此时根据,update_time 的时间,来对数据进行更新或插入,增量同步的时间间隔,要比定时任务的时间间隔大一分钟。这样可以避免在当前时间点丢单的情况,像这样
$schedule->command('sync:zengLiang')->everyThirtyMinutes();//30分钟
//这里 31分钟
Order::where('update_time','>',now()->subMinutes(31))->each(function($item){
OrderMongo::updateOrInsert(['id'=>$item->id,$titem->toArray());
})
数据分析
使用惯了mysql的聚合语句后,使用mongo来查会一时适应不过来,mysql 使用 join 比较多,基本上 mongo 弄明白来”join”后,使用起来就舒服多了。这里以orders和order_items 作为例子。上代码
$data = OrderMongo::query()->raw(function ($collection) {
$aggregate = [];
$aggregate[] = [
'$match'=>[//相当于mysql 的 where 和orWhere
'$and'=>[
['date' => ['$gte' => $start, '$lt' => $end]]
],
'$or'=>[
['store_id' => 1]
],
]
];
$aggregate[] = [
'$project' => [
'id' =>1,//字段展示 相当于 mysql 的 select id ,total from orders
'store_id'=>1,
'total'=>1,//字段展示
]
];
//$lookup 相当于 join
$aggregate [] = [
'$lookup'=>[
'from' =>'order_items',
'localField'=>'order_id',
'foreignField'=>'id',
'as'=>'items',
]
];
//不过这里 查出的数据是 [[id,store_id,total,items]] items 是一个数组,现在把它展开
$aggregate[] = [
'$unwind'=>'$items'
];
//假如 一个order 有三条 items 这里会展开三条数据 [[id,store_id,total,items],[id,store_id,total,items],[id,store_id,total,items]] //这里的items 是一个对象
//聚合
'$group'=>[
'_id' => [//这里相当于 mysql 的groupBy field1,field2
'store_id'=>'$store_id'
],
'store_total' => [
'$sum' => '$total',
],//相当于 select sum(total) as store_total //这里并不是门店的数据,因为 oreder_items 展开了,只是作为示例
];
return $collection->aggregate($aggregate);
})
这只是最基础的分析数据,分析的时候,可以将各个维度的模块分析数据,分开,模拟数据弄的多一点,放进队列跑,跑得快点,跑错了没关系,删除掉数据重新跑。可能最后总数据只差了10 几款钱,这时不要慌,先确定范围,再将范围一步步的缩小,最后找到造成数据错误的一条数据和几条数据,修复,在和基准数据确认,再跑~总之就是不断跑~~因为这一条数据错误,假如后面的数据依赖这条数据的话,那么数据多多少少会有一点误差
以上
待定
可能会写一个数据同步的扩展包,因为写的过程中,发现可以结构化,这样在后台点点点就可以同步了
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: