深入理解 python 异步 i/o 库 —— asyncio
python 的 asyncio 库以协程为基础,event_loop 作为协程的驱动和调度模型。该模型是一个单线程的异步模型,类似于 node.js。下图我所理解的该模型
事件循环通过 select () 来监听是否存在就绪的事件,如果存在就把事件对应的 callback 添加到一个 task list 中。然后从 task list 头部中取出一个 task 执行。在单线程中不断的注册事件,执行事件,从而实现了我们的 event_loop 模型。
event_loop 中执行的 task 并不是函数
如果我们把上图当成一个 web 服务器,左边的一个 task 当成一次 http 请求需要执行的完整任务。如果我们每一次 run_task () 都执行完一个完整的任务,再去 run 下一个 task。 那这跟普通的串行服务器并没有区别。在并发环境下造成的用户体验非常差。
具体有多差你可以脑补一下,毕竟我们现在是使用单线程方式实现的 web 服务器
所以 task 如果对应一个完整的 http 请求那么其不可能是一个函数,因为函数需要从头执行到尾占用着整个线程。那你觉得 task 是什么呢?
如果你不知道答案的话可以看一看我的另一篇文章 简述 python 的 yield 和 yield from
没错,task 是一个 generator,或者可以叫做可中断的函数。task 的代码依旧是从上写到下来处理一个 http 请求。也就是我们所说的同步的代码组织。
但是有所不同的是,在 task 中,我们遇到 i/o 操作时,我们就把 i/o 操作交给 selector(稍后我们解析一下 selector,并且把该 i/o 操作准备完毕后需要执行的回调也告诉 selector。然后我们使用 yield 保存并中断该函数。
此时线程的控制权回到 event_loop 手中。event_loop 首先看一下 selector 中是否存在就绪的数据,存在的话就把对应的回调放到 task list 的尾部(如图),然后从头部继续 run_task ()。
你可能想问上面中断的 task 什么时候才能继续执行呢?我前一句说过了,event_loop 每一次循环都会检测 selector 中是否存在就绪的 i/o 操作,如果存在就绪的 i/o 操作,我们对应就把 callback 放到 task 的尾部,当 event_loop 执行到这个 task 时。我们就能回到我们刚刚中断的函数继续执行啦,而且此时我们需要的 i/o 操作得到的数据也已经准备好了。
这种操作如果你站在函数的角度会有种神奇的感觉,在函数眼里,自己需要 get 遥远服务器的一些数据,于是调动 get (),然后瞬间就得到了遥远服务器的数据。没错在函数的眼里就是瞬间得到,这感觉就仿佛是穿越到了未来一样。
你可能又想问,为什么把 callback 放到 task,然后 run 一下就回到原有的函数执行位置了?
这我也不知道,我并没有深追 asyncio 的代码,这对于我来说有些复杂。但如果是我的话,我只要在 callback 中设置一个变量 gen 指向我们的 generator 就行了,然后只要在 callback 中 gen.send(res_data)
,我们就能回到中断处继续执行了。如果你有兴趣的话可以自己使用 debug 来追一下代码。
不过我更推荐你阅读一下这篇博文 深入理解 Python 异步编程 (上)
这里还有几个问题。#
比如我们在 task 中需要执行一个 1+2+3 + 到 2000 万这样一个操作,这个操作耗时有些长,而且不属于 i/o 操作,没法交给 selector 去调度,此时我们需要自己 yield,让其他的 task 能有机会来使用我们唯一的线程。这样就又有一个新的问题。yield 后,我们什么时候再次来执行这个被中断的函数呢?
问题代码示例
import asyncio
def print_sum():
sum = 0
count = 0
for a in range(20000000):
sum += a
count += 1
if count > 1000000:
count = 0
yield
print('1+到2000万的和是{}'.format(sum))
@asyncio.coroutine
def init():
yield from print_sum()
loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()
我想我们可以这样,把这个中断的 task 直接加入到 task list 的尾部,然后继续 event_loop,这样让其他 task 有机会执行,并且处理起来更加的简单。 asyncio 库也确实是这样做的。
但是 asyncio 还提供了更好的做法,我们可以再启动一个线程来执行这种 cpu 密集型运算
再来看看另外一个问题。如果在一个凌晨三点半,你 task list 此时是空的,那么你的 event_loop 怎么运作?继续不停的 loop 等待新的 http 请求进来? no,我们不允许如此浪费 cpu 的资源。asyncio 库也不允许。
首先看两行 event_loop 中的代码片段,也就是上图中右上角部分的 select (timeout) 部分
event_list = self._selector.select(timeout)
self._process_events(event_list)
补充一点,作为一台 web 服务器,我们总是需要 socket ()、bind ()、listen ()、来创建一个监听描述符 sockfd,用来监听到来的 http 请求,与 http 请求完成三路握手。然后通过 accept () 操作来得到一个已连接描述符 connectfd。
这里的两个文件描述符,此时都存在于我们的系统中,其中 sockfd 继续用来执行监听 http 请求操作。已经连接了的客户端我们则通过 connectfd 来与其通信。一般都是一个 sockfd 对多个 connectfd。
更多的细节推荐阅读 ——《unix 网络编程卷一》中的关于 socket 编程的几章
asyncio 对于网络 i/o 使用了 selector 模块,selector 模块的底层则是由 epoll () 来实现。也就是一个同步的 i/o 复用系统调用(你定会惊讶于 asyncio 的竟然使用了同步 i/o 来实现?我们在下一节来解读一下 epoll 函数)
这里你可以去读一下 python 手册中的 selector 模块,看看这个模块的作用
epoll () 函数有个 timeout 参数,用来控制该函数是否阻塞,阻塞多久。映射到高层就是我们上面的 selector.select(timeout)
中的 timeout。原来我们的 event_loop 中的存在一个 timeout。这样凌晨三点半我们如何处理 event_loop 我想你已经心里有数了吧。
asyncio 的实现和你想的差不多。如果 task list is not None 那么我们的 timeout=0 也就是非阻塞的。解释一下就是,我们调用 selector.select (timeout = 0),该函数会马上返回结果,我们对结果做一个上面讲过的处理,也就是 self._process_events(event_list)
。然后我们继续 run task。
如果我们的 task list is None, 那么我们则把 timeout=None。也就是设置成阻塞操作。此时我们的代码或者说线程会阻塞在 selector.select (timeout = 0) 处,换句话说就是等待该函数的返回。当然这样做的前提是,你往 selector 中注册了需要等待的 socket 描述符。
还有一些其他的问题,比如异步 mysql 是如何在 asyncio 的基础上实现的,这可能需要去阅读 aiomysql 库了。
你也许发现,我们一旦使用了 event_loop 实现单线程异步服务器,我们写的所有代码就都不是我们来控制执行了,代码的执行权全部交给了 event_loop,event_loop 在适当的时间 run task。读过廖雪峰 python 教程的小伙伴一定看过这句话
这就是异步编程的一个原则:一旦决定使用异步,则系统每一层都必须是异步,“开弓没有回头箭”。
这就是异步编程。
你也许对 asyncio 的作用,或者使用,或者代码实现有着很多的疑问,我也是如此。但是很抱歉,我并不怎么熟悉 python,也没有使用 asyncio 做过项目,只是出于好奇所以我对 python 的异步 i/o 进行了一个了解。
我是一个纸上谈兵的门外汉,到最后我也没能看清 asyncio 库的具体实现。我接下来的计划中并不打算对 asyncio 库进行更多的研究,但是我又不甘心这两天对 asyncio 库的研究付诸东流。所以我留下这篇博文,算是对自己的一个交待!希望下次能够有机会,能够更加了解 python 和 asyncio 的前提下,再写一篇深入解析 python—asyncio 的博文。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: