queue — 一个同步队列类

源代码: Lib/queue.py


queue 模块实现了多生产者、多消费者队列。当需要在多个线程之间安全地交换信息时,它在线程编程中尤其有用。此模块中的 Queue 类实现了所有必需的锁定语义。

该模块实现了三种类型的队列,它们之间的区别仅在于检索条目的顺序。在 FIFO 队列中,先添加的任务先被检索。在 LIFO 队列中,最近添加的条目先被检索(像堆栈一样操作)。使用优先级队列,条目会保持排序(使用 heapq 模块),并且首先检索值最低的条目。

在内部,这三种类型的队列使用锁来临时阻止竞争线程;但是,它们并非旨在处理线程内的重入。

此外,该模块还实现了一个“简单”的 FIFO 队列类型 SimpleQueue,其特定实现提供了额外的保证,以换取较小的功能。

queue 模块定义了以下类和异常

class queue.Queue(maxsize=0)

用于 FIFO 队列的构造函数。maxsize 是一个整数,用于设置可以放入队列中的项目数的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗完。如果 maxsize 小于或等于零,则队列大小是无限的。

class queue.LifoQueue(maxsize=0)

用于 LIFO 队列的构造函数。maxsize 是一个整数,用于设置可以放入队列中的项目数的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗完。如果 maxsize 小于或等于零,则队列大小是无限的。

class queue.PriorityQueue(maxsize=0)

优先级队列的构造函数。maxsize 是一个整数,用于设置可以放入队列中的项目数的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗完。如果 maxsize 小于或等于零,则队列大小是无限的。

首先检索值最低的条目(值最低的条目是由 min(entries) 返回的条目)。条目的典型模式是元组形式:(priority_number, data)

如果 data 元素不可比较,则可以将数据包装在一个忽略数据项并仅比较优先级编号的类中

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

无界 FIFO 队列的构造函数。简单队列缺少任务跟踪等高级功能。

在 3.7 版本中添加。

exception queue.Empty

当在空的 Queue 对象上调用非阻塞的 get() (或 get_nowait()) 时引发的异常。

exception queue.Full

当在已满的 Queue 对象上调用非阻塞的 put() (或 put_nowait()) 时引发的异常。

exception queue.ShutDown

当在已关闭的 Queue 对象上调用 put()get() 时引发的异常。

在 3.13 版本中添加。

队列对象

队列对象 (QueueLifoQueuePriorityQueue) 提供如下所述的公共方法。

Queue.qsize()

返回队列的大概大小。请注意,qsize() > 0 不能保证后续的 get() 不会阻塞,qsize() < maxsize 也不能保证 put() 不会阻塞。

Queue.empty()

如果队列为空,则返回 True,否则返回 False。如果 empty() 返回 True,则不能保证后续调用 put() 不会阻塞。同样,如果 empty() 返回 False,则不能保证后续调用 get() 不会阻塞。

Queue.full()

如果队列已满,则返回 True,否则返回 False。如果 full() 返回 True,则不能保证后续调用 get() 不会阻塞。同样,如果 full() 返回 False,则不能保证后续调用 put() 不会阻塞。

Queue.put(item, block=True, timeout=None)

item 放入队列。如果可选参数 block 为 true 且 timeoutNone(默认值),则在有空闲槽位可用之前阻塞(如有必要)。如果 timeout 为正数,则最多阻塞 timeout 秒,如果在该时间内没有可用的空闲槽位,则引发 Full 异常。否则(block 为 false),如果有空闲槽位立即可用,则将项放入队列,否则引发 Full 异常(在这种情况下会忽略 timeout)。

如果队列已关闭,则引发 ShutDown

Queue.put_nowait(item)

等效于 put(item, block=False)

Queue.get(block=True, timeout=None)

