17.7. queue - 同步队列类

源代码: Lib / queue.py

queue模块实现多生产者,多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。该模块中的Queue类实现了所有需要的锁定语义。这取决于Python中线程支持的可用性;请参见threading模块。

模块实现了三类队列,主要差别在于取得数据的顺序上。在FIFO(First In First Out,先进先出)队列中,最早加入的任务会被最先得到。在LIFO(Last In First Out,后进先出)队列中,最后加入的任务会被最先得到(就像栈一样)。在优先队列中,任务被保持有序(使用heapq模块),拥有最小值的任务(优先级最高)被最先得到。

queue模块定义了以下类和异常:

class queue.Queue(maxsize=0)

构造一个FIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

class queue.LifoQueue(maxsize=0)

构造一个LIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

class queue.PriorityQueue(maxsize=0)

构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

首先检索最低值条目(最低值条目是由sorted(list(entries))[0]返回的条目。任务的典型模式就是如(priority_number, data)这样的元组。

exception queue.Empty

在空的get()对象上调用非阻塞的get_nowait()(或者Queue)会抛出此异常。

exception queue.Full

在满的put()对象上调用非阻塞的put_nowait()(或者Queue)会抛出此异常。

17.7.1. Queue Objects

Queue对象(QueueLifoQueuePriorityQueue)提供了下述的公共方法。

Queue.qsize()

返回队列的近似大小。注意,qsize()> 0不保证随后的get()不会阻塞,qsize() < maxsize也不会保证put()不会被阻塞。

Queue.empty()

如果队列为空,返回True,否则返回False如果empty()返回True,它不保证后续调用put()不会阻塞。类似的,如果empty()返回False也不能保证接下来的get()调用不会被阻塞。

Queue.full()

如果队列已满,则返回True,否则返回False如果full()返回True,它不保证后续调用get()不会阻塞。类似的,如果full()返回False并不能保证接下来的put()调用不会被阻塞。

Queue.put(item, block=True, timeout=None)

item放入队列中。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),如有必要(比如队列满),阻塞调用线程,直到有空闲槽可用。如果timeout是正数,则它最多阻塞timeout秒,如果在该时间内没有空闲插槽,则引发Full异常。如果block为假,如果有空闲槽可用将数据放入队列,否则立即抛出Full异常(非阻塞调用,timeout被忽略)。

Queue.put_nowait(item)

等同于put(item, False)(非阻塞调用)。

Queue.get(block=True, timeout=None)

从队列中移除并返回一个数据。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),阻塞调用进程直到有数据可用。如果超时是正数,则它最多阻塞超时秒,如果在该时间内没有可用的项,则引发Empty异常。如果block为假,如果有数据可用返回数据,否则立即抛出Empty异常(非阻塞调用,timeout被忽略)。

Queue.get_nowait()

等同于get(False)(非阻塞调用)。

提供了两种方法来支持跟踪入队任务是否已完全由守护进程消费者线程处理。

Queue.task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

如果该方法被调用的次数多于被放入队列中的任务的个数,ValueError异常会被抛出。

Queue.join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()以指示该项目已检索并且其上的所有工作都已完成时,计数将减少。当未完成的任务数降到0,join()解除阻塞。

等待入队任务被怎样完成的例子:

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

参见

multiprocessing.Queue
用于在多进程(而不是多线程)上下文中使用的队列类。

collections.deque是不需要锁定的快速原子append()popleft()操作的无界队列的替代实现。