11.4. threading: Manage Concurrent Operations Within a Process

Purpose: Manage several threads of execution.

Using threads allows a program to run multiple operations concurrently in the same process space.

Thread Objects

The simplest way to use a Thread is to instantiate it with a target function and call start() to let it begin working.

threading_simple.py

import threading

def worker():
    """thread worker function"""
    print('Worker')

threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

The output is five lines, each with "Worker" on it.

$ python3 threading_simple.py

Worker
Worker
Worker
Worker
Worker

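It is useful to be able to spawn a thread and pass it arguments to tell it what work to do. Any type of object can be passed as an argument to the thread. This example passes a number, which the thread then prints.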

threading_simpleargs.py

import threading

def worker(num):
    """thread worker function"""
    print('Worker: %s' % num)

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

The integer argument is now included in the message printed by each thread.

$ python3 threading_simpleargs.py

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
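Arguments can also be passed by keyword through the kwargs parameter. The sketch below combines args and kwargs, and has each thread store its result in a shared list; join() (used later in this section) makes the main thread wait until every result is in. The square() and run_workers() functions are hypothetical names chosen for illustration.

```python
import threading

def square(num, results, scale=1):
    """Hypothetical worker: store num * num * scale in the shared list."""
    results.append(num * num * scale)

def run_workers(count=5, scale=1):
    results = []
    threads = []
    for i in range(count):
        t = threading.Thread(
            target=square,
            args=(i, results),        # positional arguments
            kwargs={'scale': scale},  # keyword arguments
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()  # wait for every worker before reading the results
    return sorted(results)  # threads may finish in any order

print(run_workers())  # [0, 1, 4, 9, 16]
```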

Determining the Current Thread

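Using arguments to identify or name a thread is cumbersome and unnecessary. Each Thread instance has a name with a default value that can be changed when the thread is created. When a program uses several threads to handle different operations, it is worth giving each one a meaningful name.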

threading_names.py

import threading
import time

def worker():
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().name, 'Exiting')

def my_service():
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().name, 'Exiting')

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use the default name

w.start()
w2.start()
t.start()

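The debug output includes the name of the current thread on each line. The lines with "Thread-1" in the thread name column correspond to the unnamed thread w2.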

$ python3 threading_names.py

worker Starting
Thread-1 Starting
my_service Starting
worker Exiting
Thread-1 Exiting
my_service Exiting
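Besides passing name to the constructor, a thread can be renamed by assigning its name attribute, and each running thread has a numeric identifier that threading.get_ident() returns from inside it. A minimal sketch, where record_identity() is a hypothetical helper, records what a thread sees about itself:

```python
import threading

def record_identity(store):
    """Hypothetical worker: save this thread's name and identifier."""
    current = threading.current_thread()
    store['name'] = current.name
    store['ident'] = threading.get_ident()

info = {}
t = threading.Thread(target=record_identity, args=(info,))
t.name = 'renamed-worker'  # the name attribute can be assigned directly
t.start()
t.join()

print(info['name'])                  # renamed-worker
print(info['ident'] == t.ident)      # True
print(threading.main_thread().name)  # MainThread
```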

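Most programs do not use print to debug. The logging module supports embedding the thread name in every log message using the formatter code %(threadName)s. Including thread names in log messages makes it easier to trace those messages back to their source.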

threading_names_log.py

import logging
import threading
import time

def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')

logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use the default name

w.start()
w2.start()
t.start()

logging is also thread-safe, so messages from different threads are kept distinct in the output.

$ python3 threading_names_log.py

[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting
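The thread name is an ordinary attribute of each log record, so any handler can use it. This sketch sends records to an in-memory io.StringIO stream (an assumption made only so the output can be inspected) through a dedicated logger, with the hypothetical name 'threading-sketch', instead of calling basicConfig():

```python
import io
import logging
import threading

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('(%(threadName)s) %(message)s'))

logger = logging.getLogger('threading-sketch')  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.propagate = False  # keep these records away from the root logger

def worker():
    logger.debug('Starting')

t = threading.Thread(name='worker', target=worker)
t.start()
t.join()
logger.debug('Done')

print(stream.getvalue(), end='')
# (worker) Starting
# (MainThread) Done
```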

Daemon vs. Non-Daemon Threads

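Up to this point, the example programs have implicitly waited to exit until all threads have completed their work. Sometimes a program spawns a thread as a daemon, which runs without blocking the main program from exiting. Daemon threads are useful for services where there may not be an easy way to interrupt the thread, or where letting the thread die in the middle of its work does not lose or corrupt data (for example, a thread that generates "heart beats" for a service monitoring tool). To mark a thread as a daemon, pass daemon=True when constructing it, or set its daemon attribute to True before calling start(). By default, threads are not daemons.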

threading_daemon.py

import threading
import time
import logging

def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

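The output does not include the "Exiting" message from the daemon thread, since all of the non-daemon threads (including the main thread) exit before the daemon thread wakes up from its sleep() call.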

$ python3 threading_daemon.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
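Two details of the daemon flag are easy to miss: a new thread inherits the flag from the thread that creates it, and the flag can only be changed before start(). A short check, where check_daemon_rules() is a function written only for illustration:

```python
import threading

def check_daemon_rules():
    """Collect a few facts about the daemon flag (illustration only)."""
    facts = {}

    def idle():
        pass

    def spawn_child():
        # No daemon= argument: the flag is copied from the creating thread.
        child = threading.Thread(target=idle)
        facts['child_daemon'] = child.daemon
        child.start()
        child.join()

    parent = threading.Thread(target=spawn_child, daemon=True)
    facts['parent_daemon'] = parent.daemon
    parent.start()
    parent.join()

    plain = threading.Thread(target=idle)  # created by a non-daemon thread
    facts['default_daemon'] = plain.daemon
    plain.start()
    try:
        plain.daemon = True  # too late: the thread has already started
    except RuntimeError:
        facts['set_after_start'] = 'RuntimeError'
    plain.join()
    return facts

print(check_daemon_rules())
# {'parent_daemon': True, 'child_daemon': True, 'default_daemon': False, 'set_after_start': 'RuntimeError'}
```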

To wait until a daemon thread has completed its work, use the join() method.

threading_daemon_join.py

import threading
import time
import logging

def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

Waiting for the daemon thread to exit using join() gives it a chance to finish and print its "Exiting" message.

$ python3 threading_daemon_join.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

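By default, join() blocks indefinitely until the thread finishes. It is also possible to pass a float representing the number of seconds to wait. If the thread does not complete within that timeout, join() returns anyway.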

threading_daemon_join_timeout.py

import threading
import time
import logging

def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join(0.1)
print('d.is_alive()', d.is_alive())
t.join()

Since the timeout passed is less than the amount of time the daemon thread sleeps, the thread is still "alive" after join() returns.

$ python3 threading_daemon_join_timeout.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
d.is_alive() True
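A join() with a timeout is often used in a loop, so the waiting thread can do something useful, such as reporting progress, between checks. A sketch with a hypothetical wait_with_progress() helper:

```python
import threading
import time

def wait_with_progress(thread, interval=0.05, max_checks=100):
    """Join `thread` in short slices; return how many slices were used."""
    checks = 0
    while thread.is_alive() and checks < max_checks:
        thread.join(interval)  # returns after at most `interval` seconds
        checks += 1
        # a real program might log progress or check a shutdown flag here
    return checks

t = threading.Thread(target=time.sleep, args=(0.2,))
t.start()
checks = wait_with_progress(t)
print(checks, t.is_alive())
```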

Enumerating All Threads

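It is not necessary to retain an explicit handle to all of the daemon threads to ensure they have completed before the main process exits. enumerate() returns a list of active Thread instances. The list includes the current thread, which must be skipped: joining the current thread would deadlock, so join() raises RuntimeError instead.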

threading_enumerate.py

import random
import threading
import time
import logging

def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.name)
    t.join()

Because the worker sleeps for a random amount of time, the output from this program may vary.

$ python3 threading_enumerate.py

(Thread-1  ) sleeping 0.20
(Thread-2  ) sleeping 0.30
(Thread-3  ) sleeping 0.40
(MainThread) joining Thread-1
(Thread-1  ) ending
(MainThread) joining Thread-3
(Thread-2  ) ending
(Thread-3  ) ending
(MainThread) joining Thread-2
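Skipping threading.main_thread() works when the joining code runs in the main thread. A slightly more general sketch skips threading.current_thread() instead, so the hypothetical join_all_others() helper can be called from any thread; the timeout is an assumption that keeps one stuck thread from blocking the caller forever.

```python
import threading
import time

def join_all_others(timeout=None):
    """Join every live thread except the caller; return the joined names."""
    current = threading.current_thread()
    joined = []
    for t in threading.enumerate():
        if t is current:
            continue  # current.join() would raise RuntimeError
        t.join(timeout)
        joined.append(t.name)
    return joined

sleepers = [
    threading.Thread(target=time.sleep, args=(0.2,),
                     name=f'sleeper-{i}', daemon=True)
    for i in range(3)
]
for t in sleepers:
    t.start()

names = join_all_others(timeout=1.0)
print(all(not t.is_alive() for t in sleepers))  # True
```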

Subclassing Thread

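At start-up, a Thread does some basic initialization and then calls its run() method, which in turn calls the target function passed to the constructor. To create a subclass of Thread, override run() to do whatever is necessary.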

threading_subclass.py

import threading
import logging

class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThread()
    t.start()

The return value of run() is ignored.

$ python3 threading_subclass.py

(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running
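Because run()'s return value is discarded, a subclass that needs to hand back a result has to store it somewhere, for example on the instance. A sketch with a hypothetical ResultThread class, which uses its own attribute names rather than relying on Thread's private ones:

```python
import threading

class ResultThread(threading.Thread):
    """Hypothetical Thread subclass that keeps the function's return value."""

    def __init__(self, func, args=(), kwargs=None, **thread_options):
        super().__init__(**thread_options)  # e.g. name=..., daemon=...
        self.func = func
        self.func_args = args
        self.func_kwargs = kwargs or {}
        self.result = None
        self.error = None

    def run(self):
        # Store the outcome instead of returning it, since Thread
        # ignores whatever run() returns.
        try:
            self.result = self.func(*self.func_args, **self.func_kwargs)
        except Exception as exc:
            self.error = exc

t = ResultThread(pow, args=(2, 10), name='power')
t.start()
t.join()
print(t.name, t.result)  # power 1024
```

For most result-returning work, concurrent.futures.ThreadPoolExecutor already provides this: submit() returns a Future whose result() method waits for and returns the value.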

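Because the args and kwargs values passed to the Thread constructor are saved in private, underscore-prefixed attributes that are not part of the public API, a subclass should not rely on them. To pass arguments to a custom thread type, redefine the constructor to save the values in instance attributes that the subclass can see.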

threading_subclass_args.py

import threading
import logging

class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()

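MyThreadWithArgs uses the same API as Thread, but, as with any other class, its constructor could easily be changed to take more or different arguments more directly related to the purpose of the thread.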

$ python3 threading_subclass_args.py

(Thread-1  ) running with (0,) and {'b': 'B', 'a': 'A'}
(Thread-2  ) running with (1,) and {'b': 'B', 'a': 'A'}
(Thread-3  ) running with (2,) and {'b': 'B', 'a': 'A'}
(Thread-4  ) running with (3,) and {'b': 'B', 'a': 'A'}
(Thread-5  ) running with (4,) and {'b': 'B', 'a': 'A'}

Timer Threads

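One example of a reason to subclass Thread is provided by Timer, also included in threading. A Timer starts its work after a delay, and can be canceled at any point within that delay period.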

threading_timer.py

import threading
import time
import logging

def delayed():
    logging.debug('worker running')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

t1 = threading.Timer(0.3, delayed)
t1.name = 't1'
t2 = threading.Timer(0.3, delayed)
t2.name = 't2'

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.name)
time.sleep(0.2)
logging.debug('canceling %s', t2.name)
t2.cancel()
logging.debug('done')

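The second timer in this example is never run, and the first timer appears to run after the rest of the main program is done. Because it is not a daemon thread, it is joined implicitly when the main thread is done.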

$ python3 threading_timer.py

(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running
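Timer also accepts args and kwargs for the function it calls, and since it is a Thread subclass, join() works on it too. A sketch with a hypothetical run_timers() helper, in which the canceled timer never calls its function:

```python
import threading

def run_timers(delay=0.05):
    fired = []
    keep = threading.Timer(delay, fired.append, args=('keep',))
    drop = threading.Timer(delay, fired.append, args=('drop',))
    keep.start()
    drop.start()
    drop.cancel()  # only effective while the timer is still waiting
    keep.join()    # a Timer is a Thread, so join() works as usual
    drop.join()
    return fired

print(run_timers())  # ['keep']
```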

Signaling Between Threads

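Although the point of using multiple threads is to run separate operations concurrently, there are times when it is important to synchronize the operations in two or more threads. Event objects are a simple way to communicate between threads safely. An Event manages an internal flag that callers control with the set() and clear() methods. Other threads can use wait() to pause until the flag is set, effectively blocking their progress until they are allowed to continue.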

threading_event.py

import logging
import threading
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything."""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait up to t seconds at a time, doing other work between waits."""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')

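The wait() method takes an optional argument giving the number of seconds to wait for the event before timing out. It returns a Boolean indicating whether or not the event is set, so the caller knows why wait() returned. The is_set() method can be called on the event separately without any risk of blocking.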

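In this example, wait_for_event_timeout() checks the event status without blocking indefinitely. wait_for_event() blocks on its call to wait(), which does not return until the event status changes.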

$ python3 threading_event.py

(block     ) wait_for_event starting
(nonblock  ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock  ) event set: True
(nonblock  ) processing event
(block     ) event set: True
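Because wait() returns the flag state, an Event also works well as a shutdown signal for a background loop, such as the "heart beat" thread mentioned in the daemon discussion. A sketch, where heartbeat() is a hypothetical name and the 0.01 second interval is arbitrary:

```python
import threading
import time

def heartbeat(stop, beats, interval=0.01):
    """Hypothetical service loop: record a beat until `stop` is set."""
    # wait() returns False when it times out and True once the flag is
    # set, so the body runs once per interval until stop.set() is called.
    while not stop.wait(interval):
        beats.append(time.monotonic())

stop = threading.Event()
beats = []
t = threading.Thread(target=heartbeat, args=(stop, beats), name='heartbeat')
t.start()
time.sleep(0.1)
stop.set()  # ask the loop to finish
t.join(1.0)
print(len(beats), t.is_alive())
```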

Controlling Access to Resources

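In addition to synchronizing the operations of threads, it is also important to be able to control access to shared resources to prevent corrupted or lost data. Individual operations on Python's built-in data structures (lists, dictionaries, etc.), such as list.append(), are thread-safe as a side effect of being atomic: the global interpreter lock that protects Python's internal data structures is not released in the middle of such an update. Other data structures implemented in Python, and compound operations on simpler types like integers and floats (such as reading a value and writing back value + 1), do not have that protection. To guard against simultaneous access to an object, use a Lock object.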

threading_lock.py

import logging
import random
import threading
import time

class Counter:

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

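In this example, the worker() function calls increment() on a Counter instance, which manages a Lock to prevent two threads from changing its internal state at the same time. If the Lock was not used, a change to the value attribute could be lost.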

$ python3 threading_lock.py

(Thread-1  ) Sleeping 0.18
(Thread-2  ) Sleeping 0.93
(MainThread) Waiting for worker threads
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.11
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.81
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(MainThread) Counter: 4
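The same locking pattern holds up with many threads and many updates. In this sketch, SafeCounter and hammer() are hypothetical names, and the with statement used in increment() is equivalent to the acquire(), try, finally, release() sequence in Counter.increment() above. The final count is exact because every increment happens while the lock is held.

```python
import threading

class SafeCounter:
    """Hypothetical counter guarded by a Lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        # Reading and writing self.value are separate steps; holding the
        # lock keeps other threads out of the read-modify-write sequence.
        with self._lock:
            self.value += 1

def hammer(counter, times):
    for _ in range(times):
        counter.increment()

counter = SafeCounter()
threads = [threading.Thread(target=hammer, args=(counter, 10_000))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 40000
```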

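To find out whether another thread holds the lock without holding up the current thread, pass False for the blocking argument to acquire(). In the next example, worker() tries to acquire the lock three separate times and counts how many attempts it has to make to do so. In the meantime, lock_holder() cycles between holding and releasing the lock, with short pauses in each state to simulate load.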

threading_lock_noblock.py

import logging
import threading
import time

def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)

def worker(lock):
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it = lock.acquire(blocking=False)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired',
                              num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired',
                              num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()

The output shows that worker() needed five attempts to acquire the lock three times.

$ python3 threading_lock_noblock.py

(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations
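Between blocking forever and not blocking at all, acquire() also accepts a timeout in seconds and returns False if the lock could not be obtained in time (a timeout cannot be combined with blocking=False). A sketch in which the main thread holds the lock while a helper thread tries, with a 0.1 second limit chosen arbitrarily:

```python
import threading
import time

lock = threading.Lock()
outcome = {}

def try_with_timeout():
    start = time.monotonic()
    # Wait at most 0.1 seconds: neither forever nor not at all.
    outcome['acquired'] = lock.acquire(timeout=0.1)
    outcome['waited'] = time.monotonic() - start
    if outcome['acquired']:
        lock.release()

with lock:  # the main thread holds the lock during the whole attempt
    t = threading.Thread(target=try_with_timeout)
    t.start()
    t.join()

print(outcome['acquired'])  # False
```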

Re-entrant Locks

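A normal Lock cannot be acquired more than once, even by the same thread: a second blocking acquire() in the thread that already holds it waits forever. This can be disastrous if the lock is used by more than one function in the same call chain.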

threading_lock_reacquire.py

import threading

lock = threading.Lock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

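In this case, the second call to acquire() passes 0 (false) as the blocking argument so that it returns immediately instead of blocking forever, since the lock is already held from the first call.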

$ python3 threading_lock_reacquire.py

First try : True
Second try: False

When separate code in the same thread needs to "re-acquire" the lock, use a re-entrant RLock instead of a Lock.

threading_rlock.py

import threading

lock = threading.RLock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

The only change from the previous example is substituting RLock for Lock. (Translator's note: an RLock acquired twice must also be released twice.)

$ python3 threading_rlock.py

First try : True
Second try: True
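A typical place for an RLock is a class whose methods call each other while holding the same lock. With a plain Lock, deposit_twice() below would block forever on its first call to deposit(); with an RLock the owning thread may re-acquire it. Account is a hypothetical example class.

```python
import threading

class Account:
    """Hypothetical class whose methods call each other under one lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self.balance = 0

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_twice(self, amount):
        with self._lock:          # first acquisition
            self.deposit(amount)  # same thread re-acquires: allowed
            self.deposit(amount)  # every acquire is paired with a release

account = Account()
account.deposit_twice(10)
print(account.balance)  # 20
```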

Locks as Context Managers

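Locks implement the context manager API and are compatible with the with statement. Using with removes the need to explicitly acquire and release the lock.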

threading_lock_with.py

import threading
import logging

def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')

def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()

The two functions worker_with() and worker_no_with() manage the lock in equivalent ways.

$ python3 threading_lock_with.py

(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly
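Like the try/finally form, the with statement releases the lock even when the protected block raises an exception. A quick check, with fails_while_locked() as a hypothetical function:

```python
import threading

lock = threading.Lock()

def fails_while_locked():
    with lock:
        raise ValueError('something went wrong')

try:
    fails_while_locked()
except ValueError:
    pass

# The with statement released the lock even though the block raised.
print(lock.locked())  # False
```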

Synchronizing Threads

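In addition to using Events, another way of synchronizing threads is through a Condition object. Because a Condition uses a Lock, it can be tied to a shared resource, allowing multiple threads to wait for the resource to be updated. In this example, the consumer() threads wait for the Condition to be set before continuing. The producer() thread is responsible for setting the condition and notifying the other threads that they can continue.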

threading_condition.py

import logging
import threading
import time

def consumer(cond):
    """Wait for the condition and use the resource."""
    logging.debug('Starting consumer thread')
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')

def producer(cond):
    """Set up the resource to be used by the consumer."""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notify_all()

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

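The threads use with to acquire the lock associated with the Condition, as described in the previous section. Calling acquire() and release() explicitly works just as well.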

$ python3 threading_condition.py

2016-07-10 10:45:28,170 (c1) Starting consumer thread
2016-07-10 10:45:28,376 (c2) Starting consumer thread
2016-07-10 10:45:28,581 (p ) Starting producer thread
2016-07-10 10:45:28,581 (p ) Making resource available
2016-07-10 10:45:28,582 (c1) Resource is available to consumer
2016-07-10 10:45:28,582 (c2) Resource is available to consumer
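In the example above, the consumers only receive the notification because the sleep() calls make them start waiting before the producer runs; a notify_all() sent before any consumer is waiting would be lost. Pairing the Condition with some shared state and waiting with wait_for(), which evaluates a predicate before waiting and again after each wakeup, removes that timing dependence. A sketch, where run_condition_demo() and the ready list are hypothetical:

```python
import threading

def run_condition_demo(num_consumers=2):
    cond = threading.Condition()
    ready = []  # shared state, only touched while holding cond's lock
    seen = []

    def consumer():
        with cond:
            # wait_for() checks the predicate first, so a notification
            # sent before this thread started waiting is not missed.
            cond.wait_for(lambda: len(ready) > 0)
            seen.append(threading.current_thread().name)

    def producer():
        with cond:
            ready.append('resource')
            cond.notify_all()

    consumers = [threading.Thread(target=consumer, name=f'c{i}')
                 for i in range(num_consumers)]
    p = threading.Thread(target=producer, name='p')
    p.start()  # the producer is allowed to run first here
    for t in consumers:
        t.start()
    p.join()
    for t in consumers:
        t.join()
    return sorted(seen)

print(run_condition_demo())  # ['c0', 'c1']
```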

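Barriers are another thread synchronization mechanism.
A Barrier establishes a control point, and all participating threads block until all of the participating "parties" have reached that point. It lets threads start up separately and then pause until they are all ready to proceed.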

threading_barrier.py

import threading
import time

def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    worker_id = barrier.wait()
    print(threading.current_thread().name, 'after barrier',
          worker_id)

NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()

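In this example, the Barrier is configured to block until three threads are waiting. When that condition is met, all of the threads are released past the control point at the same time. The return value from wait() is an integer between 0 and the number of parties minus one, different for each thread, and can be used to let only one thread take an action such as cleaning up a shared resource.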

$ python3 threading_barrier.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
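Barrier also accepts an optional action callable, which one of the threads runs once per round after all parties have arrived and before any of them is released. A sketch, where run_barrier_round() is a hypothetical helper:

```python
import threading

def run_barrier_round(parties=3):
    actions = []
    released = []
    record = threading.Lock()
    # The action runs exactly once per round, in one of the threads.
    barrier = threading.Barrier(parties,
                                action=lambda: actions.append('all here'))

    def worker():
        index = barrier.wait()  # 0 .. parties - 1, different per thread
        with record:
            released.append(index)

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return actions, sorted(released)

print(run_barrier_round())  # (['all here'], [0, 1, 2])
```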

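The abort() method of Barrier causes all of the waiting threads to receive a BrokenBarrierError. This can be used to tell threads blocked in wait() that it is time to stop, so they can clean up.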

threading_barrier_abort.py

import threading
import time

def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        print(threading.current_thread().name, 'aborting')
    else:
        print(threading.current_thread().name, 'after barrier',
              worker_id)

NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS + 1)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

barrier.abort()

for t in threads:
    t.join()

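This time, the Barrier is configured to expect one more participating thread than is started, so processing in all of the threads is blocked. The abort() call then raises BrokenBarrierError in each blocked thread.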

$ python3 threading_barrier_abort.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-2 aborting
worker-1 aborting
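A barrier also becomes broken when a wait() times out. In this sketch only one of two parties ever arrives, so the (arbitrarily short) timeout breaks the barrier; reset() then returns it to a usable state.

```python
import threading

barrier = threading.Barrier(2, timeout=0.05)  # default timeout for wait()
outcome = 'released'

try:
    barrier.wait()  # the second party never arrives, so this times out
except threading.BrokenBarrierError:
    outcome = 'timed out'

print(outcome, barrier.broken)  # timed out True

barrier.reset()  # back to the default, empty state
print(barrier.broken)  # False
```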

Limiting Concurrent Access to Resources

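Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.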

threading_semaphore.py

import logging
import random
import threading
import time

class ActivePool:

    def __init__(self):
        super().__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)

def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.current_thread().name
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()

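In this example, the ActivePool class simply serves as a convenient way to track which threads are running at a given moment. A real resource pool would allocate a connection or some other value to each newly active thread, and reclaim the value when the thread is done. Here, it only holds the names of the active threads, to show that at most two run at the same time.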

$ python3 threading_semaphore.py

2016-07-10 10:45:29,398 (0 ) Waiting to join the pool
2016-07-10 10:45:29,398 (0 ) Running: ['0']
2016-07-10 10:45:29,399 (1 ) Waiting to join the pool
2016-07-10 10:45:29,399 (1 ) Running: ['0', '1']
2016-07-10 10:45:29,399 (2 ) Waiting to join the pool
2016-07-10 10:45:29,399 (3 ) Waiting to join the pool
2016-07-10 10:45:29,501 (1 ) Running: ['0']
2016-07-10 10:45:29,501 (0 ) Running: []
2016-07-10 10:45:29,502 (3 ) Running: ['3']
2016-07-10 10:45:29,502 (2 ) Running: ['3', '2']
2016-07-10 10:45:29,607 (3 ) Running: ['2']
2016-07-10 10:45:29,608 (2 ) Running: []
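BoundedSemaphore is a variant that raises ValueError if it is released more times than it was acquired, which helps catch bookkeeping bugs. This sketch, with a hypothetical peak_concurrency() helper and an arbitrary 0.02 second workload, measures the largest number of workers that were ever inside the semaphore at once:

```python
import threading
import time

def peak_concurrency(num_workers=5, limit=2):
    """Run workers under a semaphore and report the most active at once."""
    sem = threading.BoundedSemaphore(limit)
    counter_lock = threading.Lock()
    active = 0
    peak = 0

    def worker():
        nonlocal active, peak
        with sem:  # at most `limit` workers get past this point at a time
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.02)
            with counter_lock:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak

print(peak_concurrency() <= 2)  # True

# A BoundedSemaphore also rejects a release that was never acquired.
extra = threading.BoundedSemaphore(1)
try:
    extra.release()
except ValueError:
    print('too many releases')
```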

Thread-specific Data

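While some resources need to be locked so that multiple threads can use them, others need to be protected so that they are hidden from threads that do not own them. The local() class creates an object capable of hiding values from view in separate threads: each thread sees only the attributes it set itself.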

threading_local.py

import random
import threading
import logging

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

local_data.value is not present for any thread until it is set in that thread.

$ python3 threading_local.py

(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-1  ) value=33
(Thread-2  ) No value yet
(Thread-2  ) value=74
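Writes made in one thread never show up in another. This sketch, with a hypothetical per_thread_values() function, sets value in the main thread and in two workers, then reports what each worker saw before writing:

```python
import threading

def per_thread_values():
    data = threading.local()
    data.value = 'main'
    seen = {}

    def worker(n):
        # The main thread's attribute is invisible here.
        seen[n] = getattr(data, 'value', None)
        data.value = n  # only changes this thread's view of `data`

    threads = [threading.Thread(target=worker, args=(n,)) for n in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return data.value, sorted(seen.items())

print(per_thread_values())  # ('main', [(0, None), (1, None)])
```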

To give every thread the same initial value, subclass local and set the attribute in __init__().

threading_local_defaults.py

import random
import threading
import logging

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

class MyLocal(threading.local):

    def __init__(self, value):
        super().__init__()
        logging.debug('Initializing %r', self)
        self.value = value

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

__init__() is invoked on the same object (note that the same address appears in every repr), once in each thread, to set the default value.

$ python3 threading_local_defaults.py

(MainThread) Initializing <__main__.MyLocal object at 0x101c6c288>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at 0x101c6c288>
(Thread-1  ) value=1000
(Thread-1  ) value=18
(Thread-2  ) Initializing <__main__.MyLocal object at 0x101c6c288>
(Thread-2  ) value=1000
(Thread-2  ) value=77
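Behind the output above, the subclass's __init__() runs once per thread, the first time that thread touches the object, with the arguments originally passed to the constructor. Class attributes of a local subclass, by contrast, are shared by all threads. A sketch, with a hypothetical CountingLocal class, that records every __init__() call:

```python
import threading

class CountingLocal(threading.local):
    """Hypothetical local subclass that records each __init__ call."""

    init_calls = []  # class attribute: shared by all threads
    init_lock = threading.Lock()

    def __init__(self, value):
        super().__init__()
        with CountingLocal.init_lock:
            CountingLocal.init_calls.append(threading.current_thread().name)
        self.value = value  # instance attribute: one copy per thread

data = CountingLocal(1000)

def worker():
    data.value += 1  # first access in this thread runs __init__(1000)

threads = [threading.Thread(target=worker, name=f'w{i}') for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(data.value)                        # 1000 (the main thread's copy)
print(sorted(CountingLocal.init_calls))  # ['MainThread', 'w0', 'w1']
```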

This article was first published on LearnKu.com.