从队列中移除并返回一个项。如果可选参数 block 为 true 且 timeoutNone (默认值),则在必要时阻塞,直到有项可用。如果 timeout 为正数,则最多阻塞 timeout 秒,如果在该时间内没有可用的项,则引发 Empty 异常。否则(block 为 false),如果立即有项可用,则返回该项,否则引发 Empty 异常(在这种情况下会忽略 timeout)。

在 POSIX 系统上的 3.0 之前的版本中,以及在 Windows 上的所有版本中,如果 block 为 true 且 timeoutNone,则此操作会在底层锁上进入不可中断的等待。这意味着不会发生任何异常,特别是 SIGINT 不会触发 KeyboardInterrupt

如果队列已关闭且为空,或者队列已立即关闭,则会引发 ShutDown

Queue.get_nowait()

等同于 get(False)

提供了两种方法来支持跟踪排队的任务是否已由守护进程消费者线程完全处理。

Queue.task_done()

指示先前排队的任务已完成。由队列消费者线程使用。对于每个用于获取任务的 get(),随后对 task_done() 的调用会告诉队列,该任务的处理已完成。

如果 join() 当前正在阻塞,则当所有项都已处理完毕时(意味着对放入队列的每个项都已收到 task_done() 调用时),它将恢复。

shutdown(immediate=True) 对队列中剩余的每个项调用 task_done()

如果调用次数多于放入队列中的项数,则会引发 ValueError

Queue.join()

阻塞,直到队列中的所有项都已被获取和处理。

每当向队列添加项时,未完成任务的计数就会增加。每当消费者线程调用 task_done() 以指示该项已被检索且其上的所有工作均已完成时,该计数就会减少。当未完成任务的计数降至零时,join() 将解除阻塞。

如何等待排队任务完成的示例

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

终止队列

可以使 Queue 对象通过关闭它们来防止进一步的交互。

Queue.shutdown(immediate=False)

关闭队列,使 get()put() 引发 ShutDown

默认情况下,关闭队列上的 get() 仅在队列为空时才会引发。将 immediate 设置为 true 以使 get() 立即引发。

所有阻塞的 put()get() 调用者将被解除阻塞。如果 immediate 为 true,则队列中剩余的每个项都将被标记为已完成,这可能会解除 join() 的调用者的阻塞。

在 3.13 版本中添加。

SimpleQueue 对象

SimpleQueue 对象提供以下描述的公共方法。

SimpleQueue.qsize()

返回队列的大致大小。请注意,qsize() > 0 并不能保证后续的 get() 不会阻塞。

SimpleQueue.empty()

如果队列为空,则返回 True,否则返回 False。如果 empty() 返回 False,则不能保证后续对 get() 的调用不会阻塞。

SimpleQueue.put(item, block=True, timeout=None)

item 放入队列。该方法永远不会阻塞,并且总是成功(除了潜在的低级错误,例如无法分配内存)。可选参数 blocktimeout 将被忽略,仅为了与 Queue.put() 兼容而提供。

CPython 实现细节:此方法具有可重入的 C 实现。也就是说,put()get() 调用可以被同一线程中的另一个 put() 调用中断,而不会发生死锁或损坏队列内部状态。这使其适用于析构函数(例如 __del__ 方法)或 weakref 回调。

SimpleQueue.put_nowait(item)

等同于 put(item, block=False),为了与 Queue.put_nowait() 兼容而提供。

SimpleQueue.get(block=True, timeout=None)

从队列中移除并返回一个项。如果可选参数 block 为 true 且 timeoutNone (默认值),则在必要时阻塞,直到有项可用。如果 timeout 为正数,则最多阻塞 timeout 秒,如果在该时间内没有可用的项,则引发 Empty 异常。否则(block 为 false),如果立即有项可用,则返回该项,否则引发 Empty 异常(在这种情况下会忽略 timeout)。

SimpleQueue.get_nowait()

等同于 get(False)

另请参阅

multiprocessing.Queue

用于多进程(而不是多线程)环境中的队列类。

collections.deque 是无界队列的另一种实现,具有快速的原子 append()popleft() 操作,这些操作不需要锁定,并且还支持索引。