11.7. concurrent.futures — 管理并发任务池

未匹配的标注

本节目标:更容易管理以并发和并行方式执行的任务

concurrent.futures 模块提供一个使用线程池或进程池执行任务的接口。 两者的 API t 相同,所以我们只需要进行少量修改即可在线程与进程间切换。

模块中包含两种类型的池接口类。 通常使用 Executor 来管理池中的 worker, 使用 futures 来管理池中 worker 所返回的值。要想使用池,我们需要创建适当的 Executor 类实例,之后将任务提交给它去执行。当任务开始时,会返回 Future 实例。当我们需要提取任务的结果时,程序就会先返回 Future 阻塞住,直到有结果再进行下一步。不过有很多便捷的 API 来让我们等待任务完成,所以我们不必直接管理 Future 对象。

使用基础线程池的 map ()#

ThreadPoolExecutor 管理着一组线程 worker,当它们可以工作时我们就可以传递任务过去。下面的例子使用 map() 并发的生成一组数据。task 中使用 time.sleep() 暂停一小会来展示生成数据的顺序无关并发任务的执行顺序,不过 map() 返回的结果却总以执行顺序有关。

futures_thread_pool_map.py

from concurrent import futures
import threading
import time

def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))

map() 所返回的值实际上是一个特殊的迭代器,它被主程序迭代时会等待其中的并发任务返回,没返回就一直等待。

$ python3 futures_thread_pool_map.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object
Executor.map.<locals>.result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]

执行独立任务#

除了使用 map() ,我们还可以用 submit() 来执行单个任务 ,也是使用的 Future 实例等待任务结果的返回。

futures_thread_pool_submit.py

from concurrent import futures
import threading
import time

def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

任务执行完成后,future 的状态就会改变,同时 result() 也就可以访问了。

$ python3 futures_thread_pool_submit.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0x1034e1ef0 state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0x1034e1ef0 state=finished
 returned float>

无序状态的任务#

调用 Futureresult() 方法会阻塞到该任务完成(无论是返回了值还是抛出了异常)或者该任务取消。使用 map() 可以以原本的顺序访问多个任务的返回值。如果顺序无关紧要,也可以使用 as_completed() 在某任务执行完成后立即进行处理。

futures_as_completed.py

from concurrent import futures
import random
import time

def task(n):
    time.sleep(random.random())
    return (n, n / 10)

ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))

由于我们的线程池中有足够多的 worker,所以所有的任务都可以立即得到执行。它们的完成时间是随机的,所以每次运行例子 as_completed() 所生成的数据也是随机的。

$ python3 futures_as_completed.py

main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)

Future 回调#

当某任务完成时,如果我们不想显式的等待其结果后再做下一步操作,我们可以使用 add_done_callback() 来添加一个函数让 Future 完成时自动调用它。这个回调函数应该是一个可调用对象,而且也只接受 Future 实例作为参数。

futures_future_callback.py

from concurrent import futures
import time

def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))

if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

无论如何回调函数都会被 Future 「完成」时调用,所以我们有必要检测下 Future 是以何种状态「完成」的,然后在做其他操作。

$ python3 futures_future_callback.py

main: starting
5: sleeping
5: done
5: value returned: 0.5

取消任务#

每个 Future 都可以被取消,如果它只是被提交但还没开始,我们可以调用 cancel() 方法取消掉该 Future

futures_future_callback_cancel.py

from concurrent import futures
import time

def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))

if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

cancel() 方法返回一个布尔类型的值,表示该任务是否已被取消掉。

$ python3 futures_future_callback_cancel.py

main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled

任务中的异常#

如果执行任务时抛出了一个未处理的异常,那该异常会存储在 Future 中,最终可以通过 result()exception() 来获取到它。

futures_future_exception.py

from concurrent import futures

def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

error = f.exception()
print('main: error: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

如果调用的是 result() ,那么发生在执行任务中的异常会在当前上下文中被再次抛出。

$ python3 futures_future_exception.py

main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result

上下文管理器#

我们可以以上下文管理器的方式运行 Executor,它会自动等待所有任务完成。上下文管理器退出时,Executorshuntdown() 方法会被调用。

futures_context_manager.py

from concurrent import futures

def task(n):
    print(n)

with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

当线程或进程资源需要在执行结束时清理时,我们可以使用这种方法快速有效的书写。

$ python3 futures_context_manager.py

main: starting
1
2
3
4
main: done

进程池#

ProcessPoolExecutor 的使用方式与 ThreadPoolExecutor 一样,只不过是用进程代替了线程。进程池可以有效利用多核 CPU 达到处理 CPU 密集型任务而无需因 CPtyhon 的全局解释器锁受到限制。

futures_process_pool_map.py

from concurrent import futures
import os

def task(n):
    return (n, os.getpid())

ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
    print('ran task {} in process {}'.format(n, pid))

如同线程池一样,每个独立的进程都能重复利用处理多个任务。

$ python3 futures_process_pool_map.py

ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854

如果在执行期间发生了意料之外的错误导致退出, ProcessPoolExecutor 就会被打断不会再执行其他任务。

futures_process_pool_broken.py

from concurrent import futures
import os
import signal

with futures.ProcessPoolExecutor(max_workers=2) as ex:
    print('getting the pid for one worker')
    f1 = ex.submit(os.getpid)
    pid1 = f1.result()

    print('killing process {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('submitting another task')
    f2 = ex.submit(os.getpid)
    try:
        pid2 = f2.result()
    except futures.process.BrokenProcessPool as e:
        print('could not start new tasks: {}'.format(e))

BrokenProcessPool 异常在处理结果时会抛出,而不是新任务提交时。

$ python3 futures_process_pool_broken.py

getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.

参考#

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://learnku.com/docs/pymotw/concurre...

译文地址:https://learnku.com/docs/pymotw/concurre...

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~