队列

源代码: Lib/asyncio/queues.py


asyncio 队列旨在与 queue 模块的类相似。尽管 asyncio 队列不是线程安全的,但它们专门设计用于 async/await 代码。

请注意,asyncio 队列的方法没有 timeout 参数;请使用 asyncio.wait_for() 函数来执行带超时的队列操作。

另请参阅下面的示例部分。

队列

class asyncio.Queue(maxsize=0)

一个先进先出 (FIFO) 队列。

如果 maxsize 小于或等于零,则队列大小是无限的。如果它是一个大于 0 的整数,则当队列达到 maxsize 时,await put() 会阻塞,直到通过 get() 移除一个项目。

与标准库的线程 queue 不同,队列的大小始终可知,并可通过调用 qsize() 方法返回。

版本 3.10 中已更改: 移除了 loop 参数。

此类不是线程安全的

maxsize

队列中允许的项目数量。

empty()

如果队列为空,返回 True,否则返回 False

full()

如果队列中有 maxsize 个项目,返回 True

如果队列以 maxsize=0(默认值)初始化,则 full() 永远不会返回 True

async get()

从队列中移除并返回一个项目。如果队列为空,则等待直到项目可用。

如果队列已关闭且为空,或者队列已立即关闭,则引发 QueueShutDown

get_nowait()

如果项目立即可用,则返回一个项目,否则引发 QueueEmpty

async join()

阻塞直到队列中的所有项目都已接收和处理完毕。

每当有项目添加到队列时,未完成任务的计数就会增加。每当消费者协程调用 task_done() 表示项目已被检索并且其上的所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join() 会解除阻塞。

async put(item)

将一个项目放入队列。如果队列已满,则等待直到有可用空位,然后添加项目。

如果队列已关闭,则引发 QueueShutDown

put_nowait(item)

将一个项目放入队列而不阻塞。

如果没有立即可用的空位,则引发 QueueFull

qsize()

返回队列中的项目数量。

shutdown(immediate=False)

Queue 实例置于关闭模式。

队列不再能增长。将来调用 put() 将引发 QueueShutDown。当前阻塞的 put() 调用者将被解除阻塞,并在之前阻塞的线程中引发 QueueShutDown

如果 immediate 为 false(默认值),队列可以通过 get() 调用正常关闭,以提取已加载的任务。

如果为每个剩余任务调用 task_done(),则挂起的 join() 将正常解除阻塞。

一旦队列为空,将来调用 get() 将引发 QueueShutDown

如果 immediate 为 true,队列将立即终止。队列将被清空。所有 join() 的调用者都会解除阻塞,无论未完成任务的数量如何。阻塞的 get() 调用者会解除阻塞,并由于队列为空而引发 QueueShutDown

在使用 immediate 设置为 true 的 join() 时请谨慎。这会解除连接的阻塞,即使任务上没有完成任何工作,从而违反了连接队列的通常不变式。

在 3.13 版本加入。

task_done()

表示先前入队的工单已完成。

由队列消费者使用。对于每个用于获取工单的 get() 调用,后续调用 task_done() 会告诉队列工单上的处理已完成。

如果 join() 当前正在阻塞,当所有项目都已处理(意味着对于每个已 put() 到队列中的项目,都收到了 task_done() 调用)时,它将恢复。

如果调用次数多于队列中放置的项目数量,则引发 ValueError

优先队列

class asyncio.PriorityQueue

Queue 的变体;以优先级顺序(最低优先)检索条目。

条目通常是 (priority_number, data) 形式的元组。

后进先出队列

class asyncio.LifoQueue

Queue 的变体,它首先检索最近添加的条目(后进先出)。

异常

exception asyncio.QueueEmpty

当在空队列上调用 get_nowait() 方法时,会引发此异常。

exception asyncio.QueueFull

当在已达到 maxsize 的队列上调用 put_nowait() 方法时,引发的异常。

exception asyncio.QueueShutDown

当在已关闭的队列上调用 put()get() 时,引发的异常。

在 3.13 版本加入。

示例

队列可用于在多个并发任务之间分配工作负载。

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())