8.5. sqlite3 — 嵌入式关系型数据库

未匹配的标注

目的:实现支持 SQL 的内嵌关系型数据库。

 sqlite3 模块基于 Python DB-API 2.0 实现了与SQLite(进程内关系型数据库) 兼容的接口。SQLite 被设计为主要应用在应用程序内部,不像 MySQL, PostgreSQL 或者 Oracle 那样需要一个单独的数据库服务器。它快速,灵活并且经过严格测试,适用于一些应用的原型和生产部署。

创建一个数据库

SQLite 数据库做为单个文件存储在文件系统中。它管理对文件的访问,包括锁定文件以防止多个写入时发生损坏。数据库在第一次访问文件时创建,但在应用程序内管理数据库中的表定义或 schema

这个示例在使用 connect() 打开数据库之前查找数据库文件,以便知晓何时数据库创建。

sqlite3_createdb.py

import os
import sqlite3

db_filename = 'todo.db'

db_is_new = not os.path.exists(db_filename)

conn = sqlite3.connect(db_filename)

if db_is_new:
    print('Need to create schema')
else:
    print('Database exists, assume schema does, too.')

conn.close()

运行这段脚本两次说明当一个数据库文件不存在时会先创建一个空的文件。

$ ls *.db

ls: *.db: No such file or directory

$ python3 sqlite3_createdb.py

Need to create schema

$ ls *.db

todo.db

$ python3 sqlite3_createdb.py

Database exists, assume schema does, too.

创建一个新的数据库文件后,下一步是创建一个 schema 去定义数据库内的表。本章接下来的示例都会使用一个相同的数据库表结构去管理下面两个表。

表 project

字段 类型 描述
name text 项目名称
description text 很长的项目描述
deadline date 整个项目的交付时间

表 task

字段 类型 描述
id number 单独的任务标识
priority integer 任务优先级,数字越低越重要
details text 详细的任务信息
status text 任务进度(「新建」,「等待」,「完成」或「取消」)
deadline date 任务的交付时间
completed_on date 任务的完成时间
project text 任务的项目名称

这里的创建表使用数据定义语言( DDL )表述为:

todo_schema.sql

-- to-do 应用示例的数据库表结构

-- 项目表是由任务组成的
create table project (
    name        text primary key,
    description text,
    deadline    date
);

-- 任务是完成项目的步骤
create table task (
    id           integer primary key autoincrement not null,
    priority     integer default 1,
    details      text,
    status       text,
    deadline     date,
    completed_on date,
    project      text not null references project(name)
);

 Connection 中的 executescript() 方法可以用来执行 DDL 指令去创建数据库表结构。

sqlite3_create_schema.py

import os
import sqlite3

db_filename = 'todo.db'
schema_filename = 'todo_schema.sql'

db_is_new = not os.path.exists(db_filename)

with sqlite3.connect(db_filename) as conn:
    if db_is_new:
        print('Creating schema')
        with open(schema_filename, 'rt') as f:
            schema = f.read()
        conn.executescript(schema)

        print('Inserting initial data')

        conn.executescript("""
        insert into project (name, description, deadline)
        values ('pymotw', 'Python Module of the Week',
                '2016-11-01');

        insert into task (details, status, deadline, project)
        values ('write about select', 'done', '2016-04-25',
                'pymotw');

        insert into task (details, status, deadline, project)
        values ('write about random', 'waiting', '2016-08-22',
                'pymotw');

        insert into task (details, status, deadline, project)
        values ('write about sqlite3', 'active', '2017-07-31',
                'pymotw');
        """)
    else:
        print('Database exists, assume schema does, too.')

在创建表之后,一些 insert 语句用于创建一个示例项目和相关的任务。 sqlite3 命令行工具可用于检查数据库的内容。

$ rm -f todo.db
$ python3 sqlite3_create_schema.py

Creating schema
Inserting initial data

$ sqlite3 todo.db 'select * from task'

1|1|write about select|done|2016-04-25||pymotw
2|1|write about random|waiting|2016-08-22||pymotw
3|1|write about sqlite3|active|2017-07-31||pymotw

取出数据

在 Python 中,通过一个数据库链接创建一个 游标 可以取出保存在 task 表中的值。游标可以产生一致的数据视图,并且是与 SQLite 等事务型数据库系统进行交互的主要手段。

sqlite3_select_tasks.py

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw'
    """)

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))

查询是一个两步操作。首先,游标的 execute() 方法会告诉数据库引擎收集什么样的数据。然后,使用 fetchall() 方法去取回结果。最终返回的值是一系列元祖,包含查询的 select 语句需要的字段的值。

$ python3 sqlite3_select_tasks.py

 1 [1] write about select        [done    ] (2016-04-25)
 2 [1] write about random        [waiting ] (2016-08-22)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

可以用 fetchone() 一次取出一个结果,也可以用 fetchmany() 批量取出固定数量的结果。

sqlite3_select_variations.py

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project
    where name = 'pymotw'
    """)
    name, description, deadline = cursor.fetchone()

    print('Project details for {} ({})\n  due {}'.format(
        description, name, deadline))

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw' order by deadline
    """)

    print('\nNext 5 tasks:')
    for row in cursor.fetchmany(5):
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))

传递给 fetchmany() 的值是要返回数据的最大条数。 如果没有那么多条,则有多少返回多少。

$ python3 sqlite3_select_variations.py

Project details for Python Module of the Week (pymotw)
  due 2016-11-01

Next 5 tasks:
 1 [1] write about select        [done    ] (2016-04-25)
 2 [1] write about random        [waiting ] (2016-08-22)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

查询元数据

依据 DB-API 2.0 的规范,在调用 execute() 方法后, 游标 应该设置其 description 属性去保存 fetch 方法返回的数据信息。这些数据信息应该是包含字段名称,类型,显示大小,内部大小,精度,比例和是否接受空值的一系列元组。

sqlite3_cursor_description.py

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select * from task where project = 'pymotw'
    """)

    print('Task table has these columns:')
    for colinfo in cursor.description:
        print(colinfo)

