laravel 队列任务重复执行

1. 运行环境

lnmp

1). 当前使用的 Laravel 版本?

8.4
//: <> (使用 php artisan --version 命令查看)

2). 当前使用的 php/php-fpm 版本?

7.4
PHP 版本:
php-fpm 版本:
7.4

3). 当前系统

CentOS 8

4). 业务环境

开发和生产,使用负载均衡

2. 问题描述?

记录每天的库存流水,因为每天产生的数据量很大,所以我编写了一个队列任务类

3. 您期望得到的结果?

我期望得到的结果是队列里面的的任务依次执行且不会重复

4. 您实际得到的结果?

实际得到的结果会产生重复的数据,队列任务只在一台服务器上运行的

laravel 队列任务重复执行

下面是我的代码

public function addWareHouseProductInfoNew(Request $request,WareHouseService $wareHouseService){

        $job = (new AddWarehouseProduct($request->all()))->onQueue('AddWarehouseProduct');
        $this->dispatch($job);
        return $this->ok([
            'code' => 200,
            'message' =>'success'
        ]);
    }

任务类的代码

 public function handle()
    {
        $key = 'add_warehouse_product';
        \Illuminate\Support\Facades\Redis::throttle($key)->allow(10)->every(60)->then(function (){
            //库存流水
            $operation_id = isset($this->data['operation_id']) ? $this->data['operation_id'] : null;
            $warehouse_id = isset($this->data['warehouse_id']) ? $this->data['warehouse_id'] : null;
            $manager_id = isset($this->data['manager_id']) ? $this->data['manager_id'] : 1192;
            $products = $this->data['products'];
            $operation_type = DB::table('lemi_warehouse_operation')->where('operation_id', $operation_id)->value('operation_type');
            try {
                // DB::beginTransaction();
                $ware_order = DB::table('lemi_warehouse_order')->insertGetId([
                    'manager_id' => $manager_id ?? 1192,
                    'memo' => isset($this->data['memo']) ? $this->data['memo'] : '',
                    'created_at' => date('Y-m-d H:i:s')
                ]);
                $warehouse_location = DB::table('lemi_warehouse_location')->where('warehouse_id', $warehouse_id)->where('is_default', 1)->first();
                $amount_warehouse_sum =0;
                if ($operation_type == 1)//入库
                {
                    foreach ($products as $k => $v) {
                        if (!$v['location_id']) {
                            $v['location_id'] = $warehouse_location->location_id;
                        }
                        DB::beginTransaction();
                        //计算产品分库数量
                        $add =  DB::table('lemi_warehouse_product')->insertGetId([
                            'operation_id' => $operation_id,
                            'warehouse_id' => $warehouse_id,
                            // 'location_id' => $location_id,
                            'location_id' => $v['location_id'],
                            'product_id' => $v['product_id'],
                            'amount_in' => $v['amount'],
                            'amount_out' => 0,
                            'price' => $v['price'],
                            'amount_wait_sum' => 0,
                            'order_id' => $ware_order,
                            'detail_min_id' => isset($v['detail_min_id']) ? $v['detail_min_id'] : null,
                            'store_id' => isset($v['store_id']) ? $v['store_id'] : null,
                            'created_at' => date('Y-m-d H:i:s'),
                            'manager_id' => $manager_id ?? 1192,
                        ]);
                        DB::commit();
                        $this->calculate($add);
                    }
                } else { //出库
                    foreach ($products as $k => $v) {
                        DB::beginTransaction();
                        if (!$v['location_id']) {
                            $v['location_id'] = $warehouse_location->location_id;
                        }
                        $add = DB::table('lemi_warehouse_product')->insertGetId([
                            'operation_id' => $operation_id,
                            'warehouse_id' => $warehouse_id,
                            // 'location_id' => $location_id,
                            'location_id' => $v['location_id'],
                            'product_id' => $v['product_id'],
                            'amount_in' => 0,
                            'amount_out' => $v['amount'],
                            'price' => $v['price'],
                            'amount_wait_sum' => 0,
                            'order_id' => $ware_order,
                            'detail_min_id' => isset($v['detail_min_id']) ? $v['detail_min_id'] : null,
                            'store_id' => isset($v['store_id']) ? $v['store_id'] : null,
                            'created_at' => date('Y-m-d H:i:s'),
                            'manager_id' => $manager_id ?? 1192,
                        ]);
                        DB::commit();
                        //Log::info('库存出库'.$add);
                        $this->calculate($add);
                    }
                }
            } catch (QueryException $e) {
                DB::rollBack();
                Log::debug('库存添加失败',$this->data);
                Log::error($e);
            }
        },function (){
            return $this->release(10);
        });


    }

    public function calculate($id)
    {
        $wareHouse = DB::table('lemi_warehouse_product')->where('id', '=',$id)->whereNull('amount_warehouse_sum')->first();
        if ($wareHouse) { //计算产品分库数量
            $amount_warehouse_sum1 = DB::table('lemi_warehouse_product')->where('warehouse_id', $wareHouse->warehouse_id)->where('product_id', $wareHouse->product_id)->whereNotNull('amount_warehouse_sum')->orderby('id', 'desc')->value('amount_warehouse_sum');
            $amount_warehouse_sum2 = DB::table('lemi_warehouse_product')->where('warehouse_id', $wareHouse->warehouse_id)->where('location_id', $wareHouse->location_id)->where('product_id', $wareHouse->product_id)->whereNotNull('amount_warehouse_sum')->orderby('id', 'desc')->value('amount_warehouse_sum_location');
            $amount_sum2 = DB::table('lemi_warehouse_product')->where('product_id', $wareHouse->product_id)->whereNotNull('amount_warehouse_sum')->orderby('id', 'desc')->value('amount_sum');
            if ($wareHouse->amount_in) {
                $amount_warehouse_sum = $amount_warehouse_sum1 + $wareHouse->amount_in;
                $amount_warehouse_sum_location = $amount_warehouse_sum2 + $wareHouse->amount_in;
                $amount_sum = $amount_sum2 + $wareHouse->amount_in;
            } elseif ($wareHouse->amount_out) {
                $amount_warehouse_sum = $amount_warehouse_sum1 - $wareHouse->amount_out;
                $amount_warehouse_sum_location = $amount_warehouse_sum2 - $wareHouse->amount_out;
                $amount_sum = $amount_sum2 - $wareHouse->amount_out;
            } else {
                $amount_warehouse_sum = $amount_warehouse_sum1;
                $amount_sum = $amount_sum2;
                $amount_warehouse_sum_location = $amount_warehouse_sum2;
            }
            DB::table('lemi_warehouse_product')->where('id', $wareHouse->id)->update([
                'amount_warehouse_sum' => $amount_warehouse_sum,
                'amount_sum' => $amount_sum,
                'amount_warehouse_sum_location' => $amount_warehouse_sum_location
            ]);
        }else{
            Log::info('没有该记录'.$id);
        }

    }

