队列¶
asyncio 队列的设计类似于 queue
模块中的类。虽然 asyncio 队列不是线程安全的,但它们被设计为专门用于 async/await 代码。
请注意,asyncio 队列的方法没有timeout参数;使用 asyncio.wait_for()
函数来执行带有超时的队列操作。
另请参阅下面的 示例 部分。
队列¶
- class asyncio.Queue(maxsize=0)¶
先进先出 (FIFO) 队列。
如果maxsize小于或等于零,则队列大小为无限。如果它是一个大于
0
的整数,则当队列达到maxsize时,await put()
会阻塞,直到get()
删除一个项目。与标准库线程
queue
不同,队列的大小始终是已知的,可以通过调用qsize()
方法来获取。在版本 3.10 中更改: 删除了loop参数。
此类是 线程不安全的.
- maxsize¶
队列中允许的项目数量。
- empty()¶
如果队列为空,则返回
True
,否则返回False
。
- coroutine get()¶
从队列中删除并返回一个项目。如果队列为空,则等待直到有项目可用。
- get_nowait()¶
如果立即有项目可用,则返回一个项目,否则引发
QueueEmpty
。
- coroutine join()¶
阻塞,直到队列中的所有项目都被接收并处理。
每当将项目添加到队列时,未完成任务的计数就会增加。每当消费者协程调用
task_done()
来指示项目已检索到并且所有对其的操作都已完成时,计数就会减少。当未完成任务的计数降至零时,join()
将解除阻塞。
- coroutine put(item)¶
将项目放入队列。如果队列已满,则等待直到有空闲插槽可用,然后再添加项目。
- qsize()¶
返回队列中的项目数量。
- task_done()¶
指示以前排队的任务已完成。
由队列消费者使用。对于每个用于获取任务的
get()
,后续对task_done()
的调用告诉队列对任务的处理已完成。如果当前有
join()
阻塞,则当所有项目都已处理完毕时(意味着已为每个放入队列的项目接收了task_done()
调用),它将恢复。如果调用的次数超过放入队列的项目数量,则引发
ValueError
。
优先级队列¶
后进先出队列¶
异常¶
- exception asyncio.QueueEmpty¶
当在空队列上调用
get_nowait()
方法时,会引发此异常。
- exception asyncio.QueueFull¶
当在已达到其maxsize的队列上调用
put_nowait()
方法时,会引发此异常。
示例¶
队列可用于在多个并发任务之间分配工作负载
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())