爬虫进阶:框架功能完善-多爬虫实现之线程池实现异步

未匹配的标注

利用线程池实现异步

异步任务分析:

在引擎中,实现的主要功能如下

  • 上面的方框中是关于start_urls中的请求处理
  • 下面的方框中是一个请求从调度器取出请求,进行下载之后交给爬虫解析再交给管道的过程 在以上两个过程中,他们之间没有直接的联系,都可以通过异步多线程的方式分别实现,加快程序执行的速度

那么具体该如何实现该逻辑

  • multiprocessing.dummy 提供的Pool 类具有apply_async的方法,能够异步的执行让他运行的函数
  • apply_async方法能够接收一个callback,即其中的函数执行完成之后继续会做的事情,在这里,我们可以定义一个callback,其中让他继续执行上图中下方框的任务,同时给他一个停止条件,

利用回调实现循环

利用回调实现递归,可以达到循环的目的

# scrapy_plus/core/engine.py
import time
from multiprocessing.dummy import Pool    # 导入线程池对象
import importlib
from datetime import datetime

from scrapy_plus.http.request import Request    # 导入Request对象
from scrapy_plus.utils.log import logger    # 导入logger
from scrapy_plus.conf import settings

from .scheduler import Scheduler
from .downloader import Downloader


class Engine(object):
    '''
    a. 对外提供整个的程序的入口
    b. 依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)
    '''

    def __init__(self):

        self.scheduler = Scheduler()    # 初始化调度器对象
        self.downloader = Downloader()    # 初始化下载器对象

        self.spiders = self._auto_import_instances(settings.SPIDERS, True)  # 动态导入并实例化爬虫对象
        self.pipelines = self._auto_import_instances(settings.PIPELINES)  # 动态导入并实例化管道对象
        self.spider_mids = self._auto_import_instances(settings.SPIDER_MIDDLEWARES)  # 动态导入并实例化爬虫中间件对象
        self.downloader_mids = self._auto_import_instances(settings.DOWNLOADER_MIDDLEWARES)  # 动态导入并实例化下载器中间件对象

        self.total_response_number = 0

        self.pool = Pool()  # 创建线程池对象
        self.running = False  # 记录是否退出程序的状态

    def start(self):
        '''启动整个引擎'''
        start = datetime.now()  # 起始时间
        logger.info("开始运行时间:%s" % start)  # 使用日志记录起始运行时间
        self._start_engine()
        stop = datetime.now()  # 结束时间
        logger.info("开始运行时间:%s" % stop)  # 使用日志记录结束运行时间
        logger.info("耗时:%.2f" % (stop - start).total_seconds())  # 使用日志记录运行耗时

    def _start_requests(self):
        '''向调度器添加初始请求'''
        # 1. 爬虫模块发出初始请求
        for spider_name, spider in self.spiders.items():
            for start_request in spider.start_requests():
                # 2. 把初始请求添加给调度器
                # 利用爬虫中间件预处理请求对象
                for spider_mid in self.spider_mids:
                    start_request = spider_mid.process_request(start_request)
                start_request.spider_name = spider_name  # 为请求对象绑定它所属的爬虫的名称
                self.scheduler.add_request(start_request)

    def _execute_request_response_item(self):
        '''根据请求、发起请求获取响应、解析响应、处理响应结果'''
        # 3. 从调度器获取请求对象,交给下载器发起请求,获取一个响应对象
        request = self.scheduler.get_request()
        if request is None:
            return
        # 利用下载器中间件预处理请求对象
        for downloader_mid in self.downloader_mids:
            request = downloader_mid.process_request(request)
        # 4. 利用下载器发起请求
        response = self.downloader.get_response(request)
        # 利用下载器中间件预处理响应对象
        for downloader_mid in self.downloader_mids:
            response = downloader_mid.process_response(response)

        spider = self.spiders[request.spider_name]  # 根据请求的spider_name属性,获取对应的爬虫对象

        # 5. 利用爬虫的解析响应的方法,处理响应,得到结果
        parse = getattr(spider, request.parse)    # 获取对应的解析函数
        results = parse(response)    # parse函数的返回值是一个容器,如列表或者生成器对象
        for result in results:
            # 6. 判断结果对象
            # 6.1 如果是请求对象,那么就再交给调度器
            if isinstance(result, Request):
                # 利用爬虫中间件预处理请求对象
                for spider_mid in self.spider_mids:
                    result = spider_mid.process_request(result)
                result.spider_name = request.spider_name  # 为请求对象绑定它所属的爬虫的名称
                self.scheduler.add_request(result)
            # 6.2 否则,就交给管道处理
            else:
                # 利用爬虫中间件预处理数据对象
                for spider_mid in self.spider_mids:
                    result = spider_mid.process_item(result)
                for pipeline in self.pipelines:    # 多个管道对象,轮流处理item对象
                    result = pipeline.process_item(result, spider)

        # 统计响应总数
        self.total_response_number += 1

    def _callback(self, temp):
        '''执行新的请求的回调函数,实现循环'''
        if self.running is True:  # 如果还没满足退出条件,那么继续添加新任务,否则不继续添加,终止回调函数,达到退出循环的目的
            self.pool.apply_async(self._execute_request_response_item, callback=self._callback)

    def _start_engine(self):
        '''依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)'''
        self.running = True  # 启动引擎,设置状态为True
        # 向调度器添加初始请求
        self.pool.apply_async(self._start_requests)  # 使用异步

        self.pool.apply_async(self._execute_request_response_item, callback=self._callback)  # 利用回调实现循环

        # 设置循环,处理多个请求
        while True:
            time.sleep(0.0001)  # 避免cpu空转,消耗性能

            # 根据请求、发起请求获取响应、解析响应、处理响应结果
            # self._execute_request_response_item()

            # 设置退出条件:当请求数和响应数相等时,退出循环
            # 因为异步,需要增加判断,请求数不能为0
            if self.total_response_number >= self.scheduler.total_request_number and self.scheduler.total_request_number != 0:
                self.running = False  # 满足循环退出条件后,设置运行状态为False
                break

        self.pool.close()
        self.pool.join()

