7.11. codecs — 字符编码和解码

未匹配的标注

目的:文本不同表现形式之间转换的编解码器。

codecs 模块提供了流和文件接口用于转换数据。它通常用于处理 Unicode 文本,但是用于其他的目的的编码格式也是可以处理的。

Unicode 入门#

CPython 3.x 版本区分文本和字节。bytes 实例使用一系列 8 位字节值。相反,str 字符串内部被作为一系列 Unicode 码点管理。每个码点被存储为 2-4 字节的序列,这个取决于 Python 编译时给出的选项。

当输出 str 值时,它使用几种标准方案之一进行编码,以便稍后字节序列可以被重建为相同的文本字符串。编码值的字节不一定与码点相同,并且编码定义了两组值之间转换的方式。读取 Unicode 数据还需要知道编码,以便传入的字节可以转换为 Unicode 类使用的内部表示。

西方语言最常见的编码是 UTF-8UTF-16 ,它们分别使用一个或者两个字节序列表示每个代码点。对于大多数字符用于两个字节的码点存储可能不太适合,所以其他的编码可能更高效。

推荐阅读#

有关 Unicode 的更多介绍性信息,请参阅本节末尾的参考列表。Python Unicode HOWTO 是特别有帮助的。

编码#

理解编码的最好方式是去查看用不同方式对相同字符串编码产生的字节序列。下面的例子用这个函数格式化字节字符串以更好地阅读。

codecs_to_hex.py

import binascii

def to_hex(t, nbytes):
    """Format text t as a sequence of nbyte long values
    separated by spaces.
    """
    chars_per_item = nbytes * 2
    hex_version = binascii.hexlify(t)
    return b' '.join(
        hex_version[start:start + chars_per_item]
        for start in range(0, len(hex_version), chars_per_item)
    )

if __name__ == '__main__':
    print(to_hex(b'abcdef', 1))
    print(to_hex(b'abcdef', 2))

这个函数使用 binascii 得到字符串的 16 进制表示,然后在返回之前每隔 nbytes 插入一个空格。

$ python3 codecs_to_hex.py

b'61 62 63 64 65 66'
b'6162 6364 6566'

第一个编码示例首先使用 unicode 类的原始表示法打印文 français,然后从 Unicode 数据库中输出每个字符的名称。接下来两行分别将字符串编码为 UTF-8UTF-16,并将编码结果显示为 16 进制。

codecs_encodings.py

import unicodedata
from codecs_to_hex import to_hex

text = 'français'

print('Raw   : {!r}'.format(text))
for c in text:
    print('  {!r}: {}'.format(c, unicodedata.name(c, c)))
print('UTF-8 : {!r}'.format(to_hex(text.encode('utf-8'), 1)))
print('UTF-16: {!r}'.format(to_hex(text.encode('utf-16'), 2)))

str 的编码结果是一个 bytes 对象。

$ python3 codecs_encodings.py

Raw   : 'français'
  'f': LATIN SMALL LETTER F
  'r': LATIN SMALL LETTER R
  'a': LATIN SMALL LETTER A
  'n': LATIN SMALL LETTER N
  'ç': LATIN SMALL LETTER C WITH CEDILLA
  'a': LATIN SMALL LETTER A
  'i': LATIN SMALL LETTER I
  's': LATIN SMALL LETTER S
UTF-8 : b'66 72 61 6e c3 a7 61 69 73'
UTF-16: b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'

给一个编码字节序列作为 bytes 实例,decode() 方法将会把它们翻译为 Unicode 码点并返回一个 str 实例。

codecs_decode.py

from codecs_to_hex import to_hex

text = 'français'
encoded = text.encode('utf-8')
decoded = encoded.decode('utf-8')

print('Original :', repr(text))
print('Encoded  :', to_hex(encoded, 1), type(encoded))
print('Decoded  :', repr(decoded), type(decoded))

编码方式不会改变输出类型。

$ python3 codecs_decode.py

Original : 'français'
Encoded  : b'66 72 61 6e c3 a7 61 69 73' <class 'bytes'>
Decoded  : 'français' <class 'str'>

注意#

默认的编码方式是在程序启动时设置,当 site 加载时。有关默认编码的讨论请参考 sys 关于 Unicode Defaults 的部分。

处理文件#

当处理 I / O 操作的时候,编解码字符串是特别重要的。当写入文件, socket 或者其它流的时候,数据必须被进行恰当的编码。一般而言,所有的文本数据当它被读的时候需要从字节形式解码,写入的时候必须从内部值转换为特定的表示。程序可以显示地编码和解码数据,但根据所使用的编码,确定是否读取了足够的字节以完全解码数据可能并不容易。好在 codecs 模块提供了管理数据编解码的类,因此程序中一般不需要那么做。

