12.4. selectors — I/O 多路复用抽象层

未匹配的标注

目的: 基于 select 模块为 I/O 多路复用提供与平台无关的抽象。

selectors 模块在基于 [select](pymotw.com/3/select/index.html#mod... 「select : 更高效的等待 I/O」) 里面平台特定的方法之上提供了一个平台无关的抽象层。

运行模式

selectors 里面的API是事件驱动的, 类似于 select 里面的 poll() 方法。 有几种实现方式,并且这个模块可以自动根据当前操作系统的配置设置一个别名为 DefaultSelector 来引用最有效率的一个方式。

选择器对象提供了一个用来指定 socket 上监听事件的方法, 然后让调用方以平台无关的方式等待事件。注册感兴趣的事件会创建一个 SelectorKey 来 保存 socket ,感兴趣的事件的信息和可选的应用程序信息。选择器的所有者通过调用 select() 方法来了解事件进度。该方法的返回值是一系列的 key 对象和标识发生事件种类的位掩码。使用选择器的程序需要不断的调用 select() 方法去及时的处理事件。

服务器响应

下面的服务器响应例子使用 SelectorKey 里面的应用程序数据去注册一个在新事件发生时调用的回调函数。 主循环通过 key 获取回调函数,并且将 socket 和 事件掩码传递给回调函数,当服务器启动时, 它会在主服务器 socket 上 发生读事件时调用注册的 accept() 方法。接受客户端发来的请求之后服务器会建立一个新的 socket ,在新的socket 将 read() 函数注册为读事件的回调函数。

selectors_echo_server.py

import selectors
import socket

mysel = selectors.DefaultSelector()
keep_running = True

def read(connection, mask):
    "读取事件的回调"
    global keep_running

    client_address = connection.getpeername()
    print('read({})'.format(client_address))
    data = connection.recv(1024)
    if data:
        # 可读的客户端 socket 有数据
        print('  received {!r}'.format(data))
        connection.sendall(data)
    else:
        # 将空结果解释为关闭连接
        print('  closing')
        mysel.unregister(connection)
        connection.close()
        # 告诉主进程停止
        keep_running = False

def accept(sock, mask):
    "有新连接的回调"
    new_connection, addr = sock.accept()
    print('accept({})'.format(addr))
    new_connection.setblocking(False)
    mysel.register(new_connection, selectors.EVENT_READ, read)

server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address))
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.bind(server_address)
server.listen(5)

mysel.register(server, selectors.EVENT_READ, accept)

while keep_running:
    print('waiting for I/O')
    for key, mask in mysel.select(timeout=1):
        callback = key.data
        callback(key.fileobj, mask)

print('shutting down')
mysel.close()

read() 从 socket处取不到数据的时候, 会被解释成另一端终端被关闭而不是正在发送数据。它会将 socket 关闭并从选择器里面移除。为了避免无限循环,该服务器在完成与单个客户端的通信之后也会自己关闭。

客户端响应

下面的客户端响应的例子中, 在主循环中处理所有的 I/O 事件,它将选择器设置为当 socket 返回读事件时和 socket 返回可发送事件时发报告, 而不是使用回调函数。因为它同时监控了两个事件,所以客户端必须通过检查掩码值来检测发生的事件。当所有数据都发送完毕的时候,它会将选择器配置为只当有数据需要读取时才发报告。

selectors_echo_client.py

import selectors
import socket

mysel = selectors.DefaultSelector()
keep_running = True
outgoing = [
    b'It will be repeated.',
    b'This is the message.  ',
]
bytes_sent = 0
bytes_received = 0

# 连接是一个阻塞操作, 因此在返回之后调用 setblocking() 方法
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
sock.setblocking(False)

# 设置选择器去监听 socket 是否可写的和是否可读
mysel.register(
    sock,
    selectors.EVENT_READ | selectors.EVENT_WRITE,
)

while keep_running:
    print('waiting for I/O')
    for key, mask in mysel.select(timeout=1):
        connection = key.fileobj
        client_address = connection.getpeername()
        print('client({})'.format(client_address))

        if mask & selectors.EVENT_READ:
            print('  ready to read')
            data = connection.recv(1024)
            if data:
                # A readable client socket has data
                print('  received {!r}'.format(data))
                bytes_received += len(data)

            # 当返回结果为空或者收到了我们发送的数据就将连接关闭
            keep_running = not (
                data or
                (bytes_received and
                 (bytes_received == bytes_sent))
            )

        if mask & selectors.EVENT_WRITE:
            print('  ready to write')
            if not outgoing:
                # 我们的消息为空了,这意味着我们再也不必要写数据,
                                # 所以将我们的配置更改为从服务器读取
                print('  switching to read-only')
                mysel.modify(sock, selectors.EVENT_READ)
            else:
                # Send the next message.
                next_msg = outgoing.pop()
                print('  sending {!r}'.format(next_msg))
                sock.sendall(next_msg)
                bytes_sent += len(next_msg)

print('shutting down')
mysel.unregister(connection)
connection.close()
mysel.close()

客户端会追踪它发送的数量和它接收的数量,当这两个的值匹配并且不等于零的时候, 客户端会退出处理循环并且通过从选择器里面移除 socket ,然后关闭 socket 和选择器的方式来安全的关闭连接。

客户端和服务端

客户端和服务端应该运行在分开的终端窗口, 这样他们可以互相通信。服务端的输出显示传入的连接和数据, 以及发送回客户端的响应。

$ python3 source/selectors/selectors_echo_server.py
starting up on localhost port 10000
waiting for I/O
waiting for I/O
accept(('127.0.0.1', 59850))
waiting for I/O
read(('127.0.0.1', 59850))
  received b'This is the message.  It will be repeated.'
waiting for I/O
read(('127.0.0.1', 59850))
  closing
shutting down

客户端输出显示从客户端传出的信息和服务端返回的响应。

$ python3 source/selectors/selectors_echo_client.py
connecting to localhost port 10000
waiting for I/O
client(('127.0.0.1', 10000))
  ready to write
  sending b'This is the message.  '
waiting for I/O
client(('127.0.0.1', 10000))
  ready to write
  sending b'It will be repeated.'
waiting for I/O
client(('127.0.0.1', 10000))
  ready to write
  switching to read-only
waiting for I/O
client(('127.0.0.1', 10000))
  ready to read
  received b'This is the message.  It will be repeated.'
shutting down

扩展阅读

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
上一篇 下一篇
Summer