情况最简单下的爬虫案例
文件建立日期: 2020/03/06
最后修订日期: None
相关软件信息:
Win 10 | Python 3.8.2 | PySimpleGUI 4.16.0 | BS4 4.8.2 |
说明: 本文请随意引用或更改, 只须标示出处及作者, 作者不保证内容絶对正确无误, 如造成任何后果, 请自行负责.
标题: 情况最简单下的爬虫案例
常看一些网络上的小说, 但都会碰到某些章节没下载或广告等等问题, 所以就写了一个简单的爬虫, 只针对简单的请求, 没有要设置Header, Cookie, 登入, 验证或VIP用户等, 就可以取得网页数据的小说网页
要求:
- 小说目录网址, 不需翻页, 以
https://www.wfxs.org/html/2/
为例 - 取得各章的网址
- 建立一个目录供存整本小说
- 每章各建立一个文本档案
- 为加速完成, 使用多线程
- 简单显示进行中的线程, 完成数及总数
线程说明
用了好几种方法, 常会碰到主程序结束, 但线程未全部完成, 章节的总数总是不对, 因此自己建立记录区, 自行管理, 终于解决这个问题, 明确所有的线程都已完成.
输出
说明及代码
- 使用的库
from pathlib import Path
from bs4 import BeautifulSoup as bs
from copy import deepcopy
import urllib.request as request
import _thread
import PySimpleGUI as sg
- 建立网页处理的类
class WEB():
def __init__(self):
self.base = 'https://www.wfxs.org' # 网页的根目录
self.root = '' # 小说储存的根目录
self.queue = {} # 记录目前正在工作的线程
self.max = 40 # 最大线程数
self.buffer = {} # 线程完成的章节小说内容, 待存档
self.temp = [] # 章节线程出错, 待重排入线程
self.count = 0 # 已存档章节数
self.not_allow = '''?|><"*'+,;=[]\x00/\\''' # 不合格的文件名文字
- 创建目录: 如果该目录已存在, 后面加上数字以区别
def create_subdirectory(self, name):
i, path = 1, Path(name)
while path.is_dir():
i += 1
path = Path(name+str(i))
self.root = path
path.mkdir()
- 读取该章节小说内容
由于内容放在head
中, 如果直接以head.text
读取, 也会取到子tag中的文字, 因此先移除其他所的子tag, 再以head.text
读取.
<html ……><head><title> … </title><meta … /> … 章内容文字</head>
def chapter_content(self, html):
for tag in html.head: tag.extract()
chapter_text = self.form(html.head.text).strip()
return chapter_text
- 将文字中的
<br>
以及多余的空行移除
def form(self, text):
text = text.replace('\xA0', '')
while '\n\n' in text:
text = text.replace('\n\n', '\n')
return text
- 获取一个新未使用的记录线程键值
def get_a_key(self):
for i in range(self.max):
if i not in self.queue:
return i
return None
- 读取目录中的作者名
find_all, tag为meta
, 参数name:author
; 读取content
的内容, 从右边分割字符串, 取最右边一个.
<meta content="绝品天医 版权归 叶天南"name="author"/>
def get_auther(self, html):
return html.find_all(
name ='meta', attrs={'name':'author'}
)[0].get('content').rsplit(sep=None, maxsplit=1)[-1]
- 读取目录中所有的章节名及链结
find_all, tag为<dd>
, 章节名为tag<a>
的text
, 链结为tag<a>
的href
值, 如果章节名空字符串, 略过.
<dd><a href="/html/2/3063.html">第十五章 庆元诊所</a></dd>
def get_chapters(self, html):
chapters = html.find_all(name='dd')
result = []
for chapter in chapters:
title = chapter.a.text.split('(')[0]
if title != '':
link = self.base + chapter.a.get('href')
result.append([self.valid(title), link])
return result
- 读取目录中对该书的简介
简介在tag<p>
, 参数class="tl pd8 pd10"
中<br>
后的text
<p class="tl pd8 pd10">作者︰叶天南写的小说《绝品天医》… <br/>.……..
def get_description(self, html):
return self.form(html.find(
name='p',
attrs={'class':"tl pd8 pd10"}).br.text)
- 读取目录中书名
书名在tag<h1>
中的text
, 因为要以书名来建立目录, 所以要移除书名中的非法字母
<h1 class="tc h10">绝品天医</h1>
def get_name(self, html):
return self.valid(html.h1.text)
- 加载目录中的书名, 作者, 简介, 各章名及其链结
def load_catalog(self, url):
status, html = self.load_html(url)
if status != 200:
return None, None, None, None, None
name = self.get_name(html)
auther = self.get_auther(html)
description = self.get_description(html)
chapters = self.get_chapters(html)
return status, name, auther, description, chapters
- 加载章节的小说内文, 再放入存盘用的缓冲区, 只要网页加载的结果不是
200
代码, 就从线程记录区移除, 并放入后面重排入线程.
def load_chapter(self, key, chapter, url):
status, html = self.load_html(url)
if status != 200:
self.temp.append([chapter, url])
del self.queue[key]
else:
chapter_text = self.chapter_content(html)
self.buffer[key] = [chapter, chapter_text]
return
- 根据网址读入HTML檔, 如果出错或状态代码不是200的都返回None, 指示错误, 后面再重新读取
该网页的编码为big5
, 译码如果有错,ignore
, 该字会略过不处理.
<meta http-equiv="Content-Type" content="text/html; charset=big5" />
def load_html(self, url):
try:
response = request.urlopen(url)
status = response.getcode()
except:
return None, ''
else:
if status == 200:
data = str(response.read(), encoding='big5', errors='ignore')
html = bs(data, features='html.parser')
return status, html
else:
return None, ''
- 删除线程记录
def queue_delete(self, key):
del self.queue[key]
- 线程加入记录中, 并启动, 批注中为非线程作法
def queue_insert(self, chapter, url):
key = self.get_a_key()
self.queue[key] = [chapter, url]
# self.load_chapter(key, chapter, url)
_thread.start_new_thread(self.load_chapter, (key, chapter, url))
- 检查线程记录是否已达到上限, 用来限制最大限线程数, 不会再加入新的限程
def queue_is_full(self):
return True if len(self.queue) == self.max else False
- 检查线程记录是否空, 用来确认所有的线程都已完成.
def queue_not_empty(self):
return True if len(self.queue) != 0 else False
- 储存小说书的说明档, 内含书名, 作者, 简介, 如果档案已存在, 附加额外数字以区别
def save_book(self, name, auther, description):
i, path = 1, self.root.joinpath(name+'.txt')
while path.is_file():
i += 1
path = self.root.joinpath(name+str(i)+'.txt')
text = '\n'.join(('书名: %s'%name, '作者: %s'%auther,
'简介: %s'%description))
with open(path, 'wt', encoding='utf-8') as f:
f.write(text)
- 储存小说的章节内文, 如果檔名存在, 附加额外数字以区别, 在存盘缓冲区以及线程记录中, 删除该章节.
def save_chapter(self):
buffer = deepcopy(self.buffer)
for key, value in buffer.items():
i, path = 1, self.root.joinpath(value[0]+'.txt')
while path.is_file():
i += 1
path = self.root.joinpath(value[0]+str(i)+'.txt')
with open(path, 'wt', encoding='utf-8') as f:
f.write(value[1])
self.count += 1
del self.buffer[key]
del self.queue[key]
- 将文件名中的非法字母移除, 避免存盘错误
def valid(self, text):
return ''.join((char for char in text if char not in self.not_allow))
- 主程序
- 如果目录无法加载, 结束程序
- 储存小说书的说明档
- 建立简单GUI, 显示进度, 并控制随时可以结束, 或保证所有的行程都已执行完毕
url = 'https://www.wfxs.org/html/2/' # 小说目录网址
W = WEB()
status, name, auther, description, chapters = W.load_catalog(url)
if status == None:
print('%s open failed !' % url)
quit()
W.create_subdirectory(name)
W.save_book(name, auther, description)
font = ('Courier New', 16, 'bold')
layout = [[sg.Text('', font=font, auto_size_text=False, key='Text1',
size=(W.max, 1))],
[sg.Text('', font=font, auto_size_text=False, key='Text2',
size=(W.max, 1))]]
window = sg.Window('Novel Download', layout=layout, finalize=True)
size = len(chapters)
all = deepcopy(chapters)
while len(all) != 0:
W.temp = []
for chapter, url in all:
W.save_chapter()
state, values = window.read(timeout=1)
if state == None:
window.close()
quit()
window['Text1'].update(value='■'*len(W.queue))
window['Text2'].update(
value='{}/{} chapters saved'.format(W.count, size))
while W.queue_is_full(): # 如果线程记录区已满, 存章节
W.save_chapter()
W.queue_insert(chapter, url) # 插入线程, 下载章节
# 如果线程记录区不是空的, 存章节, 跑完所有的线程, 再重新跑出错的章节
while W.queue_not_empty():
W.save_chapter()
all = deepcopy(W.temp)
while True:
state, values = window.read(timeout=100)
if state == None:
break
window['Text1'].update(value='■'*len(W.queue))
window['Text2'].update(value='{}/{} chapters saved'.format(W.count, size))
W.save_chapter() # 储存章节, 直到线程全部执行完毕
window.close()
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: