11.7. concurrent.futures — 管理并发任务池
本节目标:更容易管理以并发和并行方式执行的任务
concurrent.futures
模块提供一个使用线程池或进程池执行任务的接口。 两者的 API t相同,所以我们只需要进行少量修改即可在线程与进程间切换。
模块中包含两种类型的池接口类。 通常使用 Executor 来管理池中的 worker
, 使用 futures 来管理池中 worker
所返回的值。要想使用池,我们需要创建适当的 Executor 类实例,之后将任务提交给它去执行。当任务开始时,会返回 Future
实例。当我们需要提取任务的结果时,程序就会先返回 Future
阻塞住,直到有结果再进行下一步。不过有很多便捷的 API 来让我们等待任务完成,所以我们不必直接管理 Future
对象。
使用基础线程池的 map()
ThreadPoolExecutor
管理着一组线程 worker
,当它们可以工作时我们就可以传递任务过去。下面的例子使用 map()
并发的生成一组数据。task
中使用 time.sleep()
暂停一小会来展示生成数据的顺序无关并发任务的执行顺序,不过 map()
返回的结果却总以执行顺序有关。
futures_thread_pool_map.py
from concurrent import futures
import threading
import time
def task(n):
print('{}: sleeping {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: done with {}'.format(
threading.current_thread().name,
n)
)
return n / 10
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))
map()
所返回的值实际上是一个特殊的迭代器,它被主程序迭代时会等待其中的并发任务返回,没返回就一直等待。
$ python3 futures_thread_pool_map.py
main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object
Executor.map.<locals>.result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
执行独立任务
除了使用 map()
,我们还可以用 submit()
来执行单个任务 ,也是使用的 Future
实例等待任务结果的返回。
futures_thread_pool_submit.py
from concurrent import futures
import threading
import time
def task(n):
print('{}: sleeping {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: done with {}'.format(
threading.current_thread().name,
n)
)
return n / 10
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))
任务执行完成后,future 的状态就会改变,同时 result()
也就可以访问了。
$ python3 futures_thread_pool_submit.py
main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0x1034e1ef0 state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0x1034e1ef0 state=finished
returned float>
无序状态的任务
调用 Future
的 result()
方法会阻塞到该任务完成(无论是返回了值还是抛出了异常)或者该任务取消。使用 map()
可以以原本的顺序访问多个任务的返回值。如果顺序无关紧要,也可以使用 as_completed()
在某任务执行完成后立即进行处理。
futures_as_completed.py
from concurrent import futures
import random
import time
def task(n):
time.sleep(random.random())
return (n, n / 10)
ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')
wait_for = [
ex.submit(task, i)
for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
print('main: result: {}'.format(f.result()))
由于我们的线程池中有足够多的 worker
,所以所有的任务都可以立即得到执行。它们的完成时间是随机的,所以每次运行例子 as_completed()
所生成的数据也是随机的。
$ python3 futures_as_completed.py
main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)
Future 回调
当某任务完成时,如果我们不想显式的等待其结果后再做下一步操作,我们可以使用 add_done_callback()
来添加一个函数让 Future
完成时自动调用它。这个回调函数应该是一个可调用对象,而且也只接受 Future
实例作为参数。
futures_future_callback.py
from concurrent import futures
import time
def task(n):
print('{}: sleeping'.format(n))
time.sleep(0.5)
print('{}: done'.format(n))
return n / 10
def done(fn):
if fn.cancelled():
print('{}: canceled'.format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print('{}: error returned: {}'.format(
fn.arg, error))
else:
result = fn.result()
print('{}: value returned: {}'.format(
fn.arg, result))
if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
f.arg = 5
f.add_done_callback(done)
result = f.result()
无论如何回调函数都会被 Future
「完成」时调用,所以我们有必要检测下 Future
是以何种状态「完成」的,然后在做其他操作。
$ python3 futures_future_callback.py
main: starting
5: sleeping
5: done
5: value returned: 0.5
取消任务
每个 Future
都可以被取消,如果它只是被提交但还没开始,我们可以调用 cancel()
方法取消掉该 Future
。
futures_future_callback_cancel.py
from concurrent import futures
import time
def task(n):
print('{}: sleeping'.format(n))
time.sleep(0.5)
print('{}: done'.format(n))
return n / 10
def done(fn):
if fn.cancelled():
print('{}: canceled'.format(fn.arg))
elif fn.done():
print('{}: not canceled'.format(fn.arg))
if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
tasks = []
for i in range(10, 0, -1):
print('main: submitting {}'.format(i))
f = ex.submit(task, i)
f.arg = i
f.add_done_callback(done)
tasks.append((i, f))
for i, t in reversed(tasks):
if not t.cancel():
print('main: did not cancel {}'.format(i))
ex.shutdown()
cancel()
方法返回一个布尔类型的值,表示该任务是否已被取消掉。
$ python3 futures_future_callback_cancel.py
main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled
任务中的异常
如果执行任务时抛出了一个未处理的异常,那该异常会存储在 Future
中,最终可以通过 result()
或 exception()
来获取到它。
futures_future_exception.py
from concurrent import futures
def task(n):
print('{}: starting'.format(n))
raise ValueError('the value {} is no good'.format(n))
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
error = f.exception()
print('main: error: {}'.format(error))
try:
result = f.result()
except ValueError as e:
print('main: saw error "{}" when accessing result'.format(e))
如果调用的是 result()
,那么发生在执行任务中的异常会在当前上下文中被再次抛出。
$ python3 futures_future_exception.py
main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result
上下文管理器
我们可以以上下文管理器的方式运行 Executor
,它会自动等待所有任务完成。上下文管理器退出时,Executor
的 shuntdown()
方法会被调用。
futures_context_manager.py
from concurrent import futures
def task(n):
print(n)
with futures.ThreadPoolExecutor(max_workers=2) as ex:
print('main: starting')
ex.submit(task, 1)
ex.submit(task, 2)
ex.submit(task, 3)
ex.submit(task, 4)
print('main: done')
当线程或进程资源需要在执行结束时清理时,我们可以使用这种方法快速有效的书写。
$ python3 futures_context_manager.py
main: starting
1
2
3
4
main: done
进程池
ProcessPoolExecutor
的使用方式与 ThreadPoolExecutor
一样,只不过是用进程代替了线程。进程池可以有效利用多核 CPU 达到处理 CPU 密集型任务而无需因 CPtyhon 的全局解释器锁受到限制。
futures_process_pool_map.py
from concurrent import futures
import os
def task(n):
return (n, os.getpid())
ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
print('ran task {} in process {}'.format(n, pid))
如同线程池一样,每个独立的进程都能重复利用处理多个任务。
$ python3 futures_process_pool_map.py
ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854
如果在执行期间发生了意料之外的错误导致退出, ProcessPoolExecutor
就会被打断不会再执行其他任务。
futures_process_pool_broken.py
from concurrent import futures
import os
import signal
with futures.ProcessPoolExecutor(max_workers=2) as ex:
print('getting the pid for one worker')
f1 = ex.submit(os.getpid)
pid1 = f1.result()
print('killing process {}'.format(pid1))
os.kill(pid1, signal.SIGHUP)
print('submitting another task')
f2 = ex.submit(os.getpid)
try:
pid2 = f2.result()
except futures.process.BrokenProcessPool as e:
print('could not start new tasks: {}'.format(e))
BrokenProcessPool
异常在处理结果时会抛出,而不是新任务提交时。
$ python3 futures_process_pool_broken.py
getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.
参考
- concurrent.futures 标准库文档
- PEP 3148 -- 创建
concurrent.futures
功能集合的提案.- 线程,进程,协程交叉使用
threading
multiprocessing
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。