python 协程用法总结(一)

一、python协程关键字

async: 声明协程

注意:通过async声明的不是函数,单独调用不会执行

await: 用来等待可等待的对象,包括协程(就是async声明的协程)、任务(asyncio.create_task()、asyncio.ensure_future()创建的任务)和Future(是一个低层级的可等待对象,表示一个异步操作的 最终结果,一般不要自己创建)

注意:当前协程会再await处等待阻塞(回去执行其他协程),知道等待的协程或任务完成

import asyncio

async def main(input):
    print('hello')
    await asyncio.sleep(1)
    print('world')
asyncio.run(main())

二、可等待对象

可等待 对象有三种主要类型: 协程, 任务 和 Future,可以用关键字await等待事件完成

协程

  • 协程函数: 定义形式为 async def 的函数;
  • 协程对象: 调用 协程函数 所返回的对象。

任务

  • 通过asyncio.create_task()将协程函数创建任务,该协程将自动排入日程准备立即运行
    注意: python >= 3.7
  • python3.6 使用asyncio.ensure_future()创建任务

Future

  • 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
  • 当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
  • 一般没有必要在应用层级的代码中创建 Future 对象

可以使用一下方法

  • asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

    1. 如果 coros_or_futures* 中的某个可等待对象为协程,它将自动作为一个任务加入日程
    2. 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
    3. 如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。 如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。
    4. 如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。
      如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。
  • asyncio.shield(arg, *, loop=None)

    1. 保护一个 可等待对象 防止其被 取消。
    2. arg是一个协程,它将自动作为任务加入日程。
  • asyncio.wait_for(fut, timeout, *, loop=None)

    1. 用来等待一个future设置timeout,timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
    2. 如果发生超时,任务将取消并引发 asyncio.TimeoutError.
    3. 要避免任务 取消,可以加上 shield()。
  • asyncio.wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

    import asyncio
    
    async def wait1():
        print('in 1')
        await asyncio.sleep(1)
        return 1
    
    async def wait2():
        print('in 2')
        await asyncio.sleep(2)
        return 2
  async def wait3():
      print('in 3')
      await asyncio.sleep(3)
      return 3


  async def main():
      done, pending = await asyncio.wait([wait1(), wait2(), wait3()], timeout=2.5)
      print(done,'\n', pending)

  asyncio.run(main())

结果
in 1
in 3
in 2
{<Task finished coro=<wait1() done, defined at /Users/zhangjintao/Desktop/随笔/小测试代码/多任务/协程/3.7/test.py:3> result=1>, <Task finished coro=<wait2() done, defined at /Users/zhangjintao/Desktop/随笔/小测试代码/多任务/协程/3.7/test.py:8> result=2>}
{<Task pending coro=<wait3() running at /Users/zhangjintao/Desktop/随笔/小测试代码/多任务/协程/3.7/test.py:16> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10acc2650>()]>>}

  1. timeout 超时不会引发异常,超时会返回未完成的任务的状态, done是完成的任务set,pending是未完成的任务set
  2. 如果timeout不设置,会默认所有完成后返回结果,pending返回的是空set

三、再线程池中执行代码

loop.run_in_executor`(executor, func, *args)

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

详见:https://docs.python.org/zh-cn/3.7/library/...

本作品采用《CC 协议》,转载必须注明作者和本文链接
Sprint_dV
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!