queue — 同步队列类

源代码: Lib/queue.py


queue 模块实现了多生产者、多消费者队列。在需要多个线程之间安全交换信息的线程编程中,它特别有用。Queue 类在这个模块中实现了所有必需的锁定语义。

该模块实现了三种类型的队列,它们仅在检索条目的顺序上有所不同。在 FIFO 队列中,首先添加的任务首先被检索。在 LIFO 队列中,最近添加的条目首先被检索(像堆栈一样操作)。对于优先级队列,条目保持排序(使用 heapq 模块),并且值最低的条目首先被检索。

在内部,这三种类型的队列使用锁来暂时阻塞竞争线程;然而,它们并非设计用于处理同一线程内的重入。

此外,该模块实现了一种“简单”的 FIFO 队列类型,即 SimpleQueue,其特定实现以较小的功能换取额外的保证。

queue 模块定义了以下类和异常

class queue.Queue(maxsize=0)

构造一个 FIFO 队列。 maxsize 是一个整数,设置了可以放入队列的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被消费。如果 maxsize 小于或等于零,则队列大小是无限的。

class queue.LifoQueue(maxsize=0)

构造一个 LIFO 队列。 maxsize 是一个整数,设置了可以放入队列的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被消费。如果 maxsize 小于或等于零,则队列大小是无限的。

class queue.PriorityQueue(maxsize=0)

构造一个优先级队列。 maxsize 是一个整数,设置了可以放入队列的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被消费。如果 maxsize 小于或等于零,则队列大小是无限的。

值最低的条目首先被检索(值最低的条目是 min(entries) 将返回的那个)。条目的典型模式是形如 (priority_number, data) 的元组。

如果 data 元素不可比较,则可以将数据封装在一个忽略数据项而只比较优先级数字的类中

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

构造一个无界 FIFO 队列。简单队列缺乏高级功能,例如任务跟踪。

在 3.7 版本加入。

exception queue.Empty

当在空的 Queue 对象上调用非阻塞的 get() (或 get_nowait())时引发的异常。

exception queue.Full

当在已满的 Queue 对象上调用非阻塞的 put() (或 put_nowait())时引发的异常。

exception queue.ShutDown

当在已关闭的 Queue 对象上调用 put()get() 时引发的异常。

在 3.13 版本加入。

队列对象

队列对象(QueueLifoQueuePriorityQueue)提供下面描述的公共方法。

Queue.qsize()

返回队列的大致大小。请注意,qsize() > 0 不保证随后的 get() 不会阻塞,qsize() < maxsize 也不保证 put() 不会阻塞。

Queue.empty()

如果队列为空,则返回 True,否则返回 False。如果 empty() 返回 True,并不保证随后调用 put() 不会阻塞。同样,如果 empty() 返回 False,也不保证随后调用 get() 不会阻塞。

Queue.full()

如果队列已满,则返回 True,否则返回 False。如果 full() 返回 True,并不保证随后调用 get() 不会阻塞。同样,如果 full() 返回 False,也不保证随后调用 put() 不会阻塞。

Queue.put(item, block=True, timeout=None)

item 放入队列。如果可选参数 block 为真且 timeoutNone (默认值),则必要时阻塞,直到有空闲槽可用。如果 timeout 是一个正数,它将最多阻塞 timeout 秒,如果在此时间内没有空闲槽可用,则引发 Full 异常。否则(block 为假),如果有空闲槽立即可用,则将项放入队列,否则引发 Full 异常(在这种情况下 timeout 被忽略)。

如果队列已关闭,则引发 ShutDown

Queue.put_nowait(item)

等同于 put(item, block=False)

Queue.get(block=True, timeout=None)

从队列中移除并返回一个项。如果可选参数 block 为真且 timeoutNone (默认值),则必要时阻塞,直到有项可用。如果 timeout 是一个正数,它将最多阻塞 timeout 秒,如果在此时间内没有项可用,则引发 Empty 异常。否则(block 为假),如果有项立即可用,则返回一个项,否则引发 Empty 异常(在这种情况下 timeout 被忽略)。

