同步原语

源代码: Lib/asyncio/locks.py


asyncio 同步原语旨在与 threading 模块的同步原语相似,但有两个重要注意事项:

  • asyncio 原语不是线程安全的,因此不应将它们用于操作系统线程同步(为此请使用 threading);

  • 这些同步原语的方法不接受 timeout 参数;请使用 asyncio.wait_for() 函数执行带超时的操作。

asyncio 具有以下基本同步原语:


Lock

class asyncio.Lock

为 asyncio 任务实现互斥锁。非线程安全。

asyncio 锁可用于保证对共享资源的独占访问。

使用 Lock 的首选方式是 async with 语句

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

这等同于

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

版本 3.10 中已更改: 移除了 loop 参数。

async acquire()

获取锁。

此方法会等待直到锁处于 解锁 状态,将其设置为 锁定 状态并返回 True

当多个协程在 acquire() 中阻塞等待锁被解锁时,最终只有一个协程会继续执行。

获取锁是 公平的:继续执行的协程将是第一个开始等待锁的协程。

release()

释放锁。

当锁处于 锁定 状态时,将其重置为 解锁 状态并返回。

如果锁处于 解锁 状态,则会引发 RuntimeError

locked()

如果锁处于 锁定 状态,则返回 True

Event

class asyncio.Event

一个事件对象。非线程安全。

asyncio 事件可用于通知多个 asyncio 任务某个事件已发生。

Event 对象管理一个内部标志,该标志可以使用 set() 方法设置为 true,并使用 clear() 方法重置为 falsewait() 方法会阻塞直到标志设置为 true。标志最初设置为 false

版本 3.10 中已更改: 移除了 loop 参数。

示例

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
async wait()

等待直到事件被设置。

如果事件已设置,则立即返回 True。否则,阻塞直到另一个任务调用 set()

set()

设置事件。

所有等待事件被设置的任务将立即被唤醒。

clear()

清除(取消设置)事件。

后续等待 wait() 的任务现在将阻塞,直到再次调用 set() 方法。

is_set()

如果事件已设置,则返回 True

Condition

class asyncio.Condition(lock=None)

一个条件对象。非线程安全。

asyncio 条件原语可用于任务等待某个事件发生,然后获得对共享资源的独占访问。

本质上,Condition 对象结合了 EventLock 的功能。可以有多个 Condition 对象共享一个 Lock,这允许在对共享资源特定状态感兴趣的不同任务之间协调对共享资源的独占访问。

可选的 lock 参数必须是 Lock 对象或 None。在后一种情况下,会自动创建一个新的 Lock 对象。

版本 3.10 中已更改: 移除了 loop 参数。

使用 Condition 的首选方式是 async with 语句

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

这等同于

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
async acquire()

获取底层锁。

此方法会等待直到底层锁处于 解锁 状态,将其设置为 锁定 状态并返回 True

notify(n=1)

唤醒 n 个(默认为 1 个)在此条件上等待的任务。如果等待的任务少于 n 个,则所有任务都会被唤醒。

此方法调用之前必须获取锁,并在之后尽快释放。如果使用 未锁定 的锁调用,则会引发 RuntimeError 错误。

locked()

如果底层锁已获取,则返回 True

notify_all()

唤醒所有在此条件上等待的任务。

此方法的作用类似于 notify(),但会唤醒所有等待中的任务。

此方法调用之前必须获取锁,并在之后尽快释放。如果使用 未锁定 的锁调用,则会引发 RuntimeError 错误。

release()

释放底层锁。

如果在未锁定的锁上调用,则会引发 RuntimeError

async wait()

等待直到被通知。

如果调用此方法时调用任务尚未获取锁,则会引发 RuntimeError

此方法会释放底层锁,然后阻塞直到被 notify()notify_all() 调用唤醒。一旦被唤醒,Condition 会重新获取其锁,此方法返回 True

请注意,任务 可能会 意外地从此次调用返回,这就是为什么调用者应始终重新检查状态并准备好再次 wait()。因此,您可能更喜欢使用 wait_for()

async wait_for(predicate)

等待直到谓词变为 true

谓词必须是一个可调用对象,其结果将被解释为布尔值。此方法将重复调用 wait() 直到谓词评估为 true。最终值是返回值。

Semaphore

class asyncio.Semaphore(value=1)

一个信号量对象。非线程安全。

信号量管理一个内部计数器,该计数器在每次 acquire() 调用时递减,并在每次 release() 调用时递增。计数器永远不能低于零;当 acquire() 发现它为零时,它会阻塞,等待直到某个任务调用 release()

可选的 value 参数给出内部计数器的初始值(默认为 1)。如果给定值小于 0,则会引发 ValueError

版本 3.10 中已更改: 移除了 loop 参数。

使用 Semaphore 的首选方式是 async with 语句

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

这等同于

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
async acquire()

获取信号量。

如果内部计数器大于零,则将其递减一并立即返回 True。如果为零,则等待直到调用 release() 并返回 True

locked()

如果信号量不能立即获取,则返回 True

release()

释放信号量,将内部计数器递增一。可以唤醒等待获取信号量的任务。

BoundedSemaphore 不同,Semaphore 允许进行比 acquire() 调用更多的 release() 调用。

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

一个有界信号量对象。非线程安全。

有界信号量是 Semaphore 的一个版本,它在 release() 中如果将内部计数器增加到超过初始 value,则会引发 ValueError

版本 3.10 中已更改: 移除了 loop 参数。

Barrier

class asyncio.Barrier(parties)

一个屏障对象。非线程安全。

屏障是一种简单的同步原语,允许阻塞直到 parties 数量的任务都在其上等待。任务可以在 wait() 方法上等待,并将被阻塞直到指定数量的任务最终在 wait() 上等待。此时,所有等待的任务将同时解除阻塞。

async with 可以用作等待 wait() 的替代方法。

屏障可以重复使用任意次数。

示例

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

此示例的结果是

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

在 3.11 版本中新增。

async wait()

通过屏障。当所有参与屏障的任务都已调用此函数时,它们将同时解除阻塞。

当屏障中等待或阻塞的任务被取消时,该任务会退出屏障,屏障保持相同状态。如果屏障的状态是“填充”,则等待任务的数量减少 1。

返回值是一个整数,范围从 0 到 parties-1,每个任务都不同。这可以用于选择一个任务来执行一些特殊的内务管理,例如

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

如果屏障在任务等待时被破坏或重置,此方法可能会引发 BrokenBarrierError 异常。如果任务被取消,它可能会引发 CancelledError

async reset()

将屏障恢复到默认的空状态。任何等待它的任务都将收到 BrokenBarrierError 异常。

如果屏障损坏,最好直接放弃并创建一个新的。

async abort()

将屏障置于损坏状态。这会导致任何当前或未来对 wait() 的调用都以 BrokenBarrierError 失败。例如,如果其中一个任务需要中止,请使用此方法以避免任务无限期等待。

parties

通过屏障所需的任务数量。

n_waiting

在填充过程中当前在屏障中等待的任务数量。

broken

一个布尔值,如果屏障处于损坏状态,则为 True

exception asyncio.BrokenBarrierError

Barrier 对象被重置或损坏时,会引发此异常,它是 RuntimeError 的子类。


版本 3.9 中已更改: 移除了使用 await lockyield from lock 和/或 with 语句(with await lock, with (yield from lock))获取锁的方式。请改用 async with lock