同步原语¶
源代码: Lib/asyncio/locks.py
asyncio 同步原语旨在与 threading
模块的同步原语相似,但有两个重要注意事项:
asyncio 原语不是线程安全的,因此不应将它们用于操作系统线程同步(为此请使用
threading
);这些同步原语的方法不接受 timeout 参数;请使用
asyncio.wait_for()
函数执行带超时的操作。
asyncio 具有以下基本同步原语:
Lock¶
- class asyncio.Lock¶
为 asyncio 任务实现互斥锁。非线程安全。
asyncio 锁可用于保证对共享资源的独占访问。
使用 Lock 的首选方式是
async with
语句lock = asyncio.Lock() # ... later async with lock: # access shared state
这等同于
lock = asyncio.Lock() # ... later await lock.acquire() try: # access shared state finally: lock.release()
版本 3.10 中已更改: 移除了 loop 参数。
- async acquire()¶
获取锁。
此方法会等待直到锁处于 解锁 状态,将其设置为 锁定 状态并返回
True
。当多个协程在
acquire()
中阻塞等待锁被解锁时,最终只有一个协程会继续执行。获取锁是 公平的:继续执行的协程将是第一个开始等待锁的协程。
- release()¶
释放锁。
当锁处于 锁定 状态时,将其重置为 解锁 状态并返回。
如果锁处于 解锁 状态,则会引发
RuntimeError
。
- locked()¶
如果锁处于 锁定 状态,则返回
True
。
Event¶
- class asyncio.Event¶
一个事件对象。非线程安全。
asyncio 事件可用于通知多个 asyncio 任务某个事件已发生。
Event 对象管理一个内部标志,该标志可以使用
set()
方法设置为 true,并使用clear()
方法重置为 false。wait()
方法会阻塞直到标志设置为 true。标志最初设置为 false。版本 3.10 中已更改: 移除了 loop 参数。
示例
async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
- set()¶
设置事件。
所有等待事件被设置的任务将立即被唤醒。
- is_set()¶
如果事件已设置,则返回
True
。
Condition¶
- class asyncio.Condition(lock=None)¶
一个条件对象。非线程安全。
asyncio 条件原语可用于任务等待某个事件发生,然后获得对共享资源的独占访问。
本质上,Condition 对象结合了
Event
和Lock
的功能。可以有多个 Condition 对象共享一个 Lock,这允许在对共享资源特定状态感兴趣的不同任务之间协调对共享资源的独占访问。可选的 lock 参数必须是
Lock
对象或None
。在后一种情况下,会自动创建一个新的 Lock 对象。版本 3.10 中已更改: 移除了 loop 参数。
使用 Condition 的首选方式是
async with
语句cond = asyncio.Condition() # ... later async with cond: await cond.wait()
这等同于
cond = asyncio.Condition() # ... later await cond.acquire() try: await cond.wait() finally: cond.release()
- async acquire()¶
获取底层锁。
此方法会等待直到底层锁处于 解锁 状态,将其设置为 锁定 状态并返回
True
。
- notify(n=1)¶
唤醒 n 个(默认为 1 个)在此条件上等待的任务。如果等待的任务少于 n 个,则所有任务都会被唤醒。
此方法调用之前必须获取锁,并在之后尽快释放。如果使用 未锁定 的锁调用,则会引发
RuntimeError
错误。
- locked()¶
如果底层锁已获取,则返回
True
。
- notify_all()¶
唤醒所有在此条件上等待的任务。
此方法的作用类似于
notify()
,但会唤醒所有等待中的任务。此方法调用之前必须获取锁,并在之后尽快释放。如果使用 未锁定 的锁调用,则会引发
RuntimeError
错误。
- release()¶
释放底层锁。
如果在未锁定的锁上调用,则会引发
RuntimeError
。
- async wait()¶
等待直到被通知。
如果调用此方法时调用任务尚未获取锁,则会引发
RuntimeError
。此方法会释放底层锁,然后阻塞直到被
notify()
或notify_all()
调用唤醒。一旦被唤醒,Condition 会重新获取其锁,此方法返回True
。请注意,任务 可能会 意外地从此次调用返回,这就是为什么调用者应始终重新检查状态并准备好再次
wait()
。因此,您可能更喜欢使用wait_for()
。
Semaphore¶
- class asyncio.Semaphore(value=1)¶
一个信号量对象。非线程安全。
信号量管理一个内部计数器,该计数器在每次
acquire()
调用时递减,并在每次release()
调用时递增。计数器永远不能低于零;当acquire()
发现它为零时,它会阻塞,等待直到某个任务调用release()
。可选的 value 参数给出内部计数器的初始值(默认为
1
)。如果给定值小于0
,则会引发ValueError
。版本 3.10 中已更改: 移除了 loop 参数。
使用 Semaphore 的首选方式是
async with
语句sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource
这等同于
sem = asyncio.Semaphore(10) # ... later await sem.acquire() try: # work with shared resource finally: sem.release()
- locked()¶
如果信号量不能立即获取,则返回
True
。
- release()¶
释放信号量,将内部计数器递增一。可以唤醒等待获取信号量的任务。
与
BoundedSemaphore
不同,Semaphore
允许进行比acquire()
调用更多的release()
调用。
BoundedSemaphore¶
- class asyncio.BoundedSemaphore(value=1)¶
一个有界信号量对象。非线程安全。
有界信号量是
Semaphore
的一个版本,它在release()
中如果将内部计数器增加到超过初始 value,则会引发ValueError
。版本 3.10 中已更改: 移除了 loop 参数。
Barrier¶
- class asyncio.Barrier(parties)¶
一个屏障对象。非线程安全。
屏障是一种简单的同步原语,允许阻塞直到 parties 数量的任务都在其上等待。任务可以在
wait()
方法上等待,并将被阻塞直到指定数量的任务最终在wait()
上等待。此时,所有等待的任务将同时解除阻塞。async with
可以用作等待wait()
的替代方法。屏障可以重复使用任意次数。
示例
async def example_barrier(): # barrier with 3 parties b = asyncio.Barrier(3) # create 2 new waiting tasks asyncio.create_task(b.wait()) asyncio.create_task(b.wait()) await asyncio.sleep(0) print(b) # The third .wait() call passes the barrier await b.wait() print(b) print("barrier passed") await asyncio.sleep(0) print(b) asyncio.run(example_barrier())
此示例的结果是
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]> <asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]> barrier passed <asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
在 3.11 版本中新增。
- async wait()¶
通过屏障。当所有参与屏障的任务都已调用此函数时,它们将同时解除阻塞。
当屏障中等待或阻塞的任务被取消时,该任务会退出屏障,屏障保持相同状态。如果屏障的状态是“填充”,则等待任务的数量减少 1。
返回值是一个整数,范围从 0 到
parties-1
,每个任务都不同。这可以用于选择一个任务来执行一些特殊的内务管理,例如... async with barrier as position: if position == 0: # Only one task prints this print('End of *draining phase*')
如果屏障在任务等待时被破坏或重置,此方法可能会引发
BrokenBarrierError
异常。如果任务被取消,它可能会引发CancelledError
。
- async reset()¶
将屏障恢复到默认的空状态。任何等待它的任务都将收到
BrokenBarrierError
异常。如果屏障损坏,最好直接放弃并创建一个新的。
- async abort()¶
将屏障置于损坏状态。这会导致任何当前或未来对
wait()
的调用都以BrokenBarrierError
失败。例如,如果其中一个任务需要中止,请使用此方法以避免任务无限期等待。
- parties¶
通过屏障所需的任务数量。
- n_waiting¶
在填充过程中当前在屏障中等待的任务数量。
- broken¶
一个布尔值,如果屏障处于损坏状态,则为
True
。
- exception asyncio.BrokenBarrierError¶
当
Barrier
对象被重置或损坏时,会引发此异常,它是RuntimeError
的子类。
版本 3.9 中已更改: 移除了使用 await lock
或 yield from lock
和/或 with
语句(with await lock
, with (yield from lock)
)获取锁的方式。请改用 async with lock
。