3.7. queue — 线程安全的 FIFO 队列

目的:提供线程安全的FIFO实现

queue 模块提供了一个适用于多线程编程的先进先出(FIFO)数据结构。它可以用来安全地在生产者和消费者线程之间传递消息或其他数据。锁是调用者来处理的,所有多个线程能够安全且容易的使用同样的Queue 实例工作。Queue 的大小(它包含的元素的数量)可能会受到限制,以调节内存的适用或处理。

注意

这个讨论假定你已经理解了队列的一般性质。如果您不了解,您可能需要阅读一些参考资料,然后再继续。

基本的 FIFO 队列

Queue 类实现了一个基本的先进先出的容器。使用 put() 将元素添加到序列的一端,并适用 get() 从另一端移除。

queue_fifo.py

import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

本示例使用单个线程来说明将元素按照插入顺序从队列中删除。

$ python3 queue_fifo.py

0 1 2 3 4

LIFO Queue (后进先出型队列)

与标准 FIFO (先进先出型队列)队列 Queue 相反, LifoQueue 使用的是后进先出的模式(与普通的栈结构类似)。

queue_lifo.py

import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

最后一次被 put 进队列的数据会首先被 get 出来,看执行结果:

$ python3 queue_lifo.py

4 3 2 1 0

Priority Queue (优先队列)

有时,我们想要以队列中数据的某些属性为序进行处理。比如,工资部门想要打印出的工作名单可能与开发者所想的不一样。这时使用 PriorityQueue 就比较好,它内部进行的排序方式是以其数据而定,而不是数据添加到队列中时的顺序。

queue_priority.py

import functools
import queue
import threading

@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented

q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))

def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()

workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

例子中使用了多个线程来取出工作信息,当 get() 发生时,所取出的数据都是基于各个数据的优先级(例子中是 priority 属性)来返回的。消费者线程运行时,其处理队列中的数据的顺序取决于线程上下文切换。

$ python3 queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

基于线程的播客客户端

本播客客户端代码用于演示 Queue类如何同多个线程一起工作。本程序会读取一个或多个 RSS 源,并从每个源里获取出5个最新的广播数据添加到下载队列中,之后使用线程以并行的方式同时进行多个下载,其结构的部署正好演示了 queue 模块的使用。

首先,创建一些后续所需要参数,通常情况下,这些参数应该来自于用户输入(比如用户输入自己喜欢的源或者从数据库中获取)。本例使用硬编码的值来进行演示。

fetch_podcasts.py

from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse

import feedparser

# 定义一些全局变量
num_fetch_threads = 2
enclosure_queue = Queue()

# 真实场景不会使用硬编码数据。
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]

def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))

download_enclosures() 函数会在工作线程中运行,同时使用 urllib 来进行下载。

def download_enclosures(q):
    """
        本函数是一个工作线程函数。
        本函数会不断处理从队列中取出的数据。
        这些守护线程会被放入一个无限循环中,只有当主线程结束时才会退出。
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()
        # 将所下载的文件保存在当前目录中
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()

一旦定义了线程的目标函数( target 参数),就可以开始这个线程了。当 download_enclosure() 函数执行到 url = q.get() 时,就会阻塞并且等待队列里有什么数据返回。也就是说在队列中有任何数据前,启动线程也是安全的。

# 设置一些线程来获取广播数据
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.setDaemon(True)
    worker.start()

下一步我们使用 feedparser 模块来取得其中的内容,并把广播数据的 URL 塞入队列。只要一有 URL 被塞入队列,其中的一个工作线程就会获取出它并开始下载。这个循环会持续不断地添加数据进队列直到这个源被榨干,同时工作线程也会不断轮流从队列中获取出 URL 并进行下载。

# 下载源的内容并将其中的 URL 塞入队列。
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])

唯一不要忘记做的,就是等待队列再次被榨干,请使用 join()

# 等待队列被榨干,也就是我们已经下载了所有要下载的东西。
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')

运行的话会出现类似的打印信息

$ python3 fetch_podcasts.py

worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done

实际输出取决于 RSS 源的内容。

参考

本文章首发在 LearnKu.com 网站上。
上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~