自建线程管理

建立日期: 2020/05/16, 并入 Tool.py

修订日期: None

相关软件信息:

Win 10 Python 3.7.6

说明: 本文请随意引用或更改,只须标示出处及作者,作者不保证内容绝对正确无误,如造成任何后果,请自行负责.

自建线程管理

在底层的 _threading 库中, 线程结束是没有任何通知的, 上层 threading 库中有提供一些功能, 总觉得还是很难搞, 所以自己就写了一个class, 来处理这类的事.

处理方法

  1. 所有的线程使用同一个使用者的函数.
  2. 建立一个待处理列表, 其中放的是使用者函数的参数tuple.
  3. 建立一个队列 queue, 内容是正在进行的线程, 以独立的识别码作为 key.
  4. 建立一个使用函数返回 False 的出错队列, 可以再排入待处理列表.
  5. 建立内部线程, 用来执行使用者函数, 如果使用者函数返回False, 将其加入出错队列; 如果使用者函数出错但不返回False, 就会被认为已完成. 不管有没有出错, 都会从进行的线程队列中移除.
  6. 建立一个线程管理的主线程, 负责将待处理列表中的项目加入到正在进行的线程队列 queue.
  7. 提供几个函数供查询 - 已完成线程的数目, 未完成线程的数目, 进行的线程队列 queue是否已满, 所有的线程是否已完成.
  8. 提供一个属性 stop, True 只能用来停止”将要进行”的所有线程, 进行中的线程, 是无法停止的.
  9. 还有一个 fail 参数, 用来指定如果有使用者函数返回 False 时, 要执行的动作, 比如反爬虫时, 可能要暂停等待一段时间, 才能继续.

使用方式

from time import sleep

def f(a, b):
    print(a*10+b)

def error():
    sleep(1)

args = [(a, b) for a in range(10) for b in range(10)]

T = Thread(f, args, fail=error, size=20)

while not T.all_finished():
    pass
print('All jobs done !')

代码:

import _thread

class Thread():

    def __init__(self, func, sequence, fail=None, size=40):
        """
        Manage jobs for threading by list.
        : Parameters
          func - callable method to thread.
          sequence - list of tuple, each arguments for func.
          fail - callable method if func return Fail.
          size - max size of thread queue.
        : Return
          Obejct of Thread manager.
        """
        self.func = func
        self.fail = fail
        self.all = sequence
        self.total = len(self.all)
        self.temp = {}
        self.queue = {}
        self.size = size
        self.stop = False
        if callable(self.func) and isinstance(sequence, (tuple, list)):
            _thread.start_new_thread(self._host, ())

    def all_finished(self):
        """
        Return True if all threads finished.
        """
        return True if (
            sum(map(len, [self.queue, self.all, self.temp]))==0) else False

    def jobs_done(self):
        """
        Return the number of jobs done.
        """
        return self.total - self.jobs_left()

    def jobs_left(self):
        """
        Return the number of jobs not finished.
        """
        return len(self.all) + len(self.tmp) + len(self.queue)

    def queue_is_full(self):
        """
        Return True if queue is full.
        """
        return True if len(self.queue) == self.size else False

    def _get_a_key(self):
        """
        Get an integer ID for each new thread.
        """
        for i in range(self.size):
            if i not in self.queue:
                return i
        return None

    def _host(self):
        """
        Manager of all threads for thread run, failed process.
        """
        while True:
            if self.stop:
                break
            if len(self.all) == 0 and len(self.queue) == 0:
                self.stop = True
            length = min(self.size-len(self.queue), len(self.all))
            for i in range(length):
                self._queue_insert(self.func, self.all[i])
            self.all = self.all[length:]
            if len(self.temp) != 0:
                for key, value in self.temp.items():
                    self.all.append(value)
                    del self.temp[key]
                if callable(self.fail):
                    self.fail()

    def _queue_delete(self, key):
        """
        Delete thread in queue after thread done.
        """
        del self.queue[key]

    def _queue_insert(self, func, value):
        """
        Run thread and add it into queue.
        : Parameters
          func - callable function for thread
          value - tuple, arguements for func.
        : Return
          None
        """
        key = self._get_a_key()
        self.queue[key] = value
        _thread.start_new_thread(self._thread_func, (func, key, value))

    def _thread_func(self, func, key, value):
        """
        Internal thread for control of user thread.
        : Paramters
          func - callable function for thread
          key - integer ID for each new thread.
          value - tuple, arguements for func.
        : Return
          None
        """
        if self.stop:
            return
        if func(*value) == False:
            self.temp[key] = value
        self._queue_delete(key)
本作品采用《CC 协议》,转载必须注明作者和本文链接

Jason Yang

讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!