python 协程用法总结(一)
一、python协程关键字
async: 声明协程
注意:通过async声明的不是函数,单独调用不会执行
await: 用来等待可等待的对象,包括协程(就是async声明的协程)、任务(asyncio.create_task()、asyncio.ensure_future()创建的任务)和Future(是一个低层级的可等待对象,表示一个异步操作的 最终结果,一般不要自己创建)
注意:当前协程会再await处等待阻塞(回去执行其他协程),知道等待的协程或任务完成
import asyncio
async def main(input):
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
二、可等待对象
可等待 对象有三种主要类型: 协程, 任务 和 Future,可以用关键字await等待事件完成
协程
- 协程函数: 定义形式为 async def 的函数;
- 协程对象: 调用 协程函数 所返回的对象。
任务
- 通过asyncio.create_task()将协程函数创建任务,该协程将自动排入日程准备立即运行
注意: python >= 3.7 - python3.6 使用asyncio.ensure_future()创建任务
Future
- 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
- 当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
- 一般没有必要在应用层级的代码中创建 Future 对象
可以使用一下方法
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
- 如果 coros_or_futures* 中的某个可等待对象为协程,它将自动作为一个任务加入日程
- 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
- 如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。 如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。
- 如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。
如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。
asyncio.shield(arg, *, loop=None)
- 保护一个 可等待对象 防止其被 取消。
- arg是一个协程,它将自动作为任务加入日程。
asyncio.wait_for(fut, timeout, *, loop=None)
- 用来等待一个future设置timeout,timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
- 如果发生超时,任务将取消并引发 asyncio.TimeoutError.
- 要避免任务 取消,可以加上 shield()。
asyncio.wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
import asyncio async def wait1(): print('in 1') await asyncio.sleep(1) return 1 async def wait2(): print('in 2') await asyncio.sleep(2) return 2
async def wait3():
print('in 3')
await asyncio.sleep(3)
return 3
async def main():
done, pending = await asyncio.wait([wait1(), wait2(), wait3()], timeout=2.5)
print(done,'\n', pending)
asyncio.run(main())
结果
in 1
in 3
in 2
{<Task finished coro=<wait1() done, defined at /Users/zhangjintao/Desktop/随笔/小测试代码/多任务/协程/3.7/test.py:3> result=1>, <Task finished coro=<wait2() done, defined at /Users/zhangjintao/Desktop/随笔/小测试代码/多任务/协程/3.7/test.py:8> result=2>}
{<Task pending coro=<wait3() running at /Users/zhangjintao/Desktop/随笔/小测试代码/多任务/协程/3.7/test.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10acc2650>()]>>}
- timeout 超时不会引发异常,超时会返回未完成的任务的状态, done是完成的任务set,pending是未完成的任务set
- 如果timeout不设置,会默认所有完成后返回结果,pending返回的是空set
三、再线程池中执行代码
loop.run_in_executor`(executor, func, *args)
import asyncio
import concurrent.futures
def blocking_io():
# File operations (such as logging) can block the
# event loop: run them in a thread pool.
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
## Options:
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
asyncio.run(main())
详见:https://docs.python.org/zh-cn/3.7/library/...
本作品采用《CC 协议》,转载必须注明作者和本文链接