codecs 提供的简单接口可以用来替换内建的 open() 方法。新版本工作方式类似内建方法,但是添加了两个参数来指定编码以及错误处理技术。

codecs_open_write.py

from codecs_to_hex import to_hex

import codecs
import sys

encoding = sys.argv[1]
filename = encoding + '.txt'

print('Writing to', filename)
with codecs.open(filename, mode='w', encoding=encoding) as f:
    f.write('français')

# 定义用于 to_hex() 的字节分组
nbytes = {
    'utf-8': 1,
    'utf-16': 2,
    'utf-32': 4,
}.get(encoding, 1)

# 显示文件原生字节
print('File contents:')
with open(filename, mode='rb') as f:
    print(to_hex(f.read(), nbytes))

这个例子以带有 「ç」的 unicode 字符串开头,并使用在命令行上指定的编码将文本保存到文件。

$ python3 codecs_open_write.py utf-8

Writing to utf-8.txt
File contents:
b'66 72 61 6e c3 a7 61 69 73'

$ python3 codecs_open_write.py utf-16

Writing to utf-16.txt
File contents:
b'fffe 6600 7200 6100 6e00 e700 6100 6900 7300'

$ python3 codecs_open_write.py utf-32

Writing to utf-32.txt
File contents:
b'fffe0000 66000000 72000000 61000000 6e000000 e7000000 61000000
69000000 73000000'

使用 open() 读取数据很简单,只需要一个编码:编码必须实现直到,才能正确设置解码器。某些数据格式将编码,例如:XML,指定为文件的一部分,但通常有应用程序管理。codecs 简单地将编码作为参数并假设它是正确的。

codecs_open_read.py

import codecs
import sys

encoding = sys.argv[1]
filename = encoding + '.txt'

print('Reading from', filename)
with codecs.open(filename, mode='r', encoding=encoding) as f:
    print(repr(f.read()))

这个例子读取先前程序创建的文件,并将生成的 unicode 对象的表示形式打印到控制台。

$ python3 codecs_open_read.py utf-8

Reading from utf-8.txt
'français'

$ python3 codecs_open_read.py utf-16

Reading from utf-16.txt
'français'

$ python3 codecs_open_read.py utf-32

Reading from utf-32.txt
'français'

字节顺序#

在不同计算机系统之间传输数据的时候, UTF-16 和 UTF-32 等多字节编码可能会造成问题,可以通过直接复制或者网络通信复制文件。不同系统使用不同的高低字节排序。数据的这种特性(称为其字节序)取决于诸如操作系统和应用程序开发人员所做的硬件体系结构和选择等因素。事先并不总是知道给定数据使用什么字节顺序,因此多字节编码包括字节顺序标记(BOM)作为编码的输出的前几个字节。例如,UTF-16 定义 0xFFFE 和 0xFEFF 不是有效字符,可以被用于去标识字节顺序。codecs 定义了表示 UTF-16 和 UTF-32 字节顺序的常量。

codecs_bom.py

import codecs
from codecs_to_hex import to_hex

BOM_TYPES = [
    'BOM', 'BOM_BE', 'BOM_LE',
    'BOM_UTF8',
    'BOM_UTF16', 'BOM_UTF16_BE', 'BOM_UTF16_LE',
    'BOM_UTF32', 'BOM_UTF32_BE', 'BOM_UTF32_LE',
]

for name in BOM_TYPES:
    print('{:12} : {}'.format(
        name, to_hex(getattr(codecs, name), 2)))

根据当前系统原生字节顺序,BOMBOM_UTF16 和  BOM_UTF32 自动设置为大端值或者小端值。

$ python3 codecs_bom.py

BOM          : b'fffe'
BOM_BE       : b'feff'
BOM_LE       : b'fffe'
BOM_UTF8     : b'efbb bf'
BOM_UTF16    : b'fffe'
BOM_UTF16_BE : b'feff'
BOM_UTF16_LE : b'fffe'
BOM_UTF32    : b'fffe 0000'
BOM_UTF32_BE : b'0000 feff'
BOM_UTF32_LE : b'fffe 0000'

字节顺序可以被 codecs 中的解码器自动地检测和处理,但是当解码的时候可以显示声明一个字节顺序。

codecs_bom_create_file.py

import codecs
from codecs_to_hex import to_hex

# 选择 UTF-16 编码的非本地编码
if codecs.BOM_UTF16 == codecs.BOM_UTF16_BE:
    bom = codecs.BOM_UTF16_LE
    encoding = 'utf_16_le'
