laravel chunk 和chunkById 逻辑解读(踩坑有感)
一、项目版本
laravel 5.5
二、事件起因
最近的工作中有对数据库中的大量数据进行抽取处理的场景,其中就会需要对数据进行分块查询出来进行处理。一开始直接用chunk处理,后续想给任务提提速,发现laravel有chunkById的轮子于是直接使用chunkById取代了chunk。 但是在核对数据时发现数据差异特别大, 于是有了下面的深究过程。
三、原理解析
public function chunk($count, callable $callback)
{
$this->enforceOrderBy();
$page = 1;
do {
// We'll execute the query for the given page and get the results. If there are
// no results we can just break and return from here. When there are results
// we will call the callback with the current chunk of these results here.
$results = $this->forPage($page, $count)->get();
$countResults = $results->count();
if ($countResults == 0) {
break;
}
// On each chunk result set, we will pass them to the callback and then let the
// developer take care of everything within the callback, which allows us to
// keep the memory low for spinning through large result sets for working.
if ($callback($results, $page) === false) {
return false;
}
unset($results);
$page++;
} while ($countResults == $count);
return true;
}
public function chunkById($count, callable $callback, $column = null, $alias = null)
{
$column = is_null($column) ? $this->getModel()->getKeyName() : $column;
$alias = is_null($alias) ? $column : $alias;
$lastId = 0;
do {
$clone = clone $this;
// We'll execute the query for the given page and get the results. If there are
// no results we can just break and return from here. When there are results
// we will call the callback with the current chunk of these results here.
$results = $clone->forPageAfterId($count, $lastId, $column)->get();
$countResults = (int) $results->count();
if (!$countResults) {
break;
}
// On each chunk result set, we will pass them to the callback and then let the
// developer take care of everything within the callback, which allows us to
// keep the memory low for spinning through large result sets for working.
if ($callback($results) === false) {
return false;
}
$lastId = $results->last()->{$alias};
unset($results);
} while ($countResults === $count);
return true;
}
从两个方法的源码都可以看到,两个方法使用的思路都是一样的,根据指定的分块大小 记录一个起始值,根据每一次的其实值查出指定分块的大小,避免一次性查出过多数据导致占用内存过大。 chunk 实际上就是通过 limit offset实现的,而 chunkById在没有指定column参数的情况下是通过上一次查询记录的结果的最后一条数据的主键为标记(方法会对id进行排序)。
四、问题溯源
通过查看任务执行期间的sql记录发现,chunkById并没有如我预期一样的运行
{"message":"select sum(package_consumables_cost) package_consumables_cost,sum(package_cost) package_cost,sum(return_restock) return_restock,sum(return_and_first_leg_freight) return_and_first_leg_freight from `monthly_report_histories` where `record_month` = ? and `record_month` = month and `amount_type_name` = ? and `id` > ? group by `customer_account`, `product_sku`, `purchaser_name`, `seller_name`, `department_name`, `currency_code`, `platform_site_code` order by `id` asc limit 250","context":"{\"bindings\":[\"2023-01\",\"\",62902300] }"}
{"message":"select sum(platform_last_leg_cost * exchange_rate) platform_last_leg_cost, sum(exchange_loss) exchange_loss,sum(product_cost) product_cost,sum(freight) freight,sum(first_leg_freight) first_leg_freight,sum(package_consumables_cost) package_consumables_cost,sum(package_cost) package_cost,sum(return_restock) return_restock,sum(return_and_first_leg_freight) return_and_first_leg_freight from `monthly_report_histories` where `record_month` = ? and `record_month` = month and `amount_type_name` = ? and `id` > ? group by `customer_account`, `product_sku`, `purchaser_name`, `seller_name`, `department_name`, `currency_code`, `platform_site_code` order by `id` asc limit 250","context":"{\"bindings\":[\"2023-01\",\"\",62902857]}
五、总结
我的查询语句里面存在很多的groupBy和聚合函数。mysql的执行顺序where是在groupby和聚合之前的,chunkById在获取标记点的时候的查询语句已经和我最初的不一样了, 整体数据在分组和聚合之前就已经被改变了。 这也是数据不一致的原因。
后续在使用自己没用过的功能时一定要测一下,不要全凭想象
。
本作品采用《CC 协议》,转载必须注明作者和本文链接