queue — 同步队列类

源代码: Lib/queue.py


queue 模块实现了多生产者、多消费者队列。当信息必须在多个线程之间安全地交换时,它在多线程编程中特别有用。此模块中的 Queue 类实现了所有必需的锁定语义。

该模块实现了三种类型的队列,它们的区别仅在于检索条目的顺序。在 FIFO 队列中,最先添加的任务将最先被检索。在 LIFO 队列中,最近添加的条目将最先被检索(像堆栈一样运行)。使用优先级队列,条目将保持排序(使用 heapq 模块),并且值最低的条目将最先被检索。

在内部,这三种类型的队列使用锁来临时阻塞竞争线程;但是,它们并非设计用于处理线程内的重入。

此外,该模块还实现了一种“简单”的 FIFO 队列类型,即 SimpleQueue,其特定实现以较小的功能换取额外的保证。

queue 模块定义了以下类和异常:

class queue.Queue(maxsize=0)

FIFO 队列的构造函数。 *maxsize* 是一个整数,用于设置可以放入队列中的项目数量上限。一旦达到此大小,插入将被阻塞,直到队列项目被消费。如果 *maxsize* 小于或等于零,则队列大小为无限。

class queue.LifoQueue(maxsize=0)

LIFO 队列的构造函数。 *maxsize* 是一个整数,用于设置可以放入队列中的项目数量上限。一旦达到此大小,插入将被阻塞,直到队列项目被消费。如果 *maxsize* 小于或等于零,则队列大小为无限。

class queue.PriorityQueue(maxsize=0)

优先级队列的构造函数。 *maxsize* 是一个整数,用于设置可以放入队列中的项目数量上限。一旦达到此大小,插入将被阻塞,直到队列项目被消费。如果 *maxsize* 小于或等于零,则队列大小为无限。

值最低的条目将最先被检索(值最低的条目是 min(entries) 将返回的条目)。条目的典型模式是以下形式的元组:(priority_number, data)

如果 *data* 元素不可比较,则可以将数据包装在一个忽略数据项并且仅比较优先级编号的类中

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

无界 FIFO 队列的构造函数。简单队列缺少任务跟踪等高级功能。

3.7 版新增。

exception queue.Empty

在空的 Queue 对象上调用非阻塞 get()(或 get_nowait())时引发的异常。

exception queue.Full

在满的 Queue 对象上调用非阻塞 put()(或 put_nowait())时引发的异常。

队列对象

队列对象(QueueLifoQueuePriorityQueue)提供以下公共方法。

Queue.qsize()

返回队列的大致大小。注意,qsize() > 0 并不能保证后续的 get() 调用不会阻塞,qsize() < maxsize 也不能保证 put() 调用不会阻塞。

Queue.empty()

如果队列为空,则返回 True,否则返回 False。如果 empty() 返回 True,则不能保证后续对 put() 的调用不会阻塞。同样,如果 empty() 返回 False,则不能保证后续对 get() 的调用不会阻塞。

Queue.full()

如果队列已满,则返回 True,否则返回 False。如果 full() 返回 True,则不能保证后续对 get() 的调用不会阻塞。同样,如果 full() 返回 False,则不能保证后续对 put() 的调用不会阻塞。

Queue.put(item, block=True, timeout=None)

item 放入队列。如果可选参数 block 为 true 且 timeoutNone(默认值),则在必要时阻塞,直到有空闲插槽可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在该时间内没有可用的空闲插槽,则引发 Full 异常。否则(block 为 false),如果立即有空闲插槽可用,则将项目放入队列,否则引发 Full 异常(在这种情况下,timeout 将被忽略)。

Queue.put_nowait(item)

等效于 put(item, block=False)

Queue.get(block=True, timeout=None)

从队列中移除并返回一个项目。如果可选参数 block 为 true 且 timeoutNone(默认值),则在必要时阻塞,直到有项目可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在该时间内没有可用的项目,则引发 Empty 异常。否则(block 为 false),如果立即有项目可用,则返回该项目,否则引发 Empty 异常(在这种情况下,timeout 将被忽略)。

在 POSIX 系统上 3.0 版本之前,以及 Windows 上的所有版本中,如果 block 为 true 且 timeoutNone,则此操作将在底层锁上进入不可中断的等待状态。这意味着不会发生任何异常,特别是 SIGINT 不会触发 KeyboardInterrupt

Queue.get_nowait()

等效于 get(False)

提供了两种方法来支持跟踪守护程序消费者线程是否已完全处理入队任务。

Queue.task_done()

指示先前入队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个 get(),对 task_done() 的后续调用会告诉队列该任务的处理已完成。

如果 join() 当前正在阻塞,则当所有项目都已处理完毕时(这意味着已为已放入队列的每个项目接收到 task_done() 调用),它将恢复。

如果调用的次数超过放入队列的项目数,则引发 ValueError

Queue.join()

阻塞,直到队列中的所有项目都被获取并处理完毕。

每当将项目添加到队列时,未完成任务的计数就会增加。每当消费者线程调用 task_done() 以指示已检索到该项目并且其上的所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join() 将解除阻塞。

如何等待入队任务完成的示例

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

SimpleQueue 对象

SimpleQueue 对象提供如下所述的公共方法。

SimpleQueue.qsize()

返回队列的大致大小。注意,qsize() > 0 并不能保证后续的 get() 调用不会阻塞。

SimpleQueue.empty()

如果队列为空,则返回 True,否则返回 False。如果 empty() 返回 False,则不能保证后续对 get() 的调用不会阻塞。

SimpleQueue.put(item, block=True, timeout=None)

item 放入队列。该方法从不阻塞并且始终成功(除了潜在的低级错误,例如内存分配失败)。可选参数 blocktimeout 将被忽略,仅为了与 Queue.put() 兼容而提供。

**CPython 实现细节:** 此方法具有可重入的 C 实现。也就是说,put()get() 调用可以被同一线程中的另一个 put() 调用中断,而不会导致死锁或损坏队列内部的状态。这使得它适合在析构函数(例如 __del__ 方法)或 weakref 回调中使用。

SimpleQueue.put_nowait(item)

等效于 put(item, block=False),为了与 Queue.put_nowait() 兼容而提供。

SimpleQueue.get(block=True, timeout=None)

从队列中移除并返回一个项目。如果可选参数 block 为 true 且 timeoutNone(默认值),则在必要时阻塞,直到有项目可用。如果 timeout 是一个正数,它最多阻塞 timeout 秒,如果在该时间内没有可用的项目,则引发 Empty 异常。否则(block 为 false),如果立即有项目可用,则返回该项目,否则引发 Empty 异常(在这种情况下,timeout 将被忽略)。

SimpleQueue.get_nowait()

等效于 get(False)

另请参阅

multiprocessing.Queue

一个用于多进程(而不是多线程)上下文的队列类。

collections.deque 是无界队列的另一种实现,它具有快速的原子 append()popleft() 操作,这些操作不需要锁定并且还支持索引。