else:
    bom = codecs.BOM_UTF16_BE
    encoding = 'utf_16_be'

print('Native order  :', to_hex(codecs.BOM_UTF16, 2))
print('Selected order:', to_hex(bom, 2))

# 编码数据
encoded_text = 'français'.encode(encoding)
print('{:14}: {}'.format(encoding, to_hex(encoded_text, 2)))

with open('nonnative-encoded.txt', mode='wb') as f:
    # 写入字节顺序标记,它没有包含在编码文本中
        # ,因为选择编码的时候字节顺序被显示给定了。
    f.write(bom)
    # 写入编码文本的字节字符串
    f.write(encoded_text)

codecs_bom_create_file.py 检测出本地系统字节顺序,然后使用替代形式,以便下一个示例可以自动地检测出来。

$ python3 codecs_bom_create_file.py

Native order  : b'fffe'
Selected order: b'feff'
utf_16_be     : b'0066 0072 0061 006e 00e7 0061 0069 0073'

codecs_bom_detection.py 打开文件的时候没有声明编码顺序,所以解码器使用文件前两个字节的 BOM 值决定它。

codecs_bom_detection.py

import codecs
from codecs_to_hex import to_hex

# 查看原生数据
with open('nonnative-encoded.txt', mode='rb') as f:
    raw_bytes = f.read()

print('Raw    :', to_hex(raw_bytes, 2))

# 重新打开文件,并且让 codecs 检测 BOM
with codecs.open('nonnative-encoded.txt',
                 mode='r',
                 encoding='utf-16',
                 ) as f:
    decoded_text = f.read()

print('Decoded:', repr(decoded_text))

因为文件的前两个字节用于字节顺序检测,所以他们没有包含在 read() 方法的返回值中。

$ python3 codecs_bom_detection.py

Raw    : b'feff 0066 0072 0061 006e 00e7 0061 0069 0073'
Decoded: 'français'

错误处理#

前面的部分指出读取 Unicode 文件的时候需要知道编码。正确设置编码非常重要,原因有俩。如果在读取文件的时候编码设置的不正确,数据则将被解释为错误,或者解码失败。并非所有的 Unicode 字符都可以被所有编码表示,所以如果写入数据的时候使用了错误的编码将会引发错误,数据也可能丢失。

codecs 使用了 5 个错误处理选项,同 strencode() 方法和 bytesdecode() 方法相同,列出于下列表格中。

Codec 错误处理模式

错误模式 描述
strict 数据没有被正确转换将会引发错误。
replace 对于不能编码的数据替换一个特殊的标记字符。
ignore 跳过数据。
xmlcharrefreplace XML 字符 (仅用于编码)
backslashreplace 转移序列 (仅用于编码)

编码错误#

在将 Unicode 数据写入 ASCII 输出流(例如常规文件或者没有健壮的编码集的 sys.stdout )时,最常见的错误条件是接收 UnicodeEncodeError。这个例子程序可以被用于去试验不同的文件处理模式。

codecs_encode_error.py

import codecs
import sys

error_handling = sys.argv[1]

text = 'français'

try:
    # 保存数据,编码为 ASCII,使用命令行中声明的错误处理模式
    with codecs.open('encode_error.txt', 'w',
                     encoding='ascii',
                     errors=error_handling) as f:
        f.write(text)

except UnicodeEncodeError as err:
    print('ERROR:', err)

else:
    # 如果写入文件的时候没有错误,显示文件内容
    with open('encode_error.txt', 'rb') as f:
        print('File contents: {!r}'.format(f.read()))

然而严格模式是最安全的对于确保一个应用程序对于所有的 I / O 操作显式地设置正确的编码,当错误出现的时候可能导致程序崩溃。

$ python3 codecs_encode_error.py strict

ERROR: 'ascii' codec can't encode character '\xe7' in position
4: ordinal not in range(128)

其他的错误处理模式可能比较松散。例如,replace 确保没有错误引发,代价是可能丢失无法转换为请求编码的数据。Unicode 字符 π 仍然不能转换为 ASCII,但是不会引发异常,而是在输出中将它替换为 ?

$ python3 codecs_encode_error.py replace

File contents: b'fran?ais'

为了去跳过整个有问题的数据,使用 ignore。任何不能被编码的字符都将被丢弃。

$ python3 codecs_encode_error.py ignore

File contents: b'franais'

有两个无损的错误处理选项,它们都用一个独立于编码标准定义的替代表示替换字符。xmlcharrefreplace 引用 XML 字符作为替代(字符引用的列表在 W3C 文档 「XML 字符实体定义」中指定)。

