自建线程管理
建立日期: 2020/05/16, 并入 Tool.py
修订日期: None
相关软件信息:
Win 10 | Python 3.7.6 |
说明: 本文请随意引用或更改,只须标示出处及作者,作者不保证内容绝对正确无误,如造成任何后果,请自行负责.
自建线程管理
在底层的 _threading 库中, 线程结束是没有任何通知的, 上层 threading 库中有提供一些功能, 总觉得还是很难搞, 所以自己就写了一个class, 来处理这类的事.
处理方法
- 所有的线程使用同一个使用者的函数.
- 建立一个待处理列表, 其中放的是使用者函数的参数tuple.
- 建立一个队列 queue, 内容是正在进行的线程, 以独立的识别码作为 key.
- 建立一个使用函数返回 False 的出错队列, 可以再排入待处理列表.
- 建立内部线程, 用来执行使用者函数, 如果使用者函数返回False, 将其加入出错队列; 如果使用者函数出错但不返回False, 就会被认为已完成. 不管有没有出错, 都会从进行的线程队列中移除.
- 建立一个线程管理的主线程, 负责将待处理列表中的项目加入到正在进行的线程队列 queue.
- 提供几个函数供查询 - 已完成线程的数目, 未完成线程的数目, 进行的线程队列 queue是否已满, 所有的线程是否已完成.
- 提供一个属性 stop, True 只能用来停止”将要进行”的所有线程, 进行中的线程, 是无法停止的.
- 还有一个 fail 参数, 用来指定如果有使用者函数返回 False 时, 要执行的动作, 比如反爬虫时, 可能要暂停等待一段时间, 才能继续.
使用方式
from time import sleep
def f(a, b):
print(a*10+b)
def error():
sleep(1)
args = [(a, b) for a in range(10) for b in range(10)]
T = Thread(f, args, fail=error, size=20)
while not T.all_finished():
pass
print('All jobs done !')
代码:
import _thread
class Thread():
def __init__(self, func, sequence, fail=None, size=40):
"""
Manage jobs for threading by list.
: Parameters
func - callable method to thread.
sequence - list of tuple, each arguments for func.
fail - callable method if func return Fail.
size - max size of thread queue.
: Return
Obejct of Thread manager.
"""
self.func = func
self.fail = fail
self.all = sequence
self.total = len(self.all)
self.temp = {}
self.queue = {}
self.size = size
self.stop = False
if callable(self.func) and isinstance(sequence, (tuple, list)):
_thread.start_new_thread(self._host, ())
def all_finished(self):
"""
Return True if all threads finished.
"""
return True if (
sum(map(len, [self.queue, self.all, self.temp]))==0) else False
def jobs_done(self):
"""
Return the number of jobs done.
"""
return self.total - self.jobs_left()
def jobs_left(self):
"""
Return the number of jobs not finished.
"""
return len(self.all) + len(self.tmp) + len(self.queue)
def queue_is_full(self):
"""
Return True if queue is full.
"""
return True if len(self.queue) == self.size else False
def _get_a_key(self):
"""
Get an integer ID for each new thread.
"""
for i in range(self.size):
if i not in self.queue:
return i
return None
def _host(self):
"""
Manager of all threads for thread run, failed process.
"""
while True:
if self.stop:
break
if len(self.all) == 0 and len(self.queue) == 0:
self.stop = True
length = min(self.size-len(self.queue), len(self.all))
for i in range(length):
self._queue_insert(self.func, self.all[i])
self.all = self.all[length:]
if len(self.temp) != 0:
for key, value in self.temp.items():
self.all.append(value)
del self.temp[key]
if callable(self.fail):
self.fail()
def _queue_delete(self, key):
"""
Delete thread in queue after thread done.
"""
del self.queue[key]
def _queue_insert(self, func, value):
"""
Run thread and add it into queue.
: Parameters
func - callable function for thread
value - tuple, arguements for func.
: Return
None
"""
key = self._get_a_key()
self.queue[key] = value
_thread.start_new_thread(self._thread_func, (func, key, value))
def _thread_func(self, func, key, value):
"""
Internal thread for control of user thread.
: Paramters
func - callable function for thread
key - integer ID for each new thread.
value - tuple, arguements for func.
: Return
None
"""
if self.stop:
return
if func(*value) == False:
self.temp[key] = value
self._queue_delete(key)
本作品采用《CC 协议》,转载必须注明作者和本文链接