爬虫进阶:框架功能升级之断点续爬

未匹配的标注

断点续爬

断点续爬设计分析

断点续爬设计原理介绍:

断点续爬的效果:爬虫程序中止后,再次启动,对已经发起的请求不再发起,而是直接从之前的队列中获取请求继续执行。这也就意味着需要实现以下两点:

1.去重标识(历史请求的指纹)持久化存储,使得新的请求可以和以前的请求进行去重对比

2.请求队列也需要持久化存储

其实也就是程序的中止不会造成请求队列和去重容器的消失,再次启动程序后,还能继续访问它们。

断点续爬无丢失方案的实现

断点续爬无丢失的代码实现:

  • 添加备份容器:利用redis的hash类型类对每一个请求对象进行存储
  • 为Request对象设置重试次数属性
  • 在调度器的get_request方法中实现响应的逻辑判断
  • 实现delete_request方法:从备份中删除对应的Reqeust对象
  • 实现add_lost_request方法
  • 在引擎中调用这些方法,完成断点续爬无丢失需求
# scrapy_plus/redis_hash.py
'''实现一个对redis哈希类型的操作封装'''
import redis
import pickle

from scrapy_plus.http.request import Request
from scrapy_plus.conf import settings


class RedisBackupRequest(object):
    '''利用hash类型,存储每一个请求对象,key是指纹,值就是请求对象'''

    REDIS_BACKUP_NAME = settings.REDIS_BACKUP_NAME
    REDIS_BACKUP_HOST = settings.REDIS_BACKUP_HOST
    REDIS_BACKUP_PORT = settings.REDIS_BACKUP_PORT
    REDIS_BACKUP_DB = settings.REDIS_BACKUP_DB


    def __init__(self):
        self._redis = redis.StrictRedis(host=self.REDIS_BACKUP_HOST, port=self.REDIS_BACKUP_PORT ,db=self.REDIS_BACKUP_DB)
        self._name = self.REDIS_BACKUP_NAME

    # 增删改查
    def save_request(self, fp, request):
        '''将请求对象备份到redis的hash中'''
        bytes_data = pickle.dumps(request)
        self._redis.hset(self._name, fp, bytes_data)

    def delete_request(self, fp):
        '''根据请求的指纹,将其删除'''
        self._redis.hdel(self._name, fp)

    def update_request(self, fp, request):
        '''更新已有的fp'''
        self.save_request(fp, request)

    def get_requests(self):
        '''返回全部的请求对象'''
        for fp, bytes_request in self._redis.hscan_iter(self._name):
            request = pickle.loads(bytes_request)
            yield request
  • 为Request对象增加重试次数属性:

      class Request(object):
          '''框架内置请求对象,设置请求信息'''
    
          def __init__(self, url, method='GET', headers=None, params=None, data=None, parse='parse', filter=True, meta=None):
              self.url = url    # 请求地址
              self.method = method    # 请求方法
              self.headers = headers    # 请求头
              self.params = params    # 请求参数
              self.data = data    # 请求体
              self.parse = parse    # 指明它的解析函数, 默认是parse方法
              self.filter = filter  # 是否进行去重,默认是True
              self.retry_time = 0    # 重试次数
              self.meta = meta
  • 修改调度器,实现对应的逻辑以及方法:

      # scrapy_plus/core/scheduler.py
      ......
      from scrapy_plus.redis_hash import RedisBackupRequest
      ......
    
      class Scheduler(object):
          '''
          缓存请求对象(Request),并为下载器提供请求对象,实现请求的调度
          对请求对象进行去重判断
          '''
          def __init__(self,collector):
    
              if SCHEDULER_PERSIST: #如果使用分布式或者是持久化,使用redis的队列
                  self.queue = ReidsQueue()
                  self._filter_container = RedisFilterContainer()
              else:
                  self.queue = Queue()
                  self._filter_container = NoramlFilterContainer()
              self.collector = collector
    
          def add_reqeust(self, request):
              '''存储request对象进入队列
              return: None
              '''
              # 先判断是否要去重
              if request.filter is False:
                  self.queue.put(request)
                  logger.info("添加请求成功<disable去重>[%s %s]" % (request.method, request.url))
                  self.total_request_number += 1  # 统计请求总数
                  return # 必须return
    
              # 判断去重,如果重复,就不添加,否则才添加
              fp = self._gen_fp(request)
              if not self.filter_request(fp, request):
                  # 往队列添加请求
                  logger.info("添加请求成功[%s %s]"%(request.method.upper(), request.url))
                  self.queue.put(request)
                  if settings.ROLE in ['master', 'slave']:
                      self._backup_request.save_request(fp, request)   # 对请求进行备份
                  # 如果是新的请求,那么就添加进去重容器,表示请求已经添加到了队列中
                  self._filter_container.add_fp(fp)
    
                  self.total_request_number += 1
              else:
                  self.repeat_request_number += 1
    
          def get_request(self):
              '''从队列取出一个请求对象
              return: Request Object
              '''
              try:
                  request = self.queue.get(False)
              except:
                  return None
              else:
                  if request.filter is True and settings.ROLE in ['master', 'slave']:  # 先判断 是否需要进行去重
                      # 判断重试次数是否超过规定
                      fp = self._gen_fp(request)
                      if request.retry_time >= settings.MAX_RETRY_TIMES:
                          self._backup_request.delete_request(fp)    # 如果超过,那么直接删除
                          logger.warnning("出现异常请求,且超过最大尝试的次数:[%s]%s"%(request.method, request.url))
                      request.retry_time += 1   # 重试次数+1
    
                      self._backup_request.update_request(fp, request)  # 并更新到备份中
                  return request
    
          def delete_request(self, request):
              '''根据请求从备份删除对应的请求对象'''
              if settings.ROLE in ['master', 'slave']:
                  fp = self._gen_fp(request)
                  self._backup_request.delete_request(fp)
    
          def add_lost_reqeusts(self):
              '''将丢失的请求对象再添加到队列中'''
              # 从备份容器取出来,放到队列中
              if settings.ROLE in ['master', 'slave']:
                  for request in self._backup_request.get_requests():
                      self.queue.put(request)
          ......

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~