$ python3 codecs_encode_error.py xmlcharrefreplace

File contents: b'français'

另一个无损错误处理选项是 backslashreplace,它产生一个输出格式就像打印 unicode 对象的方法 repr() 返回的那样。Unicode 字符被替换为以 \u 开始,接下来是码点的 16 进制值。

$ python3 codecs_encode_error.py backslashreplace

File contents: b'fran\\xe7ais'

解码错误#

很可能在解码数据的时候看到错误,特别是当使用了错误的编码的时候。

codecs_decode_error.py

import codecs
import sys

from codecs_to_hex import to_hex

error_handling = sys.argv[1]

text = 'français'
print('Original     :', repr(text))

# 使用一个编码保存数据
with codecs.open('decode_error.txt', 'w',
                 encoding='utf-16') as f:
    f.write(text)

# 转化文件中的字节
with open('decode_error.txt', 'rb') as f:
    print('File contents:', to_hex(f.read(), 1))

# 尝试以错误的编码读取数据
with codecs.open('decode_error.txt', 'r',
                 encoding='utf-8',
                 errors=error_handling) as f:
    try:
        data = f.read()
    except UnicodeDecodeError as err:
        print('ERROR:', err)
    else:
        print('Read         :', repr(data))

当指定解码格式的时候,strict 错误处理模式在字节流不能被正确解码的时候将会引发错误。这个例子中,UnicodeDecodeError 错误是由于尝试使用 UTF-8 解码器转化部分 UTF-16 BOM 字符。

$ python3 codecs_decode_error.py strict

Original     : 'français'
File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00
73 00'
ERROR: 'utf-8' codec can't decode byte 0xff in position 0:
invalid start byte

切换为 ignore 将会导致解码器跳过无效的字节。但是结果仍然不尽人意,因为解码输出内嵌了空字节。

$ python3 codecs_decode_error.py ignore

Original     : 'français'
File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00
73 00'
Read         : 'f\x00r\x00a\x00n\x00\x00a\x00i\x00s\x00'

replace 模式中,无效的字节将被替换为 \uFFFD,官方的 Unicode 替换字符,看起来像一个黑色背景包含白色问好的钻石?。

$ python3 codecs_decode_error.py replace

Original     : 'français'
File contents: b'ff fe 66 00 72 00 61 00 6e 00 e7 00 61 00 69 00
73 00'
Read         : '��f\x00r\x00a\x00n\x00�\x00a\x00i\x00s\x00'

编码翻译#

虽然大多数程序可以再内部使用 str 数据,但是将编解码作为 I / O 操作的一部分,有时候更改文件的编码而不保留中间数据格式是非常有用的。EncodedFile()  将使用了某个编码的文件句柄包装为一个类,这个类在 I / O 发生时将数据转换为另外一种编码。

codecs_encodedfile.py

from codecs_to_hex import to_hex

import codecs
import io

# 原始数据
data = 'français'

# 手动编码为 UTF-8
utf8 = data.encode('utf-8')
print('Start as UTF-8   :', to_hex(utf8, 1))

# 设置输出缓冲池,并将它包装为 EncodedFile
output = io.BytesIO()
encoded_file = codecs.EncodedFile(output, data_encoding='utf-8',
                                  file_encoding='utf-16')
encoded_file.write(utf8)

# 获取缓冲内容,并编码为 UTF-16
utf16 = output.getvalue()
print('Encoded to UTF-16:', to_hex(utf16, 2))

# 使用 UTF-16 数据设置另一个缓冲池
# 并且包装为另一个 EncodedFile
buffer = io.BytesIO(utf16)
encoded_file = codecs.EncodedFile(buffer, data_encoding='utf-8',
                                  file_encoding='utf-16')

# 读取数据的 UTF-8 版本
recoded = encoded_file.read()
print('Back to UTF-8    :', to_hex(recoded, 1))

这个例子显示了从 EncodedFile() 返回并写入单独句柄的过程。无论是用于读还是写,file_encoding 始终引用打开的文件句柄的编码,data_encodingread() 或者 write 调用的时候使用。

$ python3 codecs_encodedfile.py

Start as UTF-8   : b'66 72 61 6e c3 a7 61 69 73'
Encoded to UTF-16: b'fffe 6600 7200 6100 6e00 e700 6100 6900
7300'
Back to UTF-8    : b'66 72 61 6e c3 a7 61 69 73'

非 Unicode 字符#

尽管先前的例子都是用于 Unicode 编码,但是 codecs 可以用于许多其他的数据转换。Python 中包含的 codecs 可以用于处理 base-64bzip2ROT-13ZIP 等其它数据格式。

