队列¶
asyncio 队列旨在与 queue
模块的类相似。尽管 asyncio 队列不是线程安全的,但它们专门设计用于 async/await 代码。
请注意,asyncio 队列的方法没有 timeout 参数;请使用 asyncio.wait_for()
函数来执行带超时的队列操作。
另请参阅下面的示例部分。
队列¶
- class asyncio.Queue(maxsize=0)¶
一个先进先出 (FIFO) 队列。
如果 maxsize 小于或等于零,则队列大小是无限的。如果它是一个大于
0
的整数,则当队列达到 maxsize 时,await put()
会阻塞,直到通过get()
移除一个项目。与标准库的线程
queue
不同,队列的大小始终可知,并可通过调用qsize()
方法返回。版本 3.10 中已更改: 移除了 loop 参数。
此类不是线程安全的。
- maxsize¶
队列中允许的项目数量。
- empty()¶
如果队列为空,返回
True
,否则返回False
。
- async get()¶
从队列中移除并返回一个项目。如果队列为空,则等待直到项目可用。
如果队列已关闭且为空,或者队列已立即关闭,则引发
QueueShutDown
。
- get_nowait()¶
如果项目立即可用,则返回一个项目,否则引发
QueueEmpty
。
- async join()¶
阻塞直到队列中的所有项目都已接收和处理完毕。
每当有项目添加到队列时,未完成任务的计数就会增加。每当消费者协程调用
task_done()
表示项目已被检索并且其上的所有工作都已完成时,计数就会减少。当未完成任务的计数降至零时,join()
会解除阻塞。
- async put(item)¶
将一个项目放入队列。如果队列已满,则等待直到有可用空位,然后添加项目。
如果队列已关闭,则引发
QueueShutDown
。
- qsize()¶
返回队列中的项目数量。
- shutdown(immediate=False)¶
将
Queue
实例置于关闭模式。队列不再能增长。将来调用
put()
将引发QueueShutDown
。当前阻塞的put()
调用者将被解除阻塞,并在之前阻塞的线程中引发QueueShutDown
。如果 immediate 为 false(默认值),队列可以通过
get()
调用正常关闭,以提取已加载的任务。如果为每个剩余任务调用
task_done()
,则挂起的join()
将正常解除阻塞。一旦队列为空,将来调用
get()
将引发QueueShutDown
。如果 immediate 为 true,队列将立即终止。队列将被清空。所有
join()
的调用者都会解除阻塞,无论未完成任务的数量如何。阻塞的get()
调用者会解除阻塞,并由于队列为空而引发QueueShutDown
。在使用 immediate 设置为 true 的
join()
时请谨慎。这会解除连接的阻塞,即使任务上没有完成任何工作,从而违反了连接队列的通常不变式。在 3.13 版本加入。
- task_done()¶
表示先前入队的工单已完成。
由队列消费者使用。对于每个用于获取工单的
get()
调用,后续调用task_done()
会告诉队列工单上的处理已完成。如果
join()
当前正在阻塞,当所有项目都已处理(意味着对于每个已put()
到队列中的项目,都收到了task_done()
调用)时,它将恢复。如果调用次数多于队列中放置的项目数量,则引发
ValueError
。
优先队列¶
后进先出队列¶
异常¶
- exception asyncio.QueueEmpty¶
当在空队列上调用
get_nowait()
方法时,会引发此异常。
- exception asyncio.QueueFull¶
当在已达到 maxsize 的队列上调用
put_nowait()
方法时,引发的异常。
示例¶
队列可用于在多个并发任务之间分配工作负载。
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())