12.4. selectors — I/O 多路复用抽象层
目的: 基于 select 模块为 I/O 多路复用提供与平台无关的抽象。
selectors
模块在基于 [select
](pymotw.com/3/select/index.html#mod... 「select : 更高效的等待 I/O」) 里面平台特定的方法之上提供了一个平台无关的抽象层。
运行模式
selectors
里面的API是事件驱动的, 类似于 select
里面的 poll()
方法。 有几种实现方式,并且这个模块可以自动根据当前操作系统的配置设置一个别名为 DefaultSelector
来引用最有效率的一个方式。
选择器对象提供了一个用来指定 socket 上监听事件的方法, 然后让调用方以平台无关的方式等待事件。注册感兴趣的事件会创建一个 SelectorKey
来 保存 socket ,感兴趣的事件的信息和可选的应用程序信息。选择器的所有者通过调用 select()
方法来了解事件进度。该方法的返回值是一系列的 key 对象和标识发生事件种类的位掩码。使用选择器的程序需要不断的调用 select()
方法去及时的处理事件。
服务器响应
下面的服务器响应例子使用 SelectorKey
里面的应用程序数据去注册一个在新事件发生时调用的回调函数。 主循环通过 key 获取回调函数,并且将 socket 和 事件掩码传递给回调函数,当服务器启动时, 它会在主服务器 socket 上 发生读事件时调用注册的 accept()
方法。接受客户端发来的请求之后服务器会建立一个新的 socket ,在新的socket 将 read()
函数注册为读事件的回调函数。
selectors_echo_server.py
import selectors
import socket
mysel = selectors.DefaultSelector()
keep_running = True
def read(connection, mask):
"读取事件的回调"
global keep_running
client_address = connection.getpeername()
print('read({})'.format(client_address))
data = connection.recv(1024)
if data:
# 可读的客户端 socket 有数据
print(' received {!r}'.format(data))
connection.sendall(data)
else:
# 将空结果解释为关闭连接
print(' closing')
mysel.unregister(connection)
connection.close()
# 告诉主进程停止
keep_running = False
def accept(sock, mask):
"有新连接的回调"
new_connection, addr = sock.accept()
print('accept({})'.format(addr))
new_connection.setblocking(False)
mysel.register(new_connection, selectors.EVENT_READ, read)
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address))
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.bind(server_address)
server.listen(5)
mysel.register(server, selectors.EVENT_READ, accept)
while keep_running:
print('waiting for I/O')
for key, mask in mysel.select(timeout=1):
callback = key.data
callback(key.fileobj, mask)
print('shutting down')
mysel.close()
当 read()
从 socket处取不到数据的时候, 会被解释成另一端终端被关闭而不是正在发送数据。它会将 socket 关闭并从选择器里面移除。为了避免无限循环,该服务器在完成与单个客户端的通信之后也会自己关闭。
客户端响应
下面的客户端响应的例子中, 在主循环中处理所有的 I/O 事件,它将选择器设置为当 socket 返回读事件时和 socket 返回可发送事件时发报告, 而不是使用回调函数。因为它同时监控了两个事件,所以客户端必须通过检查掩码值来检测发生的事件。当所有数据都发送完毕的时候,它会将选择器配置为只当有数据需要读取时才发报告。
selectors_echo_client.py
import selectors
import socket
mysel = selectors.DefaultSelector()
keep_running = True
outgoing = [
b'It will be repeated.',
b'This is the message. ',
]
bytes_sent = 0
bytes_received = 0
# 连接是一个阻塞操作, 因此在返回之后调用 setblocking() 方法
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
sock.setblocking(False)
# 设置选择器去监听 socket 是否可写的和是否可读
mysel.register(
sock,
selectors.EVENT_READ | selectors.EVENT_WRITE,
)
while keep_running:
print('waiting for I/O')
for key, mask in mysel.select(timeout=1):
connection = key.fileobj
client_address = connection.getpeername()
print('client({})'.format(client_address))
if mask & selectors.EVENT_READ:
print(' ready to read')
data = connection.recv(1024)
if data:
# A readable client socket has data
print(' received {!r}'.format(data))
bytes_received += len(data)
# 当返回结果为空或者收到了我们发送的数据就将连接关闭
keep_running = not (
data or
(bytes_received and
(bytes_received == bytes_sent))
)
if mask & selectors.EVENT_WRITE:
print(' ready to write')
if not outgoing:
# 我们的消息为空了,这意味着我们再也不必要写数据,
# 所以将我们的配置更改为从服务器读取
print(' switching to read-only')
mysel.modify(sock, selectors.EVENT_READ)
else:
# Send the next message.
next_msg = outgoing.pop()
print(' sending {!r}'.format(next_msg))
sock.sendall(next_msg)
bytes_sent += len(next_msg)
print('shutting down')
mysel.unregister(connection)
connection.close()
mysel.close()
客户端会追踪它发送的数量和它接收的数量,当这两个的值匹配并且不等于零的时候, 客户端会退出处理循环并且通过从选择器里面移除 socket ,然后关闭 socket 和选择器的方式来安全的关闭连接。
客户端和服务端
客户端和服务端应该运行在分开的终端窗口, 这样他们可以互相通信。服务端的输出显示传入的连接和数据, 以及发送回客户端的响应。
$ python3 source/selectors/selectors_echo_server.py
starting up on localhost port 10000
waiting for I/O
waiting for I/O
accept(('127.0.0.1', 59850))
waiting for I/O
read(('127.0.0.1', 59850))
received b'This is the message. It will be repeated.'
waiting for I/O
read(('127.0.0.1', 59850))
closing
shutting down
客户端输出显示从客户端传出的信息和服务端返回的响应。
$ python3 source/selectors/selectors_echo_client.py
connecting to localhost port 10000
waiting for I/O
client(('127.0.0.1', 10000))
ready to write
sending b'This is the message. '
waiting for I/O
client(('127.0.0.1', 10000))
ready to write
sending b'It will be repeated.'
waiting for I/O
client(('127.0.0.1', 10000))
ready to write
switching to read-only
waiting for I/O
client(('127.0.0.1', 10000))
ready to read
received b'This is the message. It will be repeated.'
shutting down
扩展阅读
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。