Laravel 的任务调度 (cron) 和队列一起使用制作采集器

说明!注意事项!

  • 看下面的教程!还是看一遍laravel中文文档是很有必要的!因为我是看中文文档、写出来的程序
  • 本人使用的是阿里云(1核CPU|1GB内存|1M带宽|centos6.5-64位系统|系统盘20G|laravel5.2)的配置
  • 所有的队列(queue)和任务调度(console)里面的数据库DB操作!都需要使用 DB类库!不可以使用orm!区别是mysql长链接和短链接……
  • 任务调度(console)里面只做数据库的查询——处理抛给队列处理!因为这样你的系统进程会很干净!不会又出现挂断或者假死等等!注意一点 程序结束必须有 return
  • 队列(queue)里面也是每一个 function 都要书写 return
  • 不管是在任务调度里面、队列任务里面、队列任务里面再次调用队列任务!都使用全局队列任务推送函数:dispatch!如何使用看下面操练代码
  • 重要一点!要排除所有的bug
  • 看代码为主!文字为辅助

    队列(queue)

  • 很多文档都在说队列、使用数据库、如果真正上线了使用数据库是不行的!一定修改成redis的!###一会说一下修改redis的流程
  • 做队列调试直接使用!数据库队列进行调试!

    数据库队列——开发与调试

  • 以下操作可以直接在现有项目上面执行!
    php artisan queue:table #为数据库队列生成数据库表
    php artisan migrate #为数据库队列创建数据库表
    php artisan make:job ArtCaiji #创一个队列工作任务!这个是处理文章内容的采集
  • 以下是队列工作任务文件里面的代码 注意看!我给大家注释

    <?php
    
    namespace App\Jobs;
    
    use App\Jobs\Job;
    use Illuminate\Queue\SerializesModels;
    use Illuminate\Queue\InteractsWithQueue;
    use Illuminate\Contracts\Queue\ShouldQueue;
    //以上都是自动生成的代码不需要管理
    use DB, phpQuery, Log; //这里调用了3个类库!DB是数据库操作类库、phpquery是采集类库、log是日志类库
    /*
    log类库主要是用来做调试 打印输出
    phpquery类库主要是用来做 网页上面的信息采集
    db类库主要是用来做 数据库的处理
    代码里面没有使用中间件!怕大家晕……
    类里面的 
    public function __construct($row)
    {
        //
        $this->row = $row;
    }
    不能省略这是队列工作任务类最重要的、传值方式!变量$row我传入的是一个对象值!然后把它赋给ArtCaiji类里面的私有对象$row;
    */
    class ArtCaiji extends Job implements ShouldQueue
    {
        use InteractsWithQueue, SerializesModels;
        protected $row;
        /**
         * Create a new job instance.
         *
         * @return void
         */
        public function __construct($row)
        {
            //
            $this->row = $row;
        }
    
        /**
         * 下面是我们要执行的代码
         *
         * @return void
         */
        public function handle()
        {
            //
            $metas = array();
            $res    = array();
            if($this->row){
                $html = $this->get_url_content($this->row['href']);
                if($html){
                    phpQuery::newDoclamb($html);
                    foreach(pq('meta') as $meta){
                        $key = pq($meta)->attr('name');
                        $value= pq($meta)->attr('content');
                        $metas[strtolower($key)] = $value;
                    }
                    $title = trim(pq('.head-wrap .title')->text());
                    $articles = DB::table('articles')->where('art_title', $title)->first();
                    if($articles){
                        //Log::info('文章已经存在===='.$title);
                        phpQuery::$documents = array();
                        return true;
                    }else{
                        $insertedId = DB::table('articles')->insertGetId([
                            'user_id' => '10000',
                            'cate_id' => $this->row['cate_id'],
                            'art_title' => $title,
                            'art_tags' => $metas['keywords'],
                            'copy_from' => '本站原创',
                            'copy_url' => 'http://www.webshowu.com',
                            'art_intro' => $metas['description'],
                            'art_content' => $this->ImgFindShift_5118(pq('.content')->html()),
                            'art_views' => '10',
                            'art_status' => '3',
                            'created_at' => date('Y-m-d H:i:s',time()),
                            'updated_at' => date('Y-m-d H:i:s',time()),
                        ]);
                        if($insertedId){
                            $result = $this->zhanzhang_push_baidu("http://www.webshowu.com/artinfo-".$insertedId.".html");
                        }
                    }
                    phpQuery::$documents = array();
                    return true;
                }else{
                    Log::info(var_export($this->row,true));
                    return true;
                }
            }else{
                Log::info('对象是空值');
                return true;
            }
    
        }
        /*专业处理5118文章图片*/
        function ImgFindShift_5118($html){
            if($html){
                $array = array();
                $dochtml = phpQuery::newDoclamb($html);
                foreach(pq('img') as $img){
                    $val['src1'] = pq($img)->attr('data-original');
                    $val['src2'] = pq($img)->attr('src');
                    $array[] = $val;
                }
                foreach($array as $key => $str){
                    pq("img:eq($key)")->attr('src',$str['src1']);
                }
                return $dochtml;
            }else{
                return false;
            }
        }
        /** 获取指定URL内容 */
        function get_url_content($url, $proxy = true) {
            $data = '';
            $ch = curl_init();
            curl_setopt($ch, CURLOPT_URL, $url);
            curl_setopt($ch, CURLOPT_HEADER ,0);
            curl_setopt($ch, CURLOPT_CONNECTTIMEOUT,60);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER,1); // 设置不将爬取代码写到浏览器,而是转化为字符串
            curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
            if($proxy){
                //AppKey 信息,请替换
                $appKey = 'xxx';
                //AppSecret 信息,请替换
                $secret = 'xxxx';
                //示例请求参数
                $paramMap = array(
                    'app_key'   => $appKey,
                    'timestamp' => date('Y-m-d H:i:s'),
                    'enable-simulate' => 'false',
                );
                //按照参数名排序
                ksort($paramMap);
                //连接待加密的字符串
                $codes = $secret;
                //请求的URL参数
                $auth = 'MYH-AUTH-MD5 ';
                foreach ($paramMap as $key => $val) {
                    $codes .= $key . $val;
                    $auth  .= $key . '=' . $val . '&';
                }
                $codes .= $secret;
                //签名计算
                $auth .= 'sign=' . strtoupper(md5($codes));
                curl_setopt($ch, CURLOPT_HTTPHEADER, array("Proxy-Authorization: {$auth}"));
                curl_setopt($ch, CURLOPT_PROXYAUTH, CURLAUTH_BASIC); //代理认证模式
                curl_setopt($ch, CURLOPT_PROXY, 'xxxx'); //代理服务器地址
                curl_setopt($ch, CURLOPT_PROXYPORT, 'xxx'); //代理服务器端口
                curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP); //使用http代理模式
            }
            $data = curl_exec($ch);
            if ($data  === FALSE) {
                return false;
            }
            curl_close($ch);
            if (!$data) {
                return false;
            } else {
                $encode = mb_detect_encoding($data, array('ascii', 'gb2312', 'utf-8', 'gbk'));
                if($encode != 'utf-8'){
                    if($encode == 'EUC-CN' || $encode == 'CP936'){
                        $data = @mb_convert_encoding($data, 'utf-8', 'gb2312');
                    }else{
                        $data = @mb_convert_encoding($data, 'utf-8', $encode);
                    }   
                }
                return $data;
            }
        }
        /** 百度站长工具推送代码 **/
        function zhanzhang_push_baidu($url){
    
            $urls = array($url);
            $api = 'http://data.zz.baidu.com/urls?site=www.webshowu.com&token=6ujhg0alnRLbwZr7';
            $ch = curl_init();
            $options =  array(
                CURLOPT_URL => $api,
                CURLOPT_POST => true,
                CURLOPT_RETURNTRANSFER => true,
                CURLOPT_POSTFIELDS => implode("\n", $urls),
                CURLOPT_HTTPHEADER => array('Content-Type: text/plain'),
            );
            curl_setopt_array($ch, $options);
            $result = curl_exec($ch);
            return $result;
        }
    
    }

    上线之后修改成redis

  • 必须安装好redis
  • 把代码 "predis/predis": "~1.0" 添加到composer.conf文件的require里面:代码如下
    {
      "name": "laravel/laravel",
      "description": "The Laravel Framework.",
      "keywords": ["framework", "laravel"],
      "license": "MIT",
      "type": "project",
      "require": {
          "php": ">=5.5.9",
          "laravel/framework": "5.2.*",
          "jenssegers/agent": "^2.3",
          "stevenyangecho/laravel-u-editor": "~1.3",
          "overtrue/laravel-lang": "1.0.*",
          "predis/predis": "~1.0" 
      },
      "require-dev": {
          "fzaninotto/faker": "~1.4",
          "mockery/mockery": "0.9.*",
          "phpunit/phpunit": "~4.0",
          "symfony/css-selector": "2.8.*|3.0.*",
          "symfony/dom-crawler": "2.8.*|3.0.*"
      },
      "autoload": {
          "classmap": [
              "database",
        "app/libs/phpQuery"
          ],
          "psr-4": {
              "App\\": "app/"
          }
      },
      "autoload-dev": {
          "classmap": [
              "tests/TestCase.php"
          ]
      },
      "scripts": {
          "post-root-package-install": [
              "php -r \"copy('.env.example', '.env');\""
          ],
          "post-create-project-cmd": [
              "php artisan key:generate"
          ],
          "post-install-cmd": [
              "php artisan clear-compiled",
              "php artisan optimize"
          ],
          "post-update-cmd": [
              "php artisan clear-compiled",
              "php artisan optimize"
          ]
      },
      "config": {
          "preferred-install": "dist"
      }
    }
  • 执行 composer update 代码!
  • 修改 redis 的配置信息 文件位置是:config/database.php 里面的 redis数组
  • 修改 队列 的配置信息 文件位置是: config/queue.php 里面的 default 值修改成redis
  • 如果你看到的都是这样书写代码 env('QUEUE_DRIVER', 'sync') 请到项目跟目录下面修改 .env 文件
  • 啊……!就这样简单!是的

    执行队列

  • 建议调试的时候直接使用 php artisan queue:listen
  • 线上直接使用supervisor安装配置使用都在这个网址里面https://learnku.com/laravel/t/2126

    任务调度(cron)

  • 重要的是要保证进程不会假死!挂掉!不能有一丝丝php错误信息!每一个 function 都需要return !正常执行完毕程序、进程会自动推出!我强烈建议每一个调度任务都建立一个console

    建立调度任务并编写代码

  • 下面的程序可以直接执行
    php artisan make:console CaiJiQi #创建调度任务命令文件 位置是 app/Console/Commands 文件里面
  • 下面是CaiJiQi.php文件里面的代码

    <?php
    
    namespace App\Console\Commands;
    
    use Illuminate\Console\Command;
    //以上是自动生成的
    use App\Jobs\RegList; //这里我调用了队列RegList 采集规则 处理程序
    use DB, Log; //这里调用了2个类库!DB是数据库类库、log是日志类库
    class caijiqi extends Command
    {
        /**
         * 设置用 php的artisan工具调用名字 
         * 调用代码直接在命令这样写 php artisan caiji 就执行现在这个文件
         * @var string
         */
        protected $signature = 'caiji';
    
        /**
         * 设置写 这个命令工具的描述 
         * 只能写英文的 中文会乱码 因为编码的问题
         * @var string
         */
        protected $description = 'zhu yao shi yong lai zuo ding shi cai ji';
    
        /**
         * 这一行可能有些朋友基础不好的!是看不明白!
         * 加载这个类的时候、直接执行父类的析构函数方法
         * @return void
         */
        public function __construct()
        {
            parent::__construct();
        }
    
        /**
         * 这才是我们的关键!可以开始写要执行的代码了!
         * 因为我使用不到传值!所以先不写了!你们需要那就去看laravel中文文档
         * @return mixed
         */
        public function handle()
        {
            //
            $reg_list = DB::table('Reg_List')->where('reg_status','3')->get(); //读取采集规则表里面所以规则
            foreach($reg_list as $str){
                dispatch(new RegList($str)); //然后遍历把它 推送给 RegList 采集规则队列
            }
            return true; //不能减少return啊!
        }
    }
  • 由于处理采集规则是一个长耗时的工作!所以我又创建了一个队列文件 RegList
  • 下面是RegList.php文件里面的代码

    <?php
    
    namespace App\Jobs;
    
    use App\Jobs\Job;
    use Illuminate\Queue\SerializesModels;
    use Illuminate\Queue\InteractsWithQueue;
    use Illuminate\Contracts\Queue\ShouldQueue;
    //以上代码是自动生成的
    use App\Jobs\ArtCaiji;///这里我调用了队列ArtCaiji 采集文章 处理程序
    use DB, phpQuery, Log;
    /*
    log类库主要是用来做调试 打印输出
    phpquery类库主要是用来做 网页上面的信息采集
    db类库主要是用来做 数据库的处理
    代码里面没有使用中间件!怕大家晕……
    类里面的 
    public function __construct($row)
    {
        //
        $this->row = $row;
    }
    不能省略这是队列工作任务类最重要的、传值方式!变量$row我传入的是一个对象值!然后把它赋给RegList类里面的私有对象$row;
    */
    class RegList extends Job implements ShouldQueue
    {
        use InteractsWithQueue, SerializesModels;
        protected $row;
        /**
         * Create a new job instance.
         *
         * @return void
         */
        public function __construct($row)
        {
            //
            $this->row = $row;
        }
    
        /**
         * Execute the job.
         *
         * @return void
         */
        public function handle()
        {
            //
            Log::info('调用了一次==='.$this->row->reg_url);
            $html = $this->get_url_content($this->row->reg_url);
            if($html){
                $data = array();
                phpQuery::newDoclamb($html);
                foreach(pq($this->row->reg_list) as $res){
                    $title = trim(pq($res)->text());
                    if($this->row->reg_ishost){
                        $href = $this->row->reg_host.pq($res)->attr('href');
                    }else{
                        $href = pq($res)->attr('href');
                    }
                    $data[] = $title.'======'.$href;
                }
                phpQuery::$documents = array();
                if($this->row->reg_content == json_encode($data)){
                    //Log::info('采集结果一样=='.$this->row->reg_url);
                }else{
                    $result = $this->ArtCaiji_array_diff($data,json_decode($this->row->reg_content,true));
                    if($result){
                        DB::table('Reg_List')->where('reg_id',$this->row->reg_id)->update(array('updated_at'=> date('Y-m-d H:i:s',time()),'reg_content'=>json_encode($data)));
                        $this->ArtCaiji_queue($result,$this->row->cate_id);
                        return true;
                    }
                }
            }else{
                Log::info('这个网址已经失去效果=='.$this->row->reg_url);
            }
        }
        /*文章一维数组进行对比返回不一样的值*/
        function ArtCaiji_array_diff($array1,$array2){
            if($array2!='' && $array1!=''){
                $result = array_diff($array1,$array2);
                return $result;
            }
            if($array2 == '' && $array1 !=''){}
            {
                $result = $array1;
                return $result;
            }
            return false;
        }
        /*文章队列提交*/
        function ArtCaiji_queue($array,$cate_id){
            if($array){
                foreach ($array as $value) {
                    $str = explode('======',$value);
                    $result['title'] = $str['0'];
                    $result['href'] = $str['1'];
                    $result['cate_id'] = $cate_id;
                    if($result['title'] != '' && $result['href'] != '' && $result['cate_id']){
                        dispatch(new ArtCaiji($result));
                    }
                }
                return true;
            }else{
                return true;
            }
        }
        /** 获取指定URL内容 */
        function get_url_content($url, $proxy = true) {
            $data = '';
            $ch = curl_init();
            curl_setopt($ch, CURLOPT_URL, $url);
            curl_setopt($ch, CURLOPT_HEADER ,0);
            curl_setopt($ch, CURLOPT_CONNECTTIMEOUT,60);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER,1); // 设置不将爬取代码写到浏览器,而是转化为字符串
            curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
            if($proxy){
                //AppKey 信息,请替换
                $appKey = 'xx';
                //AppSecret 信息,请替换
                $secret = 'xxxx';
                //示例请求参数
                $paramMap = array(
                    'app_key'   => $appKey,
                    'timestamp' => date('Y-m-d H:i:s'),
                    'enable-simulate' => 'false',
                );
                //按照参数名排序
                ksort($paramMap);
                //连接待加密的字符串
                $codes = $secret;
                //请求的URL参数
                $auth = 'MYH-AUTH-MD5 ';
                foreach ($paramMap as $key => $val) {
                    $codes .= $key . $val;
                    $auth  .= $key . '=' . $val . '&';
                }
                $codes .= $secret;
                //签名计算
                $auth .= 'sign=' . strtoupper(md5($codes));
                curl_setopt($ch, CURLOPT_HTTPHEADER, array("Proxy-Authorization: {$auth}"));
                curl_setopt($ch, CURLOPT_PROXYAUTH, CURLAUTH_BASIC); //代理认证模式
                curl_setopt($ch, CURLOPT_PROXY, 'xxx'); //代理服务器地址
                curl_setopt($ch, CURLOPT_PROXYPORT, 'xxx'); //代理服务器端口
                curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP); //使用http代理模式
            }
            $data = curl_exec($ch);
            if ($data  === FALSE) {
                return false;
            }
            curl_close($ch);
            if (!$data) {
                return false;
            } else {
                $encode = mb_detect_encoding($data, array('ascii', 'gb2312', 'utf-8', 'gbk'));
                if($encode != 'utf-8'){
                    if($encode == 'EUC-CN' || $encode == 'CP936'){
                        $data = @mb_convert_encoding($data, 'utf-8', 'gb2312');
                    }else{
                        $data = @mb_convert_encoding($data, 'utf-8', $encode);
                    }   
                }
                return $data;
            }
        }
    }

    感激

有bug反馈

在使用中有任何问题,欢迎反馈给我,可以用以下联系方式跟我交流

  • qq群:480774617
本帖已被设为精华帖!
本帖由系统于 4年前 自动加精
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 2
所有的队列 (queue) 和任务调度 (console) 里面的数据库 DB 操作!都需要使用 DB 类库!不可以使用 orm!区别是 mysql 长链接和短链接……    

博主这个如何理解?

4年前 评论
slpi1 4年前

对于异步的任务,因为无法知晓任务执行的时间,也就无法确定第一次数据库执行与下一次数据库执行的时间差。如果是用的数据库长连接(连接复用),可能会导致第二次执行时,数据库的连接已经丢失了,理论上这种场景要使用短连接。这因该就是题主的意思。

但是,在laravel5.5中,orm或DB操作,其实都是用的长连接,所以两者的使用并没有区别,laravel在进行数据库执行时,如果捕获了丢失连接的异常,会自动进行重连,所以并不存在这个问题。具体代码见

Illuminate\Database\Connection

    protected function run($query, $bindings, Closure $callback)
    {
        $this->reconnectIfMissingConnection();

        $start = microtime(true);

        // Here we will run this query. If an exception occurs we'll determine if it was
        // caused by a connection that has been lost. If that is the cause, we'll try
        // to re-establish connection and re-run the query with a fresh connection.
        try {
            $result = $this->runQueryCallback($query, $bindings, $callback);
        } catch (QueryException $e) {
            $result = $this->handleQueryException(
                $e, $query, $bindings, $callback
            );
        }

        // Once we have run the query we will calculate the time that it took to run and
        // then log the query, bindings, and execution time so we will report them on
        // the event that the developer needs them. We'll log time in milliseconds.
        $this->logQuery(
            $query, $bindings, $this->getElapsedTime($start)
        );

        return $result;
    }

        protected function handleQueryException($e, $query, $bindings, Closure $callback)
    {
        if ($this->transactions >= 1) {
            throw $e;
        }

        return $this->tryAgainIfCausedByLostConnection(
            $e, $query, $bindings, $callback
        );
    }
4年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!