codecs_rot13.py

import codecs
import io

buffer = io.StringIO()
stream = codecs.getwriter('rot_13')(buffer)

text = 'abcdefghijklmnopqrstuvwxyz'

stream.write(text)
stream.flush()

print('Original:', text)
print('ROT-13  :', buffer.getvalue())

任何可以表示为一个函数的转换只需要一个输入参数并且返回一个字节或者 Unicode 字符串即可注册为一个解码器。对于 「rot_13」,输入应该是一个 Unicode 字符串,输出也将是一个 Unicode 字符串。

$ python3 codecs_rot13.py

Original: abcdefghijklmnopqrstuvwxyz
ROT-13  : nopqrstuvwxyzabcdefghijklm

使用 codecs 封装数据流相比直接使用 zlib 提供了更简单的接口。

codecs_zlib.py

import codecs
import io

from codecs_to_hex import to_hex

buffer = io.BytesIO()
stream = codecs.getwriter('zlib')(buffer)

text = b'abcdefghijklmnopqrstuvwxyz\n' * 50

stream.write(text)
stream.flush()

print('Original length :', len(text))
compressed_data = buffer.getvalue()
print('ZIP compressed  :', len(compressed_data))

buffer = io.BytesIO(compressed_data)
stream = codecs.getreader('zlib')(buffer)

first_line = stream.readline()
print('Read first line :', repr(first_line))

uncompressed_data = first_line + stream.read()
print('Uncompressed    :', len(uncompressed_data))
print('Same            :', text == uncompressed_data)

并非所有的压缩和编码系统支持读取部分数据通过流接口 readline() 或者 read(),因为他们需要找到压缩段的末尾来扩展它。如果程序不能在内存中保存所有未压缩的数据,使用压缩库的增量访问功能而不是编解码器。

$ python3 codecs_zlib.py

Original length : 1350
ZIP compressed  : 48
Read first line : b'abcdefghijklmnopqrstuvwxyz\n'
Uncompressed    : 1350
Same            : True

增量编码#

提供的一些编码,特别是 zlib 或者 bz2,可能会在数据流处理时大幅改变数据流的长度。对于大数据集,这些操作最好是渐进式的,一次只处理小量数据块。IncrementalEncoderIncrementalDecoder API 设计用于这个目的。

codecs_incremental_bz2.py

import codecs
import sys

from codecs_to_hex import to_hex

text = b'abcdefghijklmnopqrstuvwxyz\n'
repetitions = 50

print('Text length :', len(text))
print('Repetitions :', repetitions)
print('Expected len:', len(text) * repetitions)

# 重复文本以构建大量数据
encoder = codecs.getincrementalencoder('bz2')()
encoded = []

print()
print('Encoding:', end=' ')
last = repetitions - 1
for i in range(repetitions):
    en_c = encoder.encode(text, final=(i == last))
    if en_c:
        print('\nEncoded : {} bytes'.format(len(en_c)))
        encoded.append(en_c)
    else:
        sys.stdout.write('.')

all_encoded = b''.join(encoded)
print()
print('Total encoded length:', len(all_encoded))
print()

# 一次只解压一个字节串
decoder = codecs.getincrementaldecoder('bz2')()
decoded = []

print('Decoding:', end=' ')
for i, b in enumerate(all_encoded):
    final = (i + 1) == len(text)
    c = decoder.decode(bytes([b]), final)
    if c:
        print('\nDecoded : {} characters'.format(len(c)))
        print('Decoding:', end=' ')
        decoded.append(c)
    else:
        sys.stdout.write('.')
print()

restored = b''.join(decoded)

print()
print('Total uncompressed length:', len(restored))

每次传递给编码器或者解码器的时候,其内部状态会更新。当状态一致的时候(由编解码器定义),数据返回并且状态重置。在那之前,调用 encode() 或者 decode() 将不会返回任何数据。当传入最后一批数据时,参数 final 应该设置为 True,因此编解码器直到清除所有剩余的缓冲数据。

$ python3 codecs_incremental_bz2.py

Text length : 27
Repetitions : 50
Expected len: 1350

Encoding: .................................................
Encoded : 99 bytes

Total encoded length: 99

Decoding: ......................................................
..................................
Decoded : 1350 characters
Decoding: ..........

Total uncompressed length: 1350

Unicode 数据和网络通信#

网络套接字是字节流,与标准输入和输出不同,他们默认不支持编码。这意味着想要通过网络发送或接收 Unicode 数据的程序必须在写入套接字之前将其编码为字节。这个 Server 将它接收到的数据送回给发送端。

codecs_socket_fail.py

import sys
import socketserver

class Echo(socketserver.BaseRequestHandler):

    def handle(self):
        # 接收字节并送回客户端
        data = self.request.recv(1024)
        self.request.send(data)
        return

if __name__ == '__main__':
    import codecs
    import socket
    import threading

    address = ('localhost', 0)  # 向内核申请一个端口
    server = socketserver.TCPServer(address, Echo)
    ip, port = server.server_address  # 分配的端口和ip?

    t = threading.Thread(target=server.serve_forever)
    t.setDaemon(True)  # 设置为守护进程
    t.start()

    # 连接到服务器
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((ip, port))

    # 发送数据
    # 错误:没有进行编码!
    text = 'français'
    len_sent = s.send(text)

    # 接收一个响应
    response = s.recv(len_sent)
    print(repr(response))

    # 清场
    s.close()
    server.socket.close()

可以在每次调用 send() 之前显示编码数据,但是如果缺少一个 send() 调用将导致编码错误。

$ python3 codecs_socket_fail.py

Traceback (most recent call last):
  File "codecs_socket_fail.py", line 43, in <module>
    len_sent = s.send(text)
TypeError: a bytes-like object is required, not 'str'

使用 makefile() 为套接字获取类文件句柄,然后使用基于流的读取器或者写入器对其进行封装,这意味着 Unicode 字符串将在进出套接字的过程中进行编码。

codecs_socket.py

import sys
import socketserver

class Echo(socketserver.BaseRequestHandler):

    def handle(self):
        """Get some bytes and echo them back to the client.

        There is no need to decode them, since they are not used.

        """
        data = self.request.recv(1024)
        self.request.send(data)

class PassThrough:

    def __init__(self, other):
        self.other = other

    def write(self, data):
        print('Writing :', repr(data))
        return self.other.write(data)

    def read(self, size=-1):
        print('Reading :', end=' ')
        data = self.other.read(size)
        print(repr(data))
        return data

    def flush(self):
        return self.other.flush()

    def close(self):
        return self.other.close()

if __name__ == '__main__':
    import codecs
    import socket
    import threading

    address = ('localhost', 0)  # 让内核分配一个端口
    server = socketserver.TCPServer(address, Echo)
    ip, port = server.server_address  # 分配的端口和ip?

    t = threading.Thread(target=server.serve_forever)
    t.setDaemon(True)  # 设置守护进程
    t.start()

    # 连接服务器
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((ip, port))

    # 使用读取器和写入器包装套接字
    read_file = s.makefile('rb')
    incoming = codecs.getreader('utf-8')(PassThrough(read_file))
    write_file = s.makefile('wb')
    outgoing = codecs.getwriter('utf-8')(PassThrough(write_file))

    # 发送数据
    text = 'français'
    print('Sending :', repr(text))
    outgoing.write(text)
    outgoing.flush()

    # 接收响应
    response = incoming.read()
    print('Received:', repr(response))

    # 清场
    s.close()
    server.socket.close()

这个例子展示了使用 PassThrough 去显示发送之前编码的数据,以及在客户端接收响应后对其进行解码。

$ python3 codecs_socket.py

Sending : 'français'
Writing : b'fran\xc3\xa7ais'
Reading : b'fran\xc3\xa7ais'
Reading : b''
Received: 'français'

自定义编码#

因为 Python 已经自带了大量的标准编解码器,一般情况下应用不可能需要自定义编解码器。但是如果需要的时候,codecs 中有几个基类使这个过程变得更容易。

第一步是去了解编码转换的性质。这些例子将使用 「invertcaps」编码,进行大小写转换。以下是对输入字符串执行此转换的编码函数的简单定义。

codecs_invertcaps.py

import string

def invertcaps(text):
    """Return new string with the case of all letters switched.
    """
    return ''.join(
        c.upper() if c in string.ascii_lowercase
        else c.lower() if c in string.ascii_uppercase
        else c
        for c in text
    )

if __name__ == '__main__':
    print(invertcaps('ABCdef'))
    print(invertcaps('abcDEF'))

这个例子中,编解码器是同一个函数(与 ROT-13 一样)。

$ python3 codecs_invertcaps.py

abcDEF
ABCdef

尽管它很容易理解,但是实现并不高效,特别是对于大文本字符串。幸运的是,codecs 包含一些用于创建基于字符映射的编解码器的辅助函数。字符映射编码由两个字典组成。编码字典将输入字符串中的字符值转换为输出中的字节值,解码字典则以另一种方式运行。首先创建解码映射,然后使用 make_encoding_map() 将其转换为编码映射。C 函数 charmap_encode()charmap_decode() 使用这种映射关系高效地转换他们的输入数据。

