laravel 队列任务重复执行
1. 运行环境
lnmp
1). 当前使用的 Laravel 版本?
8.4
//: <> (使用 php artisan --version
命令查看)
2). 当前使用的 php/php-fpm 版本?
7.4
PHP 版本:
php-fpm 版本:
7.4
3). 当前系统
CentOS 8
4). 业务环境
开发和生产,使用负载均衡
2. 问题描述?
记录每天的库存流水,因为每天产生的数据量很大,所以我编写了一个队列任务类
3. 您期望得到的结果?
我期望得到的结果是队列里面的的任务依次执行且不会重复
4. 您实际得到的结果?
实际得到的结果会产生重复的数据,队列任务只在一台服务器上运行的
下面是我的代码
public function addWareHouseProductInfoNew(Request $request,WareHouseService $wareHouseService){
$job = (new AddWarehouseProduct($request->all()))->onQueue('AddWarehouseProduct');
$this->dispatch($job);
return $this->ok([
'code' => 200,
'message' =>'success'
]);
}
任务类的代码
public function handle()
{
$key = 'add_warehouse_product';
\Illuminate\Support\Facades\Redis::throttle($key)->allow(10)->every(60)->then(function (){
//库存流水
$operation_id = isset($this->data['operation_id']) ? $this->data['operation_id'] : null;
$warehouse_id = isset($this->data['warehouse_id']) ? $this->data['warehouse_id'] : null;
$manager_id = isset($this->data['manager_id']) ? $this->data['manager_id'] : 1192;
$products = $this->data['products'];
$operation_type = DB::table('lemi_warehouse_operation')->where('operation_id', $operation_id)->value('operation_type');
try {
// DB::beginTransaction();
$ware_order = DB::table('lemi_warehouse_order')->insertGetId([
'manager_id' => $manager_id ?? 1192,
'memo' => isset($this->data['memo']) ? $this->data['memo'] : '',
'created_at' => date('Y-m-d H:i:s')
]);
$warehouse_location = DB::table('lemi_warehouse_location')->where('warehouse_id', $warehouse_id)->where('is_default', 1)->first();
$amount_warehouse_sum =0;
if ($operation_type == 1)//入库
{
foreach ($products as $k => $v) {
if (!$v['location_id']) {
$v['location_id'] = $warehouse_location->location_id;
}
DB::beginTransaction();
//计算产品分库数量
$add = DB::table('lemi_warehouse_product')->insertGetId([
'operation_id' => $operation_id,
'warehouse_id' => $warehouse_id,
// 'location_id' => $location_id,
'location_id' => $v['location_id'],
'product_id' => $v['product_id'],
'amount_in' => $v['amount'],
'amount_out' => 0,
'price' => $v['price'],
'amount_wait_sum' => 0,
'order_id' => $ware_order,
'detail_min_id' => isset($v['detail_min_id']) ? $v['detail_min_id'] : null,
'store_id' => isset($v['store_id']) ? $v['store_id'] : null,
'created_at' => date('Y-m-d H:i:s'),
'manager_id' => $manager_id ?? 1192,
]);
DB::commit();
$this->calculate($add);
}
} else { //出库
foreach ($products as $k => $v) {
DB::beginTransaction();
if (!$v['location_id']) {
$v['location_id'] = $warehouse_location->location_id;
}
$add = DB::table('lemi_warehouse_product')->insertGetId([
'operation_id' => $operation_id,
'warehouse_id' => $warehouse_id,
// 'location_id' => $location_id,
'location_id' => $v['location_id'],
'product_id' => $v['product_id'],
'amount_in' => 0,
'amount_out' => $v['amount'],
'price' => $v['price'],
'amount_wait_sum' => 0,
'order_id' => $ware_order,
'detail_min_id' => isset($v['detail_min_id']) ? $v['detail_min_id'] : null,
'store_id' => isset($v['store_id']) ? $v['store_id'] : null,
'created_at' => date('Y-m-d H:i:s'),
'manager_id' => $manager_id ?? 1192,
]);
DB::commit();
//Log::info('库存出库'.$add);
$this->calculate($add);
}
}
} catch (QueryException $e) {
DB::rollBack();
Log::debug('库存添加失败',$this->data);
Log::error($e);
}
},function (){
return $this->release(10);
});
}
public function calculate($id)
{
$wareHouse = DB::table('lemi_warehouse_product')->where('id', '=',$id)->whereNull('amount_warehouse_sum')->first();
if ($wareHouse) { //计算产品分库数量
$amount_warehouse_sum1 = DB::table('lemi_warehouse_product')->where('warehouse_id', $wareHouse->warehouse_id)->where('product_id', $wareHouse->product_id)->whereNotNull('amount_warehouse_sum')->orderby('id', 'desc')->value('amount_warehouse_sum');
$amount_warehouse_sum2 = DB::table('lemi_warehouse_product')->where('warehouse_id', $wareHouse->warehouse_id)->where('location_id', $wareHouse->location_id)->where('product_id', $wareHouse->product_id)->whereNotNull('amount_warehouse_sum')->orderby('id', 'desc')->value('amount_warehouse_sum_location');
$amount_sum2 = DB::table('lemi_warehouse_product')->where('product_id', $wareHouse->product_id)->whereNotNull('amount_warehouse_sum')->orderby('id', 'desc')->value('amount_sum');
if ($wareHouse->amount_in) {
$amount_warehouse_sum = $amount_warehouse_sum1 + $wareHouse->amount_in;
$amount_warehouse_sum_location = $amount_warehouse_sum2 + $wareHouse->amount_in;
$amount_sum = $amount_sum2 + $wareHouse->amount_in;
} elseif ($wareHouse->amount_out) {
$amount_warehouse_sum = $amount_warehouse_sum1 - $wareHouse->amount_out;
$amount_warehouse_sum_location = $amount_warehouse_sum2 - $wareHouse->amount_out;
$amount_sum = $amount_sum2 - $wareHouse->amount_out;
} else {
$amount_warehouse_sum = $amount_warehouse_sum1;
$amount_sum = $amount_sum2;
$amount_warehouse_sum_location = $amount_warehouse_sum2;
}
DB::table('lemi_warehouse_product')->where('id', $wareHouse->id)->update([
'amount_warehouse_sum' => $amount_warehouse_sum,
'amount_sum' => $amount_sum,
'amount_warehouse_sum_location' => $amount_warehouse_sum_location
]);
}else{
Log::info('没有该记录'.$id);
}
}
请各位大佬指教一二
有可能出现重复的几个原因:
对于强烈需要防止重复执行的任务,加一些锁机制,自带的已经很完善,可参考:
队列《Laravel 9 中文文档》
队列《Laravel 9 中文文档》