8.10. QueueA synchronized queue class

Note

在 Python 3中,Queue 模块被重命名为 queue在转换您的代码到 Python 3 的时候,2to3 工具会自动调整 import 语句。

源码: Lib/Queue.py


Queue 模块实现了多生产者、多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。这个模块中的 Queue 类实现了所有必须的锁语义。它依赖于 Python 中的线程支持的可用性;参见threading 模块。

The module implements three types of queue, which differ only in the order in which the entries are retrieved.In a FIFO queue, the first tasks added are the first retrieved.In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack).With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.

Queue 模块定义了下列的类和异常:

class Queue.Queue(maxsize=0)

Constructor for a FIFO queue.maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

class Queue.LifoQueue(maxsize=0)

构造一个LIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

出现于版本2.6.

class Queue.PriorityQueue(maxsize=0)

构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

拥有最小值的任务会被最先得到(sorted(list(entries))[0]的返回值即为拥有最小值的任务)。任务的典型模式就是如(priority_number, data)这样的元组。

出现于版本2.6.

exception Queue.Empty

在空的Queue对象上调用非阻塞的get()(或者get_nowait())会抛出此异常。

exception Queue.Full

在满的Queue对象上调用非阻塞的put()(或者put_nowait())会抛出此异常。

8.10.1. Queue 对象

Queue对象(QueueLifoQueuePriorityQueue)提供了下述的公共方法。

Queue.qsize()

返回队列的近似大小。注意,队列大小大于0并不保证接下来的get()调用不会被阻塞,队列大小小于maxsize也不保证接下来的put()调用不会被阻塞。

Queue.empty()

如果队列为空返回True,否则返回False如果empty()返回True并不保证接下来的put()调用不会被阻塞。类似的,如果empty()返回False也不能保证接下来的get()调用不会被阻塞。

Queue.full()

如果队列是满的返回True,否则返回False如果full()返回True并不能保证接下来的get()调用不会被阻塞。类似的,如果full()返回False并不能保证接下来的put()调用不会被阻塞。

Queue.put(item[, block[, timeout]])

item放入队列中。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),如有必要(比如队列满),阻塞调用线程,直到有空闲槽可用。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空闲槽可用,抛出Full异常(带超时的阻塞调用)。如果block为假,如果有空闲槽可用将数据放入队列,否则立即抛出Full异常(非阻塞调用,timeout被忽略)。

出现于版本2.3: timeout参数。

Queue.put_nowait(item)

等同于put(item, False)(非阻塞调用)。

Queue.get([block[, timeout]])

从队列中移除并返回一个数据。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),阻塞调用进程直到有数据可用。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)。如果block为假,如果有数据可用返回数据,否则立即抛出Empty异常(非阻塞调用,timeout被忽略)。

出现于版本2.3: timeout参数。

Queue.get_nowait()

等同于get(False)(非阻塞调用)。

为了跟踪入队任务被消费者线程完全的处理掉,Queue对象提供了两个额外的方法。

Queue.task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(meaning that a task_done() call was received for every item that had been put() into the queue).

如果该方法被调用的次数多于被放入队列中的任务的个数,ValueError异常会被抛出。

出现于版本2.5。

Queue.join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

出现于版本2.5。

等待入队任务被怎样完成的例子:

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done