由于 sqlite3 不会对插入数据库的数据进行类型或大小的限制,因此这里只填入字段名称。

$ python3 sqlite3_cursor_description.py

Task table has these columns:
('id', None, None, None, None, None, None)
('priority', None, None, None, None, None, None)
('details', None, None, None, None, None, None)
('status', None, None, None, None, None, None)
('deadline', None, None, None, None, None, None)
('completed_on', None, None, None, None, None, None)
('project', None, None, None, None, None, None)

行对象

默认情况下,fetch 方法从数据库中返回的值是以「行」的形式的元组。 调用方需要依据查询语句中的字段顺序并从元组中得到各个值。 当查询的数量增加或处理数据的代码在不同的地方时,使用对象结合字段名称得到查询结果通常会更方便。 这样,随着查询语句的变化,元组内容的数量和顺序也可以相应的变化,并且依赖查询结果去执行的代码不容易受到影响。

Connection 对象有一个 row_factory 属性,该属性允许调用的代码去控制查询结果集中的每行的对象类型。 sqlite3 还包含一个 Row 类,可以用作行工厂,即 row_factory 。 字段的值可以通过 Row 的实例对象的索引或名称访问。

sqlite3_row_factory.py

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    # 修改行工厂为 Row
    conn.row_factory = sqlite3.Row

    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project
    where name = 'pymotw'
    """)
    name, description, deadline = cursor.fetchone()

    print('Project details for {} ({})\n  due {}'.format(
        description, name, deadline))

    cursor.execute("""
    select id, priority, status, deadline, details from task
    where project = 'pymotw' order by deadline
    """)

    print('\nNext 5 tasks:')
    for row in cursor.fetchmany(5):
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            row['id'], row['priority'], row['details'],
            row['status'], row['deadline'],
        ))

这个版本的 sqlite3_select_variations.py 示例使用 Row 实例代替了元组。 打印表 project 中的行仍然通过位置来得到字段值,但打印表 task 中的查询结果使用了关键字去查找,因此 print 语句中字段名称的顺序更改就无所谓了。

$ python3 sqlite3_row_factory.py

Project details for Python Module of the Week (pymotw)
  due 2016-11-01

Next 5 tasks:
 1 [1] write about select        [done    ] (2016-04-25)
 2 [1] write about random        [waiting ] (2016-08-22)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

在查询中使用变量

使用定义好的字符串在一个程序中去查询是不灵活的。 例如,当另一个表 project 被添加到数据库时,显示前五个 task 的查询语句就需要去修改来适配新的表。增加灵活性的一种方法是通过 Python 进行字符串组合拼接来构建出所需要的 SQL 语句,但是这种方式存在安全问题,应该避免。 如果无法在查询语句的变量部分正确转义特殊字符,可能会导致 SQL 解析错误,更糟糕的情况可能导致 SQL注入攻击 的安全漏洞,它允许入侵者在数据库中执行任意 SQL 语句。

在查询中使用动态值的正确方法是将 host variables 和 SQL指令一起传递给  execute() 方法。执行语句时,SQL 中的占位符会将 host variables 的值进行替换。 使用 host variables 而不是将任意值插入到 SQL 中即可避免注入攻击,因为不可信的值是不会影响 SQL 的解析的。 SQLite 支持两种形式的占位符,位置参数或是命名参数。

位置参数

问号()表示位置参数,会作为元组中的一个元素传入 execute() 方法。

sqlite3_argument_positional.py

import sqlite3
import sys

db_filename = 'todo.db'
project_name = sys.argv[1]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    query = """
    select id, priority, details, status, deadline from task
    where project = ?
    """

    cursor.execute(query, (project_name,))

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))

命令行参数作为位置参数安全地传入查询中,是没法破坏到数据库的。

$ python3 sqlite3_argument_positional.py pymotw

 1 [1] write about select        [done    ] (2016-04-25)
 2 [1] write about random        [waiting ] (2016-08-22)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

命名参数

当需要大量参数的复杂查询,或者在查询中需要多次重复某些参数时,可以使用命名参数。 命名参数以冒号作为前缀(例如  :param_name )。

sqlite3_argument_named.py

import sqlite3
import sys

db_filename = 'todo.db'
project_name = sys.argv[1]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    query = """
    select id, priority, details, status, deadline from task
    where project = :project_name
    order by deadline, priority
    """

    cursor.execute(query, {'project_name': project_name})

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))

无论是位置参数还是命名参数都不需要引用或转义,因为查询解析器已经进行了特殊处理。

$ python3 sqlite3_argument_named.py pymotw

 1 [1] write about select        [done    ] (2016-04-25)
 2 [1] write about random        [waiting ] (2016-08-22)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

查询参数可以与 selectinsertupdate 语句一起使用。 它们可以出现在一个表面合法的查询的任意部分。

sqlite3_argument_update.py

import sqlite3
import sys

db_filename = 'todo.db'
id = int(sys.argv[1])
status = sys.argv[2]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()
    query = "update task set status = :status where id = :id"
    cursor.execute(query, {'status': status, 'id': id})

这个例子中 update 语句使用了两个命名参数。 id 用于查找出要修改的正确的行,status 用于将值写入表中。

$ python3 sqlite3_argument_update.py 2 done
$ python3 sqlite3_argument_named.py pymotw

 1 [1] write about select        [done    ] (2016-04-25)
 2 [1] write about random        [done    ] (2016-08-22)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

批量加载

要将相同的 SQL 指令应用于大量数据,请使用 executemany() 方法。 它对于加载数据很有用,因为它避免了在 Python 中循环输入,并对底层的循环进行了优化。 下面的示例程序使用 csv  模块从一个 csv 文件中读取一系列数值并将它们加载到数据库中。

sqlite3_load_csv.py

import csv
import sqlite3
import sys

db_filename = 'todo.db'
data_filename = sys.argv[1]

SQL = """
insert into task (details, priority, status, deadline, project)
values (:details, :priority, 'active', :deadline, :project)
"""

with open(data_filename, 'rt') as csv_file:
    csv_reader = csv.DictReader(csv_file)

    with sqlite3.connect(db_filename) as conn:
        cursor = conn.cursor()
        cursor.executemany(SQL, csv_reader)

 tasks.csv 文件包含如下内容:

deadline,project,priority,details
2016-11-30,pymotw,2,"finish reviewing markup"
2016-08-20,pymotw,2,"revise chapter intros"
2016-11-01,pymotw,1,"subtitle"

运行这个程序的结果如下:

$ python3 sqlite3_load_csv.py tasks.csv
$ python3 sqlite3_argument_named.py pymotw

 1 [1] write about select        [done    ] (2016-04-25)
 5 [2] revise chapter intros     [active  ] (2016-08-20)
 2 [1] write about random        [done    ] (2016-08-22)
 6 [1] subtitle                  [active  ] (2016-11-01)
 4 [2] finish reviewing markup   [active  ] (2016-11-30)
 3 [1] write about sqlite3       [active  ] (2017-07-31)

定义新的字段类型

SQLite 本身就支持整数,浮点数和文本字段。 这些类型的数据由 sqlite3 模块从 Python 中表示的值自动转换为可以存储在数据库中的值,并根据需要返回。 整数值在数据库中会加载到 int 或 long 变量中,具体取决于值的大小。 如果 Connectiontext_factory 属性未被更改,文本值的保存和取出都是 str

尽管 SQLite 内部只支持很少的数据类型,但  sqlite3 模块包含了定义自定义类型的工具,以允许 Python 应用程序在字段中存储任何类型的数据。 在数据库连接时使用 detect_types 标志可以开启对默认支持字段类型以外的类型进行转换。如果在定义表时需要自定义的字段,请使用 PARSE_DECLTYPES

sqlite3_date_types.py

import sqlite3
import sys

db_filename = 'todo.db'

sql = "select id, details, deadline from task"

def show_deadline(conn):
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute(sql)
    row = cursor.fetchone()
    for col in ['id', 'details', 'deadline']:
        print('  {:<8}  {!r:<26} {}'.format(
            col, row[col], type(row[col])))
    return

print('Without type detection:')
with sqlite3.connect(db_filename) as conn:
    show_deadline(conn)

print('\nWith type detection:')
with sqlite3.connect(db_filename,
                     detect_types=sqlite3.PARSE_DECLTYPES,
                     ) as conn:
    show_deadline(conn)

sqlite3  模块使用了datetime模块中的 date 类和  datetime 类对日期和时间字段提供转换器。类型检测打开时,这些日期相关的转换器也自动是可用状态。

$ python3 sqlite3_date_types.py

Without type detection:
  id        1                          <class 'int'>
  details   'write about select'       <class 'str'>
  deadline  '2016-04-25'               <class 'str'>

With type detection:
  id        1                          <class 'int'>
  details   'write about select'       <class 'str'>
  deadline  datetime.date(2016, 4, 25) <class 'datetime.date'>

当定义一个新的字段类型时需要注册两个函数。 适配器 函数用于将 Python 对象做为输入转换为一个字节串存储在数据库中。转换器 函数从数据库中取出数据转换为一个 Python 对象。使用 register_adapter() 可以注册一个适配器函数, register_converter() 可以注册一个转换器函数。

sqlite3_custom_type.py

import pickle
import sqlite3

db_filename = 'todo.db'

def adapter_func(obj):
    """从内存转换对象为可存储
    """
    print('adapter_func({})\n'.format(obj))
    return pickle.dumps(obj)

def converter_func(data):
    """从存储数据中转换为一个内存中的对象
    """
    print('converter_func({!r})\n'.format(data))
    return pickle.loads(data)

class MyObj:

    def __init__(self, arg):
        self.arg = arg

    def __str__(self):
        return 'MyObj({!r})'.format(self.arg)

# 注册函数控制字段类型
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

# 创建需要保存的一些对象。使用包含一系列元组
# 的列表便于直接传入 executemany()函数。
to_save = [
    (MyObj('this is a value to save'),),
    (MyObj(42),),
]

with sqlite3.connect(
        db_filename,
        detect_types=sqlite3.PARSE_DECLTYPES) as conn:
    # 创建一个表包含 "MyObj" 的字段类型
    conn.execute("""
    create table if not exists obj (
        id    integer primary key autoincrement not null,
        data  MyObj
    )
    """)
    cursor = conn.cursor()

    # 在数据库中插入对象
    cursor.executemany("insert into obj (data) values (?)",
                       to_save)

    # 查询刚刚插入的结果
    cursor.execute("select id, data from obj")
    for obj_id, obj in cursor.fetchall():
        print('Retrieved', obj_id, obj)
        print('  with type', type(obj))
        print()

这个示例使用 pickle 模块将一个对象转换为字符串存储在数据库中,这是一种存储任意对象很有用的方法,但它无法基于对象的属性进行查询。一个真正的 对象关系映射 系统,比如 SQLAlchemy 有其自己的存储字段的方式,对存储海量数据是非常有效的。

$ python3 sqlite3_custom_type.py

adapter_func(MyObj('this is a value to save'))

adapter_func(MyObj(42))

converter_func(b'\x80\x03c__main__\nMyObj\nq\x00)\x81q\x01}q\x02X\x0
3\x00\x00\x00argq\x03X\x17\x00\x00\x00this is a value to saveq\x04sb
.')

converter_func(b'\x80\x03c__main__\nMyObj\nq\x00)\x81q\x01}q\x02X\x0
3\x00\x00\x00argq\x03K*sb.')

Retrieved 1 MyObj('this is a value to save')
  with type <class '__main__.MyObj'>

Retrieved 2 MyObj(42)
  with type <class '__main__.MyObj'>

确定字段类型

有两种方式定义一条查询语句中的字段类型信息。一种是如前所示的原始表格声明可以识别出实际的字段类型。还有一种是在查询语句的 select 从句中使用 as "name [type]" 也可以指定字段类型。

sqlite3_custom_type_column.py

import pickle
import sqlite3

db_filename = 'todo.db'

def adapter_func(obj):
    """从内存转换对象为可存储
    """
    print('adapter_func({})\n'.format(obj))
    return pickle.dumps(obj)

def converter_func(data):
    """从存储数据中转换为一个内存中的对象
    """
    print('converter_func({!r})\n'.format(data))
    return pickle.loads(data)

class MyObj:

    def __init__(self, arg):
        self.arg = arg

    def __str__(self):
        return 'MyObj({!r})'.format(self.arg)

# 注册函数控制字段类型
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

# 创建需要保存的一些对象。使用包含一系列元组
# 的列表便于直接传入 executemany()函数。
to_save = [
    (MyObj('this is a value to save'),),
    (MyObj(42),),
]

with sqlite3.connect(
        db_filename,
        detect_types=sqlite3.PARSE_COLNAMES) as conn:
    # 创建一个包含 "text" 类型字段的表
    conn.execute("""
    create table if not exists obj2 (
        id    integer primary key autoincrement not null,
        data  text
    )
    """)
    cursor = conn.cursor()

    # 在数据库中插入对象
    cursor.executemany("insert into obj2 (data) values (?)",
                       to_save)

    # 查询刚刚插入的结果,使用 `as "name [type]"` 
        # 转换 text 文本为对象
    cursor.execute(
        'select id, data as "pickle [MyObj]" from obj2',
    )
    for obj_id, obj in cursor.fetchall():
        print('Retrieved', obj_id, obj)
        print('  with type', type(obj))
        print()

当类型是查询的一部分而不是原始表的定义时,将 detect_types 标志 设为 PARSE_COLNAMES

$ python3 sqlite3_custom_type_column.py

adapter_func(MyObj('this is a value to save'))

adapter_func(MyObj(42))

converter_func(b'\x80\x03c__main__\nMyObj\nq\x00)\x81q\x01}q\x02X\x0
3\x00\x00\x00argq\x03X\x17\x00\x00\x00this is a value to saveq\x04sb
.')

converter_func(b'\x80\x03c__main__\nMyObj\nq\x00)\x81q\x01}q\x02X\x0
3\x00\x00\x00argq\x03K*sb.')

Retrieved 1 MyObj('this is a value to save')
  with type <class '__main__.MyObj'>

Retrieved 2 MyObj(42)
  with type <class '__main__.MyObj'>

事务

关系型数据库的一个核心特性就是使用 事务 来保证一致的内部状态。启用事务后,多项数据变更可以在一个连接内实现而不会影响到其他用户,直到结果已经 提交 并刷新至实际的数据库中。

保存更改

无论通过 insert 还是  update 语句,若想数据库变更得以保存,必须明确的调用 commit() 方法。这项要求使得应用程序可以将多项相关的数据修改一起提交,所以这属于 原子性 而非增量性,而且避免了多个客户端同时连接至数据库仅看到部分数据更新了的情况。

调用 commit() 方法的效果可以通过一个程序内多个数据库连接来研究。下面示例中,一条新的数据通过第一个连接插入数据库,然后另外两个连接尝试去读取它。

sqlite3_transaction_commit.py

import sqlite3

db_filename = 'todo.db'

def show_projects(conn):
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    for name, desc in cursor.fetchall():
        print('  ', name)

with sqlite3.connect(db_filename) as conn1:
    print('Before changes:')
    show_projects(conn1)

    # 插入一个游标
    cursor1 = conn1.cursor()
    cursor1.execute("""
    insert into project (name, description, deadline)
    values ('virtualenvwrapper', 'Virtualenv Extensions',
            '2011-01-01')
    """)

    print('\nAfter changes in conn1:')
    show_projects(conn1)

    # 通过另一个连接查询,先不提交
    print('\nBefore commit:')
    with sqlite3.connect(db_filename) as conn2:
        show_projects(conn2)

    # 提交后通过另一个连接查询
    conn1.commit()
    print('\nAfter commit:')
    with sqlite3.connect(db_filename) as conn3:
        show_projects(conn3)

当 conn1 提交前调用 show_projects() 方法,查询的结果取决于当前是用哪个数据库连接查询的。由于是通过 conn1 进行修改的,因此通过它看到的是修改后的数据。然而,conn2 看到的是修改前的。在提交修改后, conn3 也能看到新插入的数据了。

$ python3 sqlite3_transaction_commit.py

Before changes:
   pymotw

After changes in conn1:
   pymotw
   virtualenvwrapper

Before commit:
   pymotw

After commit:
   pymotw
   virtualenvwrapper

放弃修改

未提交的数据修改也可以通过调用  rollback() 方法完全放弃修改。通常在一个 try:except 代码块中使用  commit() 方法和 rollback() 方法处理数据的提交和出现错误时的数据回滚。

sqlite3_transaction_rollback.py

import sqlite3

db_filename = 'todo.db'

def show_projects(conn):
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    for name, desc in cursor.fetchall():
        print('  ', name)

with sqlite3.connect(db_filename) as conn:

    print('Before changes:')
    show_projects(conn)

    try:

        # 插入
        cursor = conn.cursor()
        cursor.execute("""delete from project
                       where name = 'virtualenvwrapper'
                       """)

        # 显示设置
        print('\nAfter delete:')
        show_projects(conn)

        # 假装处理导致一个错误
        raise RuntimeError('simulated error')

    except Exception as err:
        # 放弃数据变更
        print('ERROR:', err)
        conn.rollback()

    else:
        # 保存数据变更
        conn.commit()

    # 显示结果
    print('\nAfter rollback:')
    show_projects(conn)

在调用 rollback() 方法后,将不存在对数据库的修改。

$ python3 sqlite3_transaction_rollback.py

Before changes:
   pymotw
   virtualenvwrapper

After delete:
   pymotw
ERROR: simulated error

After rollback:
   pymotw
   virtualenvwrapper

隔离级别

sqlite3 支持三种锁定模式,称为 隔离级别,它用于防止连接之间发生不兼容的数据变更。当连接打开时,可以通过传递一个字符串作为 isolation_level 的参数来设置隔离级别,因此不同的连接可以使用不同的隔离级别。

下面的这个程序演示了不同的隔离级别使用不同的连接至同一个数据库的线程事件顺序的影响。创建了四个线程,两个线程用于写入数据更新已经存在的两行数据,另外两个线程尝试读取表 task 的所有行。

sqlite3_isolation_levels.py

import logging
import sqlite3
import sys
import threading
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-10s) %(message)s',
)

db_filename = 'todo.db'
isolation_level = sys.argv[1]

def writer():
    with sqlite3.connect(
            db_filename,
            isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        cursor.execute('update task set priority = priority + 1')
        logging.debug('waiting to synchronize')
        ready.wait()  # 同步线程
        logging.debug('PAUSING')
        time.sleep(1)
        conn.commit()
        logging.debug('CHANGES COMMITTED')

def reader():
    with sqlite3.connect(
            db_filename,
            isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        logging.debug('waiting to synchronize')
        ready.wait()  # 同步线程
        logging.debug('wait over')
        cursor.execute('select * from task')
        logging.debug('SELECT EXECUTED')
        cursor.fetchall()
        logging.debug('results fetched')

if __name__ == '__main__':
    ready = threading.Event()

    threads = [
        threading.Thread(name='Reader 1', target=reader),
        threading.Thread(name='Reader 2', target=reader),
        threading.Thread(name='Writer 1', target=writer),
        threading.Thread(name='Writer 2', target=writer),
    ]

    [t.start() for t in threads]

    time.sleep(1)
    logging.debug('setting ready')
    ready.set()

    [t.join() for t in threads]

这些线程使用 threading 模块中的 Event 对象同步。 writer() 方法连接并改变数据库,但在事件触发前并未提交。 reader() 方法连接并读取数据库直到事件同步生效。

延迟模式

默认的隔离模式是 DEFERRED 。使用延迟模式时,只有在一个数据变更发生时是延迟的,所有示例都是使用延迟模式的。

$ python3 sqlite3_isolation_levels.py DEFERRED

2016-08-20 17:46:26,972 (Reader 1  ) waiting to synchronize
2016-08-20 17:46:26,972 (Reader 2  ) waiting to synchronize
2016-08-20 17:46:26,973 (Writer 1  ) waiting to synchronize
2016-08-20 17:46:27,977 (MainThread) setting ready
2016-08-20 17:46:27,979 (Reader 1  ) wait over
2016-08-20 17:46:27,979 (Writer 1  ) PAUSING
2016-08-20 17:46:27,979 (Reader 2  ) wait over
2016-08-20 17:46:27,981 (Reader 1  ) SELECT EXECUTED
2016-08-20 17:46:27,982 (Reader 1  ) results fetched
2016-08-20 17:46:27,982 (Reader 2  ) SELECT EXECUTED
2016-08-20 17:46:27,982 (Reader 2  ) results fetched
2016-08-20 17:46:28,985 (Writer 1  ) CHANGES COMMITTED
2016-08-20 17:46:29,043 (Writer 2  ) waiting to synchronize
2016-08-20 17:46:29,043 (Writer 2  ) PAUSING
2016-08-20 17:46:30,044 (Writer 2  ) CHANGES COMMITTED

即时模式

当一个数据变更发生时,为了防止其他游标也去修改数据库,即时模式会迅速锁定数据库直到事务提交。它适用于复杂的写入数据库,而且是读取多于写入,因为在事务处理时读取不会被阻塞。

$ python3 sqlite3_isolation_levels.py IMMEDIATE

2016-08-20 17:46:30,121 (Reader 1  ) waiting to synchronize
2016-08-20 17:46:30,121 (Reader 2  ) waiting to synchronize
2016-08-20 17:46:30,123 (Writer 1  ) waiting to synchronize
2016-08-20 17:46:31,122 (MainThread) setting ready
2016-08-20 17:46:31,122 (Reader 1  ) wait over
2016-08-20 17:46:31,122 (Reader 2  ) wait over
2016-08-20 17:46:31,122 (Writer 1  ) PAUSING
2016-08-20 17:46:31,124 (Reader 1  ) SELECT EXECUTED
2016-08-20 17:46:31,124 (Reader 2  ) SELECT EXECUTED
2016-08-20 17:46:31,125 (Reader 2  ) results fetched
2016-08-20 17:46:31,125 (Reader 1  ) results fetched
2016-08-20 17:46:32,128 (Writer 1  ) CHANGES COMMITTED
2016-08-20 17:46:32,199 (Writer 2  ) waiting to synchronize
2016-08-20 17:46:32,199 (Writer 2  ) PAUSING
2016-08-20 17:46:33,200 (Writer 2  ) CHANGES COMMITTED

独占模式

独占模式会对数据库的所有读取和写入操作锁定。当数据库性能很重要时,应该慎用独占模式,因为每个独占连接都会对其他用户阻塞。

$ python3 sqlite3_isolation_levels.py EXCLUSIVE

2016-08-20 17:46:33,320 (Reader 1  ) waiting to synchronize
2016-08-20 17:46:33,320 (Reader 2  ) waiting to synchronize
2016-08-20 17:46:33,324 (Writer 2  ) waiting to synchronize
2016-08-20 17:46:34,323 (MainThread) setting ready
2016-08-20 17:46:34,323 (Reader 1  ) wait over
2016-08-20 17:46:34,323 (Writer 2  ) PAUSING
2016-08-20 17:46:34,323 (Reader 2  ) wait over
2016-08-20 17:46:35,327 (Writer 2  ) CHANGES COMMITTED
2016-08-20 17:46:35,368 (Reader 2  ) SELECT EXECUTED
2016-08-20 17:46:35,368 (Reader 2  ) results fetched
2016-08-20 17:46:35,369 (Reader 1  ) SELECT EXECUTED
2016-08-20 17:46:35,369 (Reader 1  ) results fetched
2016-08-20 17:46:35,385 (Writer 1  ) waiting to synchronize
2016-08-20 17:46:35,385 (Writer 1  ) PAUSING
2016-08-20 17:46:36,386 (Writer 1  ) CHANGES COMMITTED

由于第一次写入已经开始修改数据,读取和第二次写入将阻塞直到第一次写入提交。 sleep() 在写入线程中引入了一个假设的延迟,为了突显出其他连接正在阻塞。

自动提交模式

将连接的 isolation_level 参数设为 None 会开启自动提交模式。当开启该模式时,每条 execute() 方法中的 SQL 语句执行后会立即提交。自动提交模式适用于简短的事务,例如插入少量数据至单个表。数据库被锁定的时间会尽可能的短,所以不同线程竞争的机会也会小很多。

在 sqlite3_autocommit.py 中,隔离级别被设为 None ,也移除了显式的调用 commit() 方法。输出是不同的,但两个写入的线程在任一读取线程开始查询时已经完成了。

$ python3 sqlite3_autocommit.py

2016-08-20 17:46:36,451 (Reader 1  ) waiting to synchronize
2016-08-20 17:46:36,451 (Reader 2  ) waiting to synchronize
2016-08-20 17:46:36,455 (Writer 1  ) waiting to synchronize
2016-08-20 17:46:36,456 (Writer 2  ) waiting to synchronize
2016-08-20 17:46:37,452 (MainThread) setting ready
2016-08-20 17:46:37,452 (Reader 1  ) wait over
2016-08-20 17:46:37,452 (Writer 2  ) PAUSING
2016-08-20 17:46:37,452 (Reader 2  ) wait over
2016-08-20 17:46:37,453 (Writer 1  ) PAUSING
2016-08-20 17:46:37,453 (Reader 1  ) SELECT EXECUTED
2016-08-20 17:46:37,454 (Reader 2  ) SELECT EXECUTED
2016-08-20 17:46:37,454 (Reader 1  ) results fetched
2016-08-20 17:46:37,454 (Reader 2  ) results fetched

内存数据库

SQLite 支持在 RAM 中管理整个数据库,而无需一个硬盘文件。内存数据库对自动化测试非常有用,这种情况下,数据在不同的测试运行时不需要保留。或者在实验一些模式或其他数据库特性时,内存数据库也很有用。要打开一个内存数据库,在创建 Connection 时使用 ':memory:' 字符串代替文件名称即可。每个 ':memory:' 连接都会创建一个单独的数据库实例,所以一个实例的数据变更不会影响到其他连接。

导出数据库内容

内存数据库的内容可以使用 Connection 的 iterdump() 方法保存。iterdump() 方法返回一个迭代器会产生一系列字符串,这些字符串一起构建 SQL 指令以重建数据库状态。

sqlite3_iterdump.py

import sqlite3

schema_filename = 'todo_schema.sql'

with sqlite3.connect(':memory:') as conn:
    conn.row_factory = sqlite3.Row

    print('Creating schema')
    with open(schema_filename, 'rt') as f:
        schema = f.read()
    conn.executescript(schema)

    print('Inserting initial data')
    conn.execute("""
    insert into project (name, description, deadline)
    values ('pymotw', 'Python Module of the Week',
            '2010-11-01')
    """)
    data = [
        ('write about select', 'done', '2010-10-03',
         'pymotw'),
        ('write about random', 'waiting', '2010-10-10',
         'pymotw'),
        ('write about sqlite3', 'active', '2010-10-17',
         'pymotw'),
    ]
    conn.executemany("""
    insert into task (details, status, deadline, project)
    values (?, ?, ?, ?)
    """, data)

    print('Dumping:')
    for text in conn.iterdump():
        print(text)

iterdump() 也可用于保存数据库为文件,但它对于一个不应被保存的数据库做防护会更有用。这里的输出结果在保证语法正确的情况下适配了页面的展示。

$ python3 sqlite3_iterdump.py

Creating schema
Inserting initial data
Dumping:
BEGIN TRANSACTION;
CREATE TABLE project (
    name        text primary key,
    description text,
    deadline    date
);
INSERT INTO "project" VALUES('pymotw','Python Module of the
Week','2010-11-01');
DELETE FROM "sqlite_sequence";
INSERT INTO "sqlite_sequence" VALUES('task',3);
CREATE TABLE task (
    id           integer primary key autoincrement not null,
    priority     integer default 1,
    details      text,
    status       text,
    deadline     date,
    completed_on date,
    project      text not null references project(name)
);
INSERT INTO "task" VALUES(1,1,'write about
select','done','2010-10-03',NULL,'pymotw');
INSERT INTO "task" VALUES(2,1,'write about
random','waiting','2010-10-10',NULL,'pymotw');
INSERT INTO "task" VALUES(3,1,'write about
sqlite3','active','2010-10-17',NULL,'pymotw');
COMMIT;

在 SQL 中使用 Python 函数

SQL 语法支持在查询语句中调用函数,无论是字段列表还是 select 语句或者 where 子句。 这项特性支持在将数据从查询中返回前处理数据,并可用于在不同数据格式之间转换,在纯 SQL 中执行笨拙的计算,并重用应用程序代码。

sqlite3_create_function.py

import codecs
import sqlite3

db_filename = 'todo.db'

def encrypt(s):
    print('Encrypting {!r}'.format(s))
    return codecs.encode(s, 'rot-13')

def decrypt(s):
    print('Decrypting {!r}'.format(s))
    return codecs.encode(s, 'rot-13')

with sqlite3.connect(db_filename) as conn:

    conn.create_function('encrypt', 1, encrypt)
    conn.create_function('decrypt', 1, decrypt)
    cursor = conn.cursor()

    # 原始的值
    print('Original values:')
    query = "select id, details from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print(row)

    print('\nEncrypting...')
    query = "update task set details = encrypt(details)"
    cursor.execute(query)

    print('\nRaw encrypted values:')
    query = "select id, details from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print(row)

    print('\nDecrypting in query...')
    query = "select id, decrypt(details) from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print(row)

    print('\nDecrypting...')
    query = "update task set details = decrypt(details)"
    cursor.execute(query)

Connection 中的 create_function() 方法会将函数暴露出。参数包括函数在 SQL 中使用时的名称,函数需要多少个参数,在 Python 中要暴露的函数。

$ python3 sqlite3_create_function.py

Original values:
(1, 'write about select')
(2, 'write about random')
(3, 'write about sqlite3')
(4, 'finish reviewing markup')
(5, 'revise chapter intros')
(6, 'subtitle')

Encrypting...
Encrypting 'write about select'
Encrypting 'write about random'
Encrypting 'write about sqlite3'
Encrypting 'finish reviewing markup'
Encrypting 'revise chapter intros'
Encrypting 'subtitle'

Raw encrypted values:
(1, 'jevgr nobhg fryrpg')
(2, 'jevgr nobhg enaqbz')
(3, 'jevgr nobhg fdyvgr3')
(4, 'svavfu erivrjvat znexhc')
(5, 'erivfr puncgre vagebf')
(6, 'fhogvgyr')

Decrypting in query...
Decrypting 'jevgr nobhg fryrpg'
Decrypting 'jevgr nobhg enaqbz'
Decrypting 'jevgr nobhg fdyvgr3'
Decrypting 'svavfu erivrjvat znexhc'
Decrypting 'erivfr puncgre vagebf'
Decrypting 'fhogvgyr'
(1, 'write about select')
(2, 'write about random')
(3, 'write about sqlite3')
(4, 'finish reviewing markup')
(5, 'revise chapter intros')
(6, 'subtitle')

Decrypting...
Decrypting 'jevgr nobhg fryrpg'
Decrypting 'jevgr nobhg enaqbz'
Decrypting 'jevgr nobhg fdyvgr3'
Decrypting 'svavfu erivrjvat znexhc'
Decrypting 'erivfr puncgre vagebf'
Decrypting 'fhogvgyr'

在查询中使用正则

Sqlite 支持多种与 SQL 语法相关的特殊用户函数。 例如,regexp 函数可以在查询中使用,用下面的语法可以检查字段的字符串值是否与正则表达式匹配。

SELECT * FROM table
WHERE column REGEXP '.*pattern.*'

下面这个示例使用 Python 的 re模块将一个函数和 regexp() 关联起来去测试正确的值。

sqlite3_regex.py

import re
import sqlite3

db_filename = 'todo.db'

def regexp(pattern, input):
    return bool(re.match(pattern, input))

with sqlite3.connect(db_filename) as conn:
    conn.row_factory = sqlite3.Row
    conn.create_function('regexp', 2, regexp)
    cursor = conn.cursor()

    pattern = '.*[wW]rite [aA]bout.*'

    cursor.execute(
        """
        select id, priority, details, status, deadline from task
        where details regexp :pattern
        order by deadline, priority
        """,
        {'pattern': pattern},
    )

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print('{:2d} [{:d}] {:<25} [{:<8}] ({})'.format(
            task_id, priority, details, status, deadline))

输出是所有匹配的字段的详细信息。

$ python3 sqlite3_regex.py

 1 [9] write about select        [done    ] (2016-04-25)
 2 [9] write about random        [done    ] (2016-08-22)
 3 [9] write about sqlite3       [active  ] (2017-07-31)

自定义聚合

聚合函数表示会将许多单个的值以某种方式处理汇总为一个值。例如常见的内置函数 avg() , min(), max()count()

 sqlite3 中的聚合 API 需要定义一个类包含两个方法。 step() 方法用于在处理查询时对每一个数据调用一次。 finalize() 方法用于在查询结束需要对聚合结果进行返回时调用。下面示例实现了一个计算模式的聚合器,它会返回输入的最频繁的值。

sqlite3_create_aggregate.py

import sqlite3
import collections

db_filename = 'todo.db'

class Mode:

    def __init__(self):
        self.counter = collections.Counter()

    def step(self, value):
        print('step({!r})'.format(value))
        self.counter[value] += 1

    def finalize(self):
        result, count = self.counter.most_common(1)[0]
        print('finalize() -> {!r} ({} times)'.format(
            result, count))
        return result

with sqlite3.connect(db_filename) as conn:
    conn.create_aggregate('mode', 1, Mode)

    cursor = conn.cursor()
    cursor.execute("""
    select mode(deadline) from task where project = 'pymotw'
    """)
    row = cursor.fetchone()
    print('mode(deadline) is:', row[0])

聚合类通过 Connection 的 create_aggregate() 方法注册。参数为在 SQL 语句中要使用的方法名称,聚合类中 step() 方法需要的参数数量,以及聚合类。

$ python3 sqlite3_create_aggregate.py

step('2016-04-25')
step('2016-08-22')
step('2017-07-31')
step('2016-11-30')
step('2016-08-20')
step('2016-11-01')
finalize() -> '2016-11-01' (1 times)
mode(deadline) is: 2016-11-01

连接在线程中共享

由于历史原因,一些旧版本的 SQLite 中, Connection 对象不能在不同线程间共享。每个线程必须创建自己的数据库连接对象。

sqlite3_threading.py

import sqlite3
import sys
import threading
import time

db_filename = 'todo.db'
isolation_level = None  # 自动提交模式

def reader(conn):
    print('Starting thread')
    try:
        cursor = conn.cursor()
        cursor.execute('select * from task')
        cursor.fetchall()
        print('results fetched')
    except Exception as err:
        print('ERROR:', err)

if __name__ == '__main__':
    with sqlite3.connect(db_filename,
                         isolation_level=isolation_level,
                         ) as conn:
        t = threading.Thread(name='Reader 1',
                             target=reader,
                             args=(conn,),
                             )
        t.start()
        t.join()

如上例,尝试在线程之间共享一个连接时会引发异常。

$ python3 sqlite3_threading.py

Starting thread
ERROR: SQLite objects created in a thread can only be used in that
same thread.The object was created in thread id 140735234088960
and this is thread id 123145307557888

限制对数据的访问

尽管 SQLite 不像其他大型关系数据库由用户权限控制功能,但它有限制字段访问的相关机制。每个连接可以安装一个 授权函数 按所需规则在运行时允许或拒绝字段的访问。授权函数在 SQL 语句解析时被调用,需要传入5个参数。第一个参数是正在执行操作的操作类型(例如读取,写入,删除等),其余的参数取决于第一个参数。例如 SQLITE_READ 操作,其余的参数是表名称,字段名称,where 子句在 SQL 中的位置(主查询,触发器等),和一个 None

sqlite3_set_authorizer.py

import sqlite3

db_filename = 'todo.db'

def authorizer_func(action, table, column, sql_location, ignore):
    print('\nauthorizer_func({}, {}, {}, {}, {})'.format(
        action, table, column, sql_location, ignore))

    response = sqlite3.SQLITE_OK  # be permissive by default

    if action == sqlite3.SQLITE_SELECT:
        print('requesting permission to run a select statement')
        response = sqlite3.SQLITE_OK

    elif action == sqlite3.SQLITE_READ:
        print('requesting access to column {}.{} from {}'.format(
            table, column, sql_location))
        if column == 'details':
            print('  ignoring details column')
            response = sqlite3.SQLITE_IGNORE
        elif column == 'priority':
            print('  preventing access to priority column')
            response = sqlite3.SQLITE_DENY

    return response

with sqlite3.connect(db_filename) as conn:
    conn.row_factory = sqlite3.Row
    conn.set_authorizer(authorizer_func)

    print('Using SQLITE_IGNORE to mask a column value:')
    cursor = conn.cursor()
    cursor.execute("""
    select id, details from task where project = 'pymotw'
    """)
    for row in cursor.fetchall():
        print(row['id'], row['details'])

    print('\nUsing SQLITE_DENY to deny access to a column:')
    cursor.execute("""
    select id, priority from task where project = 'pymotw'
    """)
    for row in cursor.fetchall():
        print(row['id'], row['details'])

这个示例使用 SQLITE_IGNORE 将查询结果中 task.details 字段的字符串值替换为空值。它还通过 SQLITE_DENY 限制了去查询 task.priority 字段的值,如果去查询 SQLite 会引发一个异常。

$ python3 sqlite3_set_authorizer.py

Using SQLITE_IGNORE to mask a column value:

authorizer_func(21, None, None, None, None)
requesting permission to run a select statement

authorizer_func(20, task, id, main, None)
requesting access to column task.id from main

authorizer_func(20, task, details, main, None)
requesting access to column task.details from main
  ignoring details column

authorizer_func(20, task, project, main, None)
requesting access to column task.project from main
1 None
2 None
3 None
4 None
5 None
6 None

Using SQLITE_DENY to deny access to a column:

authorizer_func(21, None, None, None, None)
requesting permission to run a select statement

authorizer_func(20, task, id, main, None)
requesting access to column task.id from main

authorizer_func(20, task, priority, main, None)
requesting access to column task.priority from main
  preventing access to priority column
Traceback (most recent call last):
  File "sqlite3_set_authorizer.py", line 53, in <module>
    """)
sqlite3.DatabaseError: access to task.priority is prohibited

 sqlite3 使用 SQLITE_ 前缀定义可以执行的操作常量。每种类型的 SQL 语句可以被标记,且也可以很好的控制对单个字段的访问。

扩展阅读

  • sqlite3 标准库文档
  • PEP 249 -- DB API 2.0 规范 (访问关系型数据库的模块需要提供的接口)
  • SQLite -- SQLite 官方网站
  • shelve -- 通过键值存储任意 Python 对象
  • SQLAlchemy -- 流行的对象关系映射系统,支持 SQLite 以及很多其他关系型数据库

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
上一篇 下一篇
Summer
讨论数量: 0
发起讨论 只看当前版本


暂无话题~