11.7. concurrent.futures — 管理并发任务池

本节目标:更容易管理以并发和并行方式执行的任务

concurrent.futures 模块提供一个使用线程池或进程池执行任务的接口。 两者的 API t相同,所以我们只需要进行少量修改即可在线程与进程间切换。

模块中包含两种类型的池接口类。 通常使用 Executor 来管理池中的 worker, 使用 futures 来管理池中 worker 所返回的值。要想使用池,我们需要创建适当的 Executor 类实例,之后将任务提交给它去执行。当任务开始时,会返回 Future 实例。当我们需要提取任务的结果时,程序就会先返回 Future 阻塞住,直到有结果再进行下一步。不过有很多便捷的 API 来让我们等待任务完成,所以我们不必直接管理 Future 对象。

使用基础线程池的 map()

ThreadPoolExecutor 管理着一组线程 worker,当它们可以工作时我们就可以传递任务过去。下面的例子使用 map() 并发的生成一组数据。task 中使用 time.sleep() 暂停一小会来展示生成数据的顺序无关并发任务的执行顺序,不过 map() 返回的结果却总以执行顺序有关。

futures_thread_pool_map.py

from concurrent import futures
import threading
import time

def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))

map() 所返回的值实际上是一个特殊的迭代器,它被主程序迭代时会等待其中的并发任务返回,没返回就一直等待。

$ python3 futures_thread_pool_map.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object
Executor.map.<locals>.result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]

执行独立任务

除了使用 map() ,我们还可以用 submit() 来执行单个任务 ,也是使用的 Future 实例等待任务结果的返回。

futures_thread_pool_submit.py

from concurrent import futures
import threading
import time

def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

任务执行完成后,future 的状态就会改变,同时 result() 也就可以访问了。

$ python3 futures_thread_pool_submit.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0x1034e1ef0 state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0x1034e1ef0 state=finished
 returned float>

无序状态的任务

调用 Futureresult() 方法会阻塞到该任务完成(无论是返回了值还是抛出了异常)或者该任务取消。使用 map() 可以以原本的顺序访问多个任务的返回值。如果顺序无关紧要,也可以使用 as_completed() 在某任务执行完成后立即进行处理。

futures_as_completed.py

from concurrent import futures
import random
import time

def task(n):
    time.sleep(random.random())
    return (n, n / 10)

ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))

由于我们的线程池中有足够多的 worker,所以所有的任务都可以立即得到执行。它们的完成时间是随机的,所以每次运行例子 as_completed() 所生成的数据也是随机的。

$ python3 futures_as_completed.py

main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)

Future 回调

当某任务完成时,如果我们不想显式的等待其结果后再做下一步操作,我们可以使用 add_done_callback() 来添加一个函数让 Future 完成时自动调用它。这个回调函数应该是一个可调用对象,而且也只接受 Future 实例作为参数。

futures_future_callback.py

from concurrent import futures
import time

def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))

if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

无论如何回调函数都会被 Future 「完成」时调用,所以我们有必要检测下 Future 是以何种状态「完成」的,然后在做其他操作。

$ python3 futures_future_callback.py

main: starting
5: sleeping
5: done
5: value returned: 0.5

取消任务

每个 Future 都可以被取消,如果它只是被提交但还没开始,我们可以调用 cancel() 方法取消掉该 Future

futures_future_callback_cancel.py

from concurrent import futures
import time

def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))

if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

cancel() 方法返回一个布尔类型的值,表示该任务是否已被取消掉。

$ python3 futures_future_callback_cancel.py

main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled

任务中的异常

如果执行任务时抛出了一个未处理的异常,那该异常会存储在 Future 中,最终可以通过 result()exception() 来获取到它。

futures_future_exception.py

from concurrent import futures

def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

error = f.exception()
print('main: error: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

如果调用的是 result() ,那么发生在执行任务中的异常会在当前上下文中被再次抛出。

$ python3 futures_future_exception.py

main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result

上下文管理器

我们可以以上下文管理器的方式运行 Executor,它会自动等待所有任务完成。上下文管理器退出时,Executorshuntdown() 方法会被调用。

futures_context_manager.py

from concurrent import futures

def task(n):
    print(n)

with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

当线程或进程资源需要在执行结束时清理时,我们可以使用这种方法快速有效的书写。

$ python3 futures_context_manager.py

main: starting
1
2
3
4
main: done

进程池

ProcessPoolExecutor 的使用方式与 ThreadPoolExecutor 一样,只不过是用进程代替了线程。进程池可以有效利用多核 CPU 达到处理 CPU 密集型任务而无需因 CPtyhon 的全局解释器锁受到限制。

futures_process_pool_map.py

from concurrent import futures
import os

def task(n):
    return (n, os.getpid())

ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
    print('ran task {} in process {}'.format(n, pid))

如同线程池一样,每个独立的进程都能重复利用处理多个任务。

$ python3 futures_process_pool_map.py

ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854

如果在执行期间发生了意料之外的错误导致退出, ProcessPoolExecutor 就会被打断不会再执行其他任务。

futures_process_pool_broken.py

from concurrent import futures
import os
import signal

with futures.ProcessPoolExecutor(max_workers=2) as ex:
    print('getting the pid for one worker')
    f1 = ex.submit(os.getpid)
    pid1 = f1.result()

    print('killing process {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('submitting another task')
    f2 = ex.submit(os.getpid)
    try:
        pid2 = f2.result()
    except futures.process.BrokenProcessPool as e:
        print('could not start new tasks: {}'.format(e))

BrokenProcessPool 异常在处理结果时会抛出,而不是新任务提交时。

$ python3 futures_process_pool_broken.py

getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.

参考

本文章首发在 LearnKu.com 网站上。
上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~