codecs_invertcaps_charmap.py

import codecs
import string

# 映射每个字符到它自己
decoding_map = codecs.make_identity_dict(range(256))

# 创建一个大小写字母序数值对列表
pairs = list(zip(
    [ord(c) for c in string.ascii_lowercase],
    [ord(c) for c in string.ascii_uppercase],
))

# 修改映射关系,大小写相互转换
decoding_map.update({
    upper: lower
    for (lower, upper)
    in pairs
})
decoding_map.update({
    lower: upper
    for (lower, upper)
    in pairs
})

# 创建一个独立的编码映射图
encoding_map = codecs.make_encoding_map(decoding_map)

if __name__ == '__main__':
    print(codecs.charmap_encode('abcDEF', 'strict',
                                encoding_map))
    print(codecs.charmap_decode(b'abcDEF', 'strict',
                                decoding_map))
    print(encoding_map == decoding_map)

虽然逆变器的编码和解码图是相同的,但情况并非总是如此。 make_encoding_map() 检测多个输入字符被编码为相同输出的情况,如果这样,就将编码值替换为 None 以将编码标记为未定义的情况。

$ python3 codecs_invertcaps_charmap.py

(b'ABCdef', 6)
('ABCdef', 6)
True

字符映射编码器和解码器支持前面描述的所有标准错误处理方法,因此不需要额外的工作来准守相关的 API 标准。

codecs_invertcaps_error.py

import codecs
from codecs_invertcaps_charmap import encoding_map

text = 'pi: \u03c0'

for error in ['ignore', 'replace', 'strict']:
    try:
        encoded = codecs.charmap_encode(
            text, error, encoding_map)
    except UnicodeEncodeError as err:
        encoded = str(err)
    print('{:7}: {}'.format(error, encoded))

因为 π 的 Unicode 码点没有在这个编码图中,严格模式错误处理将会引发一个异常。

$ python3 codecs_invertcaps_error.py

ignore : (b'PI: ', 5)
replace: (b'PI: ?', 5)
strict : 'charmap' codec can't encode character '\u03c0' in
position 4: character maps to <undefined>

定义编码和解码映射关系之后,还需要设置一些额外的类以注册该编码。register() 向注册表添加搜索功能,以便当用户想要使用编解码器时可以找到它。搜索函数必须采用一个表示编码名称的字符串作为参数,并在找到编码时返回 CodecInfo 对象,否则返回 None

codecs_register.py

import codecs
import encodings

def search1(encoding):
    print('search1: Searching for:', encoding)
    return None

def search2(encoding):
    print('search2: Searching for:', encoding)
    return None

codecs.register(search1)
codecs.register(search2)

utf8 = codecs.lookup('utf-8')
print('UTF-8:', utf8)

try:
    unknown = codecs.lookup('no-such-encoding')
except LookupError as err:
    print('ERROR:', err)

可以注册多个搜索函数,并以此调用每个搜索函数,直到返回一个 CodecInfo 或者找完列表。codecs 内部注册的搜索函数知道怎么去加载标准的编解码器,例如,UTF-8,因此这些名称永远不会传递到自定义搜索函数。

$ python3 codecs_register.py

UTF-8: <codecs.CodecInfo object for encoding utf-8 at
0x1007773a8>
search1: Searching for: no-such-encoding
search2: Searching for: no-such-encoding
ERROR: unknown encoding: no-such-encoding

由搜索函数返回的 CodecInfo 实例告诉 codecs 如何使用所支持的所有不同机制进行编码和解码:无状态,增量和流。codecs 包含基类以帮助设置字符映射编码。本示例将所有这些部分放在一起来注册一个所搜函数,该函数返回为 invertcaps 编解码器配置的 CodecInfo 实例。

codecs_invertcaps_register.py

import codecs

from codecs_invertcaps_charmap import encoding_map, decoding_map

class InvertCapsCodec(codecs.Codec):
    "Stateless encoder/decoder"

    def encode(self, input, errors='strict'):
        return codecs.charmap_encode(input, errors, encoding_map)

    def decode(self, input, errors='strict'):
        return codecs.charmap_decode(input, errors, decoding_map)

class InvertCapsIncrementalEncoder(codecs.IncrementalEncoder):
    def encode(self, input, final=False):
        data, nbytes = codecs.charmap_encode(input,
                                             self.errors,
                                             encoding_map)
        return data

class InvertCapsIncrementalDecoder(codecs.IncrementalDecoder):
    def decode(self, input, final=False):
        data, nbytes = codecs.charmap_decode(input,
                                             self.errors,
                                             decoding_map)
        return data