在 POSIX 系统上 3.0 版本之前,以及在所有版本的 Windows 上,如果 block 为真且 timeoutNone,此操作将进入底层锁上的不可中断等待。这意味着不会发生任何异常,特别是 SIGINT 不会触发 KeyboardInterrupt

如果队列已关闭且为空,或者队列已立即关闭,则引发 ShutDown

Queue.get_nowait()

等同于 get(False)

提供了两种方法来支持跟踪已入队任务是否已被守护消费者线程完全处理。

Queue.task_done()

指示一个先前入队的任务已完成。由队列消费者线程使用。对于每个用于获取任务的 get() 调用,随后的 task_done() 调用会告知队列该任务的处理已完成。

如果 join() 当前正在阻塞,它将在所有项目都被处理后恢复(这意味着对于队列中每个 put() 进去的项目,都收到了一个 task_done() 调用)。

如果调用次数多于队列中放入的项数,则引发 ValueError

Queue.join()

阻塞直到队列中的所有项目都被获取和处理。

每当有项目添加到队列时,未完成任务的计数就会增加。每当消费者线程调用 task_done() 来表示该项目已被检索并且其所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join() 会解除阻塞。

等待任务完成

等待入队任务完成的示例

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

终止队列

当不再需要时,Queue 对象可以正常结束直到为空,或通过硬关闭立即终止。

Queue.shutdown(immediate=False)

Queue 实例置于关闭模式。

队列无法再增长。将来对 put() 的调用将引发 ShutDown。目前阻塞的 put() 调用者将被解除阻塞,并在先前阻塞的线程中引发 ShutDown

如果 immediate 为 false(默认值),队列可以通过 get() 调用正常结束,以提取已加载的任务。

如果对每个剩余任务都调用了 task_done(),则挂起的 join() 将正常解除阻塞。

一旦队列为空,将来对 get() 的调用将引发 ShutDown

如果 immediate 为真,队列将立即终止。队列将被清空。所有 join() 的调用者都会解除阻塞,无论未完成任务的数量如何。阻塞的 get() 调用者也会解除阻塞,并将引发 ShutDown,因为队列已为空。

当使用 immediate 设置为 true 的 join() 时请谨慎。这会解除 join 的阻塞,即使任务尚未完成任何工作,违反了连接队列的通常不变量。

在 3.13 版本加入。

SimpleQueue 对象

SimpleQueue 对象提供以下公共方法。

SimpleQueue.qsize()

返回队列的大致大小。请注意,qsize() > 0 不保证随后的 get() 不会阻塞。

SimpleQueue.empty()

如果队列为空,则返回 True,否则返回 False。如果 empty() 返回 False,并不保证随后调用 get() 不会阻塞。

SimpleQueue.put(item, block=True, timeout=None)

item 放入队列。此方法永不阻塞且始终成功(除了潜在的低级错误,例如内存分配失败)。可选参数 blocktimeout 被忽略,仅为了与 Queue.put() 兼容而提供。

CPython 实现细节: 此方法有一个 C 实现,它是可重入的。也就是说,在同一个线程中,一个 put()get() 调用可以被另一个 put() 调用中断,而不会死锁或破坏队列内部状态。这使得它适用于析构函数,例如 __del__ 方法或 weakref 回调。

SimpleQueue.put_nowait(item)

等同于 put(item, block=False),为了与 Queue.put_nowait() 兼容而提供。

SimpleQueue.get(block=True, timeout=None)

从队列中移除并返回一个项。如果可选参数 block 为真且 timeoutNone (默认值),则必要时阻塞,直到有项可用。如果 timeout 是一个正数,它将最多阻塞 timeout 秒,如果在此时间内没有项可用,则引发 Empty 异常。否则(block 为假),如果有项立即可用,则返回一个项,否则引发 Empty 异常(在这种情况下 timeout 被忽略)。

SimpleQueue.get_nowait()

等同于 get(False)

参见

multiprocessing.Queue

一个用于多进程(而非多线程)上下文的队列类。

collections.deque 是无界队列的另一种实现,具有快速原子 append()popleft() 操作,无需锁定且支持索引。