请各位大佬指教一二

《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
最佳答案

有可能出现重复的几个原因:

  • 在N台服务器同时部署了任务队列任务
  • 只布了一台服务器,但是启动了多个任务进程(如supervisor的numprocs配置,或者直接挂起后台多个)
  • 单次任务执行时间非常长,超过了最大执行时间限制,任务调度的下一个周期会以为任务还未得到执行,又唤起继续执行,解决:调整任务执行的最大时间,详见 队列《Laravel 9 中文文档》

对于强烈需要防止重复执行的任务,加一些锁机制,自带的已经很完善,可参考:
队列《Laravel 9 中文文档》
队列《Laravel 9 中文文档》

1年前 评论
Rache1 1年前
讨论数量: 4

有可能出现重复的几个原因:

  • 在N台服务器同时部署了任务队列任务
  • 只布了一台服务器,但是启动了多个任务进程(如supervisor的numprocs配置,或者直接挂起后台多个)
  • 单次任务执行时间非常长,超过了最大执行时间限制,任务调度的下一个周期会以为任务还未得到执行,又唤起继续执行,解决:调整任务执行的最大时间,详见 队列《Laravel 9 中文文档》

对于强烈需要防止重复执行的任务,加一些锁机制,自带的已经很完善,可参考:
队列《Laravel 9 中文文档》
队列《Laravel 9 中文文档》

1年前 评论
Rache1 1年前

不加点日志debug下?
有没有可能是request了多次,以为只提交了一次?

1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!