class InvertCapsStreamReader(InvertCapsCodec,
                             codecs.StreamReader):
    pass

class InvertCapsStreamWriter(InvertCapsCodec,
                             codecs.StreamWriter):
    pass

def find_invertcaps(encoding):
    """Return the codec for 'invertcaps'.
    """
    if encoding == 'invertcaps':
        return codecs.CodecInfo(
            name='invertcaps',
            encode=InvertCapsCodec().encode,
            decode=InvertCapsCodec().decode,
            incrementalencoder=InvertCapsIncrementalEncoder,
            incrementaldecoder=InvertCapsIncrementalDecoder,
            streamreader=InvertCapsStreamReader,
            streamwriter=InvertCapsStreamWriter,
        )
    return None

codecs.register(find_invertcaps)

if __name__ == '__main__':

    # 无状态编解码器
    encoder = codecs.getencoder('invertcaps')
    text = 'abcDEF'
    encoded_text, consumed = encoder(text)
    print('Encoded "{}" to "{}", consuming {} characters'.format(
        text, encoded_text, consumed))

    # 流写入器
    import io
    buffer = io.BytesIO()
    writer = codecs.getwriter('invertcaps')(buffer)
    print('StreamWriter for io buffer: ')
    print('  writing "abcDEF"')
    writer.write('abcDEF')
    print('  buffer contents: ', buffer.getvalue())

    # 增量解码器
    decoder_factory = codecs.getincrementaldecoder('invertcaps')
    decoder = decoder_factory()
    decoded_text_parts = []
    for c in encoded_text:
        decoded_text_parts.append(
            decoder.decode(bytes([c]), final=False)
        )
    decoded_text_parts.append(decoder.decode(b'', final=True))
    decoded_text = ''.join(decoded_text_parts)
    print('IncrementalDecoder converted {!r} to {!r}'.format(
        encoded_text, decoded_text))

无状态编解码器基类是 Codec ,使用新的实现重写了 encode()decode() (这个例子中,分别调用了 charmap_encode()charmap_decode())。每个方法必须返回一个包含转换数据元祖以及消费的输入字节或者字符的数量。

IncrementalEncoder 和 IncrementalDecoder 作为增量接口的基类。增量类的 encode()decode() 方法是这样定义的,它们只返回实际转换后的数据。然和有关缓冲池的信息都将保存为内部状态。「invertcaps」不需要缓冲数据(它使用一对一映射)。对于根据正在处理的数据产生不同输出量的编码(例如压缩算法),BufferedIncrementalEncoderBufferedIncrementalDecoder 是更合适的基类,因为他们管理输入的未处理部分。

StreamReader 和 StreamWriter 也需要 encode()decode() 方法,并且由于他们需要返回与 Codec 版本相同的值,因此可以使用多重继承来实现。 

$ python3 codecs_invertcaps_register.py

Encoded "abcDEF" to "b'ABCdef'", consuming 6 characters
StreamWriter for io buffer:
  writing "abcDEF"
  buffer contents:  b'ABCdef'
IncrementalDecoder converted b'ABCdef' to 'abcDEF'

推荐阅读#

  • codecs 标准库文档
  • locale -- 访问管理基于本地化的配置设置行为。
  • io -- io 模块也包含了处理编解码的文件和流包装。
  • socketserver -- 更详细的 echo 服务器的例子,请看 socketserver 模块。
  • encodings -- Python 标准库提供的编解码器实现包。
  • PEP 100 -- Python Unicode 集成 PEP。
  • Unicode 入门 -- Python 官方 Unicode 使用指导。
  • Text vs. Data Instead of Unicode vs. 8-bit -- Python 3.0 的「新增功能」文章中关于文本处理更改的部分。
  • Python Unicode 对象 -- Fredrik Lundh 关于在 Python 2.0 中使用非 ASCII 字符集的文章。
  • Python 中如何使用 UTF-8 -- Evan Jones 使用 Unicode 的快速指南,包括 XML 数据和 Byte-Order 标记。
  • Unicode 优点 -- Bray 介绍国际化和 Unicode 。
  • 关于字符串 -- Bray 介绍了编程语言中字符串处理的历史。
  • 字符 vs. 字节 -- Tim Bray 的「计算机程序员现代字符串处理的文章」的第一部分。本部分涵盖了 ASCII 字节以外格式的文本内存表示。
  • 字节顺序 -- 维基百科中字节顺序解释。
  • W3C XML 字符实体定义 -- 无法用编码表示的字符引用的 XML 表示规范。

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://learnku.com/docs/pymotw/codecs-c...

译文地址:https://learnku.com/docs/pymotw/codecs-c...

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~