实现异步并发控制

在配置文件中设置最大并发数,并在引擎中使用

# scrapy_plus/core/engine.py
class Engine(object):

    ......

    def _start_engine(self):
        self.running = True
        '''依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)'''
        # 向调度器添加初始请求
        self.pool.apply_async(self._start_requests)    # 使用异步

        # 控制最大并发数
        for i in range(settings.MAX_ASYNC_NUMBER):
            self.pool.apply_async(self._execute_request_response_item, callback=self._callback)    # 利用回调实现循环

        while True:
            time.sleep(0.0001)    # 避免cpu空转,消耗性能
            # 设置退出条件:当请求数和响应数相等时,退出循环
            # 因为异步,需要增加判断,请求数不能为0
            if self.total_response_number >= self.scheduler.total_request_number and self.scheduler.total_request_number != 0:
                self.running = False    # 满足循环退出条件后,设置运行状态为False
                break
        self.pool.close()
        self.pool.join()

对异步任务进行异常控制,增加异常回调函数error_callback

# scrapy_plus/core/engine.py
class Engine(object):

    ......

    def _callback(self, temp):
        '''执行新的请求的回调函数,实现循环'''
        if self.running is True:    # 如果还没满足退出条件,那么继续添加新任务,否则不继续添加,终止回调函数,达到退出循环的目的
            self.pool.apply_async(self._execute_request_response_item, callback=self._callback, error_callback=self._error_callback)

    def _error_callback(self, exception):
        '''异常回调函数'''
        try:
            raise exception    # 抛出异常后,才能被日志进行完整记录下来
        except Exception as e:
            logger.exception(e)

    def _start_engine(self):
        self.running = True
        '''依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)'''
        # 向调度器添加初始请求
        self.pool.apply_async(self._start_requests, error_callback=self._error_callback)    # 使用异步

        for i in range(settings.MAX_ASYNC_NUMBER):
            self.pool.apply_async(self._execute_request_response_item, callback=self._callback, error_callback=self._error_callback)    # 利用回调实现循环

        while True:
            time.sleep(0.0001)    # 避免cpu空转,消耗性能
            # 设置退出条件:当请求数和响应数相等时,退出循环
            # 因为异步,需要增加判断,请求数不能为0
            if self.total_response_number >= self.scheduler.total_request_number and self.scheduler.total_request_number != 0:
                self.running = False    # 满足循环退出条件后,设置运行状态为False
                break
        self.pool.close()
        self.pool.join()

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~