同步原语

源代码: Lib/asyncio/locks.py


asyncio 同步原语的设计与 threading 模块中的原语类似,但有两个重要的注意事项:

  • asyncio 原语不是线程安全的,因此不应用于操作系统线程同步(为此请使用 threading);

  • 这些同步原语的方法不接受 timeout 参数; 使用 asyncio.wait_for() 函数来执行带超时的操作。

asyncio 具有以下基本同步原语:


class asyncio.Lock

为 asyncio 任务实现互斥锁。 不是线程安全的。

asyncio 锁可用于保证对共享资源的独占访问。

使用 Lock 的首选方式是 async with 语句

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

它等效于

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

在 3.10 版本中更改: 删除了 loop 参数。

coroutine acquire()

获取锁。

此方法会等待直到锁被解锁,将其设置为锁定并返回 True

当多个协程在 acquire() 中被阻塞,等待锁被解锁时,最终只会有一个协程继续执行。

获取锁是公平的:继续执行的协程将是第一个开始等待锁的协程。

release()

释放锁。

当锁为锁定时,将其重置为解锁并返回。

如果锁为解锁,则会引发 RuntimeError

locked()

如果锁为锁定,则返回 True

事件

class asyncio.Event

一个事件对象。不是线程安全的。

asyncio 事件可用于通知多个 asyncio 任务已发生某些事件。

Event 对象管理一个内部标志,可以使用 set() 方法设置为 true,并使用 clear() 方法重置为 falsewait() 方法会阻塞,直到该标志设置为 true。该标志最初设置为 false

在 3.10 版本中更改: 删除了 loop 参数。

示例

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

等待直到事件被设置。

如果事件已设置,则立即返回 True。否则,将阻塞,直到另一个任务调用 set()

set()

设置事件。

所有等待设置事件的任务都将被立即唤醒。

clear()

清除(取消设置)事件。

等待 wait() 的任务现在将阻塞,直到再次调用 set() 方法。

is_set()

如果事件已设置,则返回 True

条件

class asyncio.Condition(lock=None)

一个条件对象。不是线程安全的。

asyncio 条件原语可供任务使用,以等待某些事件发生,然后获得对共享资源的独占访问权限。

本质上,Condition 对象结合了 EventLock 的功能。可以让多个 Condition 对象共享一个 Lock,从而允许在对共享资源的特定状态感兴趣的不同任务之间协调对共享资源的独占访问。

可选的 lock 参数必须是 Lock 对象或 None。在后一种情况下,会自动创建一个新的 Lock 对象。

在 3.10 版本中更改: 删除了 loop 参数。

使用 Condition 的首选方法是 async with 语句

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

它等效于

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

获取底层锁。

此方法会等待直到底层锁被解锁,将其设置为锁定并返回 True

notify(n=1)

唤醒 n 个(默认为 1 个)等待此条件的任务。如果等待的任务少于 n 个,则会唤醒所有任务。

必须在调用此方法之前获取锁,并在之后立即释放。如果在解锁锁的情况下调用,则会引发 RuntimeError 错误。

locked()

如果获取了底层锁,则返回 True

notify_all()

唤醒所有等待此条件的任务。

此方法的行为类似于 notify(),但会唤醒所有等待的任务。

必须在调用此方法之前获取锁,并在之后立即释放。如果在解锁锁的情况下调用,则会引发 RuntimeError 错误。

release()

释放底层锁。

当在解锁的锁上调用时,会引发 RuntimeError

coroutine wait()

等待直到收到通知。

如果调用此方法时调用任务尚未获取锁,则会引发 RuntimeError

此方法会释放底层锁,然后阻塞,直到被 notify()notify_all() 调用唤醒。一旦被唤醒,Condition 会重新获取其锁,并且此方法返回 True

请注意,任务可能会从此次调用中意外返回,这就是为什么调用者应该始终重新检查状态,并准备好再次wait()的原因。因此,您可能更倾向于使用wait_for()

协程 wait_for(predicate)

等待直到谓词变为true

谓词必须是一个可调用对象,其结果将被解释为布尔值。该方法将重复wait(),直到谓词求值为true。最终值是返回值。

信号量

class asyncio.Semaphore(value=1)

一个信号量对象。非线程安全。

信号量管理一个内部计数器,每次调用acquire()时计数器会递减,每次调用release()时计数器会递增。计数器永远不会低于零;当acquire()发现计数器为零时,它会阻塞,等待直到某个任务调用release()

可选的value参数给出内部计数器的初始值(默认为1)。如果给定的值小于0,则会引发ValueError

在 3.10 版本中更改: 删除了 loop 参数。

使用信号量的首选方式是async with语句

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

它等效于

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
协程 acquire()

获取信号量。

如果内部计数器大于零,则将其减一并立即返回True。如果计数器为零,则等待直到调用release()并返回True

locked()

如果信号量无法立即获取,则返回True

release()

释放信号量,将内部计数器加一。可以唤醒正在等待获取信号量的任务。

BoundedSemaphore不同,Semaphore允许执行比acquire()调用更多的release()调用。

有界信号量

class asyncio.BoundedSemaphore(value=1)

一个有界信号量对象。非线程安全。

有界信号量是Semaphore的一个版本,如果release()将内部计数器增加到初始value之上,则会引发ValueError

在 3.10 版本中更改: 删除了 loop 参数。

屏障

class asyncio.Barrier(parties)

一个屏障对象。非线程安全。

屏障是一种简单的同步原语,允许阻塞,直到有parties个任务在它上面等待。任务可以在wait()方法上等待,并且会被阻塞,直到指定数量的任务在wait()上等待。届时,所有等待的任务将同时解除阻塞。

async with可以用作等待wait()的替代方法。

屏障可以重复使用任意次数。

示例

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

此示例的结果是

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

3.11 版本中新增。

协程 wait()

通过屏障。当参与屏障的所有任务都调用此函数时,它们将同时解除阻塞。

当屏障中等待或阻塞的任务被取消时,该任务将退出屏障,屏障保持相同的状态。如果屏障的状态为“填充”,则等待任务的数量会减少 1。

返回值是一个介于 0 到 parties-1 之间的整数,每个任务都不同。这可以用来选择一个任务来执行一些特殊的管理任务,例如。

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

如果屏障在任务等待时被破坏或重置,此方法可能会引发BrokenBarrierError异常。如果任务被取消,则可能会引发CancelledError

协程 reset()

将屏障返回到默认的空状态。任何在屏障上等待的任务都将收到BrokenBarrierError异常。

如果屏障被破坏,最好是直接保留它并创建一个新的屏障。

协程 abort()

将屏障置于破坏状态。这会导致任何活动的或未来的wait()调用失败,并出现BrokenBarrierError。例如,如果其中一个任务需要中止,请使用此选项以避免无限等待任务。

parties

通过屏障所需的任务数。

n_waiting

当前正在屏障中等待填充的任务数。

broken

一个布尔值,如果屏障处于破坏状态,则为True

exception asyncio.BrokenBarrierError

Barrier对象被重置或破坏时,将引发此异常,它是RuntimeError的子类。


在 3.9 版本中更改:使用 await lockyield from lock 和/或 with 语句(with await lockwith (yield from lock))获取锁的操作已被删除。请改用 async with lock