python线程池问题

最近使用python线程池,发现线程池不能强制关闭,所以修改了一下重写了一下,由于自身能力有限,不知道有没有问题,想通过大神指定一下,主要是希望线程池可以通过用户自己设置守护线程还是非守护线程,用户决定强不强制关闭,源码代码调整如下:


import atexit
import time
from concurrent.futures import _base
import itertools
import queue
import threading
import weakref
import os

# _threads_queues = weakref.WeakKeyDictionary()
# _shutdown = False


# def _python_exit():
#     global _shutdown
#     _shutdown = True
#     items = list(_threads_queues.items())
#     for t, q in items:
#         q.put(None)
#     for t, q in items:
#         t.join()
#
#
# atexit.register(_python_exit)


class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # 判断任务是否被取消
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('initializer执行异常:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                # 初始化方法调用异常
                executor._initializer_failed()
            return
    try:
        while True:
            # 如果队列中没有任务, 关闭线程
            if work_queue.qsize() == 0:
                work_queue.put(None)
            # 阻塞获取队列中的任务
            work_item = work_queue.get(block=True)
            # 如果任务不为None
            if work_item is not None:
                # 运行任务
                work_item.run()
                # 删除对象引用
                del work_item
                # 尝试增加空闲计数
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                # 删除对象引用
                del executor
                continue
            # 如果任务为None
            executor = executor_reference()
            # 如果executor为空或executor._shutdown为True, 结束线程
            if executor is None or executor._shutdown:
                if executor is not None:
                    executor._shutdown = True
                work_queue.put(None)
                return
            executor._threads.discard(threading.current_thread())
            executor._idle_semaphore.acquire(timeout=0)
            del executor
            return
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    当 ThreadPoolExecutor 中的工作线程初始化失败时引发。
    """


class ThreadPoolExecutorNew(_base.Executor):
    # 用于在未提供thread_name_prefix时分配唯一的线程名称。
    # 类变量, 类与类之间共享变量
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='', daemon=True, initializer=None, initargs=()):
        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers必须大于0")
        # callable() 函数用于检查一个对象是否是可调用的
        if initializer is not None and not callable(initializer):
            raise TypeError("initializer必须是一个可以调用的函数")
        self._max_workers = max_workers
        # 任务队列
        self._work_queue = queue.SimpleQueue()
        # 控制线程数, 有空线程时, 使用空线程执行任务
        self._idle_semaphore = threading.Semaphore(0)
        # 存放当前的所有线程
        self._threads = set()
        # 判断线程池是否正常
        self._broken = False
        # 标识线程池是否关闭
        self._shutdown = False
        self._daemon = daemon
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or ("线程池-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    def submit(*args, **kwargs):
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("描述符 'submit' of 'ThreadPoolExecutor' 对象需要一个参数")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
            import warnings
            warnings.warn("不推荐将“fn”作为关键字参数传递", DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit至少有一个参数, got %d' % (len(args) - 1))
        with self._shutdown_lock:
            # 如果线程池异常, 抛出异常
            # 什么情况下会导致线程池异常
            # 1 提供初始化操作, 初始化失败, 会导致线程池异常
            if self._broken:
                raise BrokenThreadPool(self._broken)
            # 线程池关闭,无法添加任务, 调用shutdown方法, 会导致submit无法提交任务
            if self._shutdown:
                raise RuntimeError('关闭后无法安排新的函数调用')
            # if _shutdown:
            #     raise RuntimeError('无法在解释器关闭后安排新的函数调用')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f

    submit.__text_signature__ = _base.Executor.submit.__text_signature__
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # 如果空闲线程可用,请不要旋转新线程
        if self._idle_semaphore.acquire(timeout=0):
            return

        # 当执行器丢失时,弱引用回调将唤醒工作线程。
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.setDaemon(self._daemon)
            t.start()
            self._threads.add(t)
            # _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        with self._shutdown_lock:
            self._broken = ('线程初始值设定项失败,线程池“”不再可用',)
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    def shutdown(self, wait=True, timeout=None):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
        if not wait:
            if timeout is not None:
                for t1 in self._threads:
                    t1.join(timeout)
            if timeout is None:
                for t2 in self._threads:
                    t2.join(0)

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

    # def __exit__(self, exc_type, exc_val, exc_tb):
    #     self.shutdown(wait=False)
    #     return False
def thh(p):
    time.sleep(1)


if __name__ == '__main__':
    t = ThreadPoolExecutorNew(max_workers=10, daemon=True)
    tt = []
    start = time.time()
    for i in range(100):
        r = t.submit(thh, i)
        tt.append(r)
    for rr in tt:
        rr.result()
    print(time.time() - start)
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!