3.7. queue — 线程安全的 FIFO 队列
目的:提供线程安全的 FIFO 实现
queue
模块提供了一个适用于多线程编程的先进先出(FIFO)数据结构。它可以用来安全地在生产者和消费者线程之间传递消息或其他数据。锁是调用者来处理的,所有多个线程能够安全且容易的使用同样的 Queue
实例工作。Queue
的大小(它包含的元素的数量)可能会受到限制,以调节内存的适用或处理。
注意#
这个讨论假定你已经理解了队列的一般性质。如果您不了解,您可能需要阅读一些参考资料,然后再继续。
基本的 FIFO 队列#
Queue
类实现了一个基本的先进先出的容器。使用 put()
将元素添加到序列的一端,并适用 get()
从另一端移除。
queue_fifo.py
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
本示例使用单个线程来说明将元素按照插入顺序从队列中删除。
$ python3 queue_fifo.py
0 1 2 3 4
LIFO Queue (后进先出型队列)#
与标准 FIFO (先进先出型队列)队列 Queue
相反, LifoQueue
使用的是后进先出的模式(与普通的栈结构类似)。
queue_lifo.py
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
最后一次被 put
进队列的数据会首先被 get
出来,看执行结果:
$ python3 queue_lifo.py
4 3 2 1 0
Priority Queue (优先队列)#
有时,我们想要以队列中数据的某些属性为序进行处理。比如,工资部门想要打印出的工作名单可能与开发者所想的不一样。这时使用 PriorityQueue
就比较好,它内部进行的排序方式是以其数据而定,而不是数据添加到队列中时的顺序。
queue_priority.py
import functools
import queue
import threading
@functools.total_ordering
class Job:
def __init__(self, priority, description):
self.priority = priority
self.description = description
print('New job:', description)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented
def __lt__(self, other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented
q = queue.PriorityQueue()
q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))
def process_job(q):
while True:
next_job = q.get()
print('Processing job:', next_job.description)
q.task_done()
workers = [
threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
例子中使用了多个线程来取出工作信息,当 get()
发生时,所取出的数据都是基于各个数据的优先级(例子中是 priority 属性)来返回的。消费者线程运行时,其处理队列中的数据的顺序取决于线程上下文切换。
$ python3 queue_priority.py
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
基于线程的播客客户端#
本播客客户端代码用于演示 Queue
类如何同多个线程一起工作。本程序会读取一个或多个 RSS 源,并从每个源里获取出 5 个最新的广播数据添加到下载队列中,之后使用线程以并行的方式同时进行多个下载,其结构的部署正好演示了 queue
模块的使用。
首先,创建一些后续所需要参数,通常情况下,这些参数应该来自于用户输入(比如用户输入自己喜欢的源或者从数据库中获取)。本例使用硬编码的值来进行演示。
fetch_podcasts.py
from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse
import feedparser
# 定义一些全局变量
num_fetch_threads = 2
enclosure_queue = Queue()
# 真实场景不会使用硬编码数据。
feed_urls = [
'http://talkpython.fm/episodes/rss',
]
def message(s):
print('{}: {}'.format(threading.current_thread().name, s))
download_enclosures()
函数会在工作线程中运行,同时使用 urllib
来进行下载。
def download_enclosures(q):
"""
本函数是一个工作线程函数。
本函数会不断处理从队列中取出的数据。
这些守护线程会被放入一个无限循环中,只有当主线程结束时才会退出。
"""
while True:
message('looking for the next enclosure')
url = q.get()
filename = url.rpartition('/')[-1]
message('downloading {}'.format(filename))
response = urllib.request.urlopen(url)
data = response.read()
# 将所下载的文件保存在当前目录中
message('writing to {}'.format(filename))
with open(filename, 'wb') as outfile:
outfile.write(data)
q.task_done()
一旦定义了线程的目标函数( target 参数),就可以开始这个线程了。当 download_enclosure()
函数执行到 url = q.get()
时,就会阻塞并且等待队列里有什么数据返回。也就是说在队列中有任何数据前,启动线程也是安全的。
# 设置一些线程来获取广播数据
for i in range(num_fetch_threads):
worker = threading.Thread(
target=download_enclosures,
args=(enclosure_queue,),
name='worker-{}'.format(i),
)
worker.setDaemon(True)
worker.start()
下一步我们使用 feedparser
模块来取得其中的内容,并把广播数据的 URL 塞入队列。只要一有 URL 被塞入队列,其中的一个工作线程就会获取出它并开始下载。这个循环会持续不断地添加数据进队列直到这个源被榨干,同时工作线程也会不断轮流从队列中获取出 URL 并进行下载。
# 下载源的内容并将其中的 URL 塞入队列。
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][:5]:
for enclosure in entry.get('enclosures', []):
parsed_url = urlparse(enclosure['url'])
message('queuing {}'.format(
parsed_url.path.rpartition('/')[-1]))
enclosure_queue.put(enclosure['url'])
唯一不要忘记做的,就是等待队列再次被榨干,请使用 join()
。
# 等待队列被榨干,也就是我们已经下载了所有要下载的东西。
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
运行的话会出现类似的打印信息
$ python3 fetch_podcasts.py
worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done
实际输出取决于 RSS 源的内容。
参考#
- 队列标准库文档
collections
中的 deque --- 双端队列- 数据结构 --- 队列 -- 维基百科解读队列
- 先进先出参考 -- 维基百科解读先进,先出的数据结构
- feedparser 模块 -- 一个用于编译 RSS 和 Atom 源的模块,由 Mark Pilgrim 创建,Kurt McKee 维护。
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
推荐文章: