asyncio 的概念性概述

这篇 HOWTO 文章旨在帮助您建立一个关于 asyncio 基本工作原理的扎实心智模型,帮助您理解推荐模式的“何为”和“为何”。

您可能对一些关键的 asyncio 概念感到好奇。通过阅读本文,您将能够轻松回答以下问题:

  • 当一个对象被 awaited 时,幕后发生了什么?

  • asyncio 如何区分不需要 CPU 时间的任务(例如网络请求或文件读取)和需要 CPU 时间的任务(例如计算 n 阶乘)?

  • 如何编写一个操作的异步变体,例如异步睡眠或数据库请求。

参见

概念性概述第一部分:高层

在第一部分中,我们将介绍 asyncio 的主要高层构建块:事件循环、协程函数、协程对象、任务和 await

事件循环

asyncio 中的一切都相对于事件循环发生。它是演出的明星。它就像一个管弦乐队的指挥。它在幕后管理资源。一些权力明确地授予给它,但它完成工作的许多能力来自于其工蜂的尊重和合作。

更技术性地说,事件循环包含了一组要运行的作业。有些作业是您直接添加的,有些是由 asyncio 间接添加的。事件循环从其积压的工作中取出一个作业并调用它(或“将控制权交给它”),类似于调用一个函数,然后该作业运行。一旦它暂停或完成,它就会将控制权返回给事件循环。事件循环然后从其池中选择另一个作业并调用它。您可以**大致**将作业集合视为一个队列:作业被添加,然后一次处理一个,通常(但不总是)按顺序进行。这个过程无限重复,事件循环无休止地循环。如果没有更多待执行的作业,事件循环会足够聪明地休息,避免不必要地浪费 CPU 周期,并在有更多工作要做时返回。

有效的执行依赖于作业之间的良好共享和协作;一个贪婪的作业可能会霸占控制权并让其他作业挨饿,从而使整体的事件循环方法变得相当无用。

import asyncio

# This creates an event loop and indefinitely cycles through
# its collection of jobs.
event_loop = asyncio.new_event_loop()
event_loop.run_forever()

异步函数和协程

这是一个基本的、无聊的 Python 函数

def hello_printer():
    print(
        "Hi, I am a lowly, simple printer, though I have all I "
        "need in life -- \nfresh paper and my dearly beloved octopus "
        "partner in crime."
    )

调用常规函数会执行其逻辑或主体

>>> hello_printer()
Hi, I am a lowly, simple printer, though I have all I need in life --
fresh paper and my dearly beloved octopus partner in crime.

async def,与普通的 def 不同,使其成为一个异步函数(或“协程函数”)。调用它会创建并返回一个 协程 对象。

async def loudmouth_penguin(magic_number: int):
    print(
     "I am a super special talking penguin. Far cooler than that printer. "
     f"By the way, my lucky number is: {magic_number}."
    )

调用异步函数 loudmouth_penguin 并不会执行 print 语句;相反,它会创建一个协程对象

>>> loudmouth_penguin(magic_number=3)
<coroutine object loudmouth_penguin at 0x104ed2740>

术语“协程函数”和“协程对象”经常被混淆为协程。这可能会令人困惑!在本文中,协程特指协程对象,或者更确切地说,是 types.CoroutineType 的实例(原生协程)。请注意,协程也可以作为 collections.abc.Coroutine 的实例存在——这是一个对类型检查很重要的区别。

协程表示函数的主体或逻辑。协程必须显式启动;同样,仅仅创建协程并不会启动它。值得注意的是,协程可以在函数主体内的各个点暂停和恢复。这种暂停和恢复的能力正是实现异步行为的关键!

协程和协程函数是通过利用 生成器生成器函数 的功能构建的。回想一下,生成器函数是一个 yield 的函数,就像这个一样

def get_random_number():
    # This would be a bad random number generator!
    print("Hi")
    yield 1
    print("Hello")
    yield 7
    print("Howdy")
    yield 4
    ...

类似于协程函数,调用生成器函数不会运行它。相反,它会创建一个生成器对象

>>> get_random_number()
<generator object get_random_number at 0x1048671c0>

您可以通过使用内置函数 next() 来进入生成器的下一个 yield。换句话说,生成器运行,然后暂停。例如:

>>> generator = get_random_number()
>>> next(generator)
Hi
1
>>> next(generator)
Hello
7

任务

粗略地说,任务 是与事件循环绑定的协程(而不是协程函数)。任务还维护一个回调函数列表,当我们讨论 await 时,它们的重要性将变得清晰。推荐创建任务的方式是通过 asyncio.create_task()

创建任务会自动安排其执行(通过向事件循环的待办事项列表,即作业集合,添加一个回调来运行它)。

由于(在每个线程中)只有一个事件循环,asyncio 会为您处理将任务与事件循环关联起来。因此,无需指定事件循环。

coroutine = loudmouth_penguin(magic_number=5)
# This creates a Task object and schedules its execution via the event loop.
task = asyncio.create_task(coroutine)

之前,我们手动创建了事件循环并将其设置为永远运行。实际上,建议使用(也常见)asyncio.run(),它负责管理事件循环并确保提供的协程在继续之前完成。例如,许多异步程序遵循这种设置

import asyncio

async def main():
    # Perform all sorts of wacky, wild asynchronous things...
    ...

if __name__ == "__main__":
    asyncio.run(main())
    # The program will not reach the following print statement until the
    # coroutine main() finishes.
    print("coroutine main() is done!")

重要的是要意识到任务本身并没有添加到事件循环中,只有对任务的回调被添加到事件循环中。如果事件循环调用任务对象之前它就被垃圾回收了,这将变得很重要。例如,考虑这个程序

 1async def hello():
 2    print("hello!")
 3
 4async def main():
 5    asyncio.create_task(hello())
 6    # Other asynchronous instructions which run for a while
 7    # and cede control to the event loop...
 8    ...
 9
10asyncio.run(main())

因为第 5 行创建的任务对象没有引用,所以它_可能_在事件循环调用它之前被垃圾回收。协程 main() 中的后续指令将控制权交还给事件循环,以便它可以调用其他作业。当事件循环最终尝试运行任务时,它可能会失败并发现任务对象不存在!即使协程保留了对任务的引用但在该任务完成之前协程完成,也可能发生这种情况。当协程退出时,局部变量超出作用域并可能被垃圾回收。asyncio 和 Python 的垃圾回收器在实践中会非常努力地确保这种情况不会发生。但这并不是粗心大意的理由!

await

await 是一个 Python 关键字,通常以两种不同的方式使用

await task
await coroutine

在一个关键方面,await 的行为取决于被等待对象的类型。

等待一个任务将把控制权从当前任务或协程移交给事件循环。在放弃控制权的过程中,会发生一些重要的事情。我们将使用以下代码示例进行说明

async def plant_a_tree():
    dig_the_hole_task = asyncio.create_task(dig_the_hole())
    await dig_the_hole_task

    # Other instructions associated with planting a tree.
    ...

在这个例子中,想象事件循环已将控制权传递给协程 plant_a_tree() 的开始。如上所示,协程创建了一个任务,然后等待它。await dig_the_hole_task 指令将一个回调(它将恢复 plant_a_tree())添加到 dig_the_hole_task 对象的Callbacks列表中。然后,该指令将控制权交给事件循环。一段时间后,事件循环将控制权传递给 dig_the_hole_task,任务将完成它需要做的任何事情。一旦任务完成,它将将其各种回调添加到事件循环中,在这种情况下,是调用以恢复 plant_a_tree()

一般来说,当被等待的任务 (dig_the_hole_task) 完成时,原始任务或协程 (plant_a_tree()) 被添加回事件循环的待办事项列表以恢复。

这是一个基本但可靠的心智模型。在实践中,控制权的移交稍微复杂一些,但不多。在第二部分中,我们将详细介绍实现这一点所需的细节。

与任务不同,等待协程不会将控制权交还给事件循环! 如果先将协程包装在一个任务中,然后等待该任务,则会交出控制权。await coroutine 的行为实际上与调用一个普通的同步 Python 函数相同。考虑以下程序

import asyncio

async def coro_a():
   print("I am coro_a(). Hi!")

async def coro_b():
   print("I am coro_b(). I sure hope no one hogs the event loop...")

async def main():
   task_b = asyncio.create_task(coro_b())
   num_repeats = 3
   for _ in range(num_repeats):
      await coro_a()
   await task_b

asyncio.run(main())

协程 main() 中的第一条语句创建了 task_b 并通过事件循环将其调度执行。然后,coro_a() 被重复等待。控制权从未交还给事件循环,这就是为什么我们在 coro_b() 的输出之前看到所有三个 coro_a() 调用的输出

I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_b(). I sure hope no one hogs the event loop...

如果我们将 await coro_a() 更改为 await asyncio.create_task(coro_a()),行为就会改变。协程 main() 在该语句处将控制权交回给事件循环。事件循环然后继续处理其积压的工作,调用 task_b,然后调用包装 coro_a() 的任务,最后恢复协程 main()

I am coro_b(). I sure hope no one hogs the event loop...
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!

这种 await coroutine 的行为可能会让很多人感到困惑!这个例子强调了仅仅使用 await coroutine 如何可能无意中霸占其他任务的控制权,从而有效地使事件循环停滞。asyncio.run() 可以通过 debug=True 标志帮助您检测此类情况,该标志启用 调试模式。除其他外,它会记录任何独占执行 100 毫秒或更长时间的协程。

这种设计有意牺牲了一些关于 await 用法的概念清晰性,以换取性能提升。每次等待一个任务时,控制权都需要沿着调用栈一直传递到事件循环。这听起来可能微不足道,但在一个包含许多 await 语句和深层调用栈的大型程序中,这种开销可能会导致显著的性能拖累。

概念性概述第二部分:核心细节

第二部分详细介绍了 asyncio 用于管理控制流的机制。这就是奇迹发生的地方。通过本节,您将了解 await 在幕后做了什么,以及如何创建自己的异步操作符。

协程的内部工作原理

asyncio 利用四个组件来传递控制权。

coroutine.send(arg) 是用于启动或恢复协程的方法。如果协程已暂停并正在恢复,则参数 arg 将作为最初暂停它的 yield 语句的返回值发送进去。如果协程是第一次使用(而不是恢复),arg 必须是 None

 1class Rock:
 2    def __await__(self):
 3        value_sent_in = yield 7
 4        print(f"Rock.__await__ resuming with value: {value_sent_in}.")
 5        return value_sent_in
 6
 7async def main():
 8    print("Beginning coroutine main().")
 9    rock = Rock()
10    print("Awaiting rock...")
11    value_from_rock = await rock
12    print(f"Coroutine received value: {value_from_rock} from rock.")
13    return 23
14
15coroutine = main()
16intermediate_result = coroutine.send(None)
17print(f"Coroutine paused and returned intermediate value: {intermediate_result}.")
18
19print(f"Resuming coroutine and sending in value: 42.")
20try:
21    coroutine.send(42)
22except StopIteration as e:
23    returned_value = e.value
24print(f"Coroutine main() finished and provided value: {returned_value}.")

yield,像往常一样,暂停执行并将控制权返回给调用者。在上面的示例中,第 3 行的 yield 由第 11 行的 ... = await rock 调用。更广泛地说,await 调用给定对象的 __await__() 方法。await 还做了一件非常特殊的事情:它将接收到的任何 yield 传播(或“传递”)到调用链的上方。在本例中,即回到第 16 行的 ... = coroutine.send(None)

协程通过第 21 行的 coroutine.send(42) 调用恢复。协程从第 3 行 yield(或暂停)的地方继续执行,并执行其主体中剩余的语句。当协程完成时,它会引发一个 StopIteration 异常,返回值为 value 属性。

该代码片段产生以下输出

Beginning coroutine main().
Awaiting rock...
Coroutine paused and returned intermediate value: 7.
Resuming coroutine and sending in value: 42.
Rock.__await__ resuming with value: 42.
Coroutine received value: 42 from rock.
Coroutine main() finished and provided value: 23.

值得在此处暂停片刻,确保您理解了控制流和值传递的各种方式。此处涵盖了许多重要概念,确保您的理解牢固是值得的。

从协程中 yield(或有效放弃控制权)的唯一方法是 await 一个在其 __await__ 方法中 yield 的对象。这可能听起来很奇怪。您可能会想

1. 那直接在协程函数内部的 yield 呢?协程函数会变成一个 异步生成器函数,这是一个完全不同的概念。

2. 那在协程函数内部使用 yield from 到一个(普通)生成器呢?那会导致错误:SyntaxError: yield from not allowed in a coroutine. 这是为了简洁而故意设计的——只强制使用协程的一种方式。最初 yield 也被禁止,但为了允许异步生成器而被重新接受。尽管如此,yield fromawait 实际上做的是同一件事。

期货

期货 是一种旨在表示计算状态和结果的对象。这个术语暗示着未来会发生或尚未发生的事情,而该对象就是一种关注该事物的方式。

一个 Future 有几个重要的属性。一个是它的状态,可以是“pending”(待定)、“cancelled”(已取消)或“done”(已完成)。另一个是它的结果,在状态转换为“done”时设置。与协程不同,Future 不代表实际要完成的计算;相反,它代表该计算的状态和结果,有点像一个状态灯(红、黄或绿)或指示器。

asyncio.Task 继承自 asyncio.Future,以获得这些各种功能。上一节说任务存储了一个回调列表,这并不完全准确。实际上是 Future 类实现了这种逻辑,而 Task 继承了它。

Future 也可以直接使用(而不是通过任务)。任务在其协程完成时将自己标记为已完成。Future 更具通用性,当您说完成时就会被标记为完成。通过这种方式,它们是灵活的接口,让您可以为等待和恢复设置自己的条件。

一个自制的 asyncio.sleep

我们将通过一个示例,说明您如何利用 Future 创建自己的异步睡眠 (async_sleep) 变体,它模仿 asyncio.sleep()

此代码片段向事件循环注册了一些任务,然后等待由 asyncio.create_task 创建的任务,该任务包装了 async_sleep(3) 协程。我们希望该任务仅在三秒钟过去后完成,但不能阻止其他任务运行。

async def other_work():
    print("I like work. Work work.")

async def main():
    # Add a few other tasks to the event loop, so there's something
    # to do while asynchronously sleeping.
    work_tasks = [
        asyncio.create_task(other_work()),
        asyncio.create_task(other_work()),
        asyncio.create_task(other_work())
    ]
    print(
        "Beginning asynchronous sleep at time: "
        f"{datetime.datetime.now().strftime("%H:%M:%S")}."
    )
    await asyncio.create_task(async_sleep(3))
    print(
        "Done asynchronous sleep at time: "
        f"{datetime.datetime.now().strftime("%H:%M:%S")}."
    )
    # asyncio.gather effectively awaits each task in the collection.
    await asyncio.gather(*work_tasks)

下面,我们使用一个 Future 来实现对该任务何时被标记为完成的自定义控制。如果从未调用 future.set_result()(负责将该 Future 标记为完成的方法),那么该任务将永远不会完成。我们还借助了另一个任务,我们稍后会看到,它将监控已过去的时间,并相应地调用 future.set_result()

async def async_sleep(seconds: float):
    future = asyncio.Future()
    time_to_wake = time.time() + seconds
    # Add the watcher-task to the event loop.
    watcher_task = asyncio.create_task(_sleep_watcher(future, time_to_wake))
    # Block until the future is marked as done.
    await future

下面,我们使用一个相当简单的 YieldToEventLoop() 对象,从其 __await__ 方法中 yield,将控制权交还给事件循环。这实际上与调用 asyncio.sleep(0) 相同,但这种方法提供了更高的清晰度,更不用说在展示如何实现它时使用 asyncio.sleep 有点作弊!

像往常一样,事件循环遍历其任务,将控制权交给它们,并在它们暂停或完成时收回控制权。watcher_task(运行协程 _sleep_watcher(...))将在事件循环的每个完整周期中被调用一次。每次恢复时,它都会检查时间,如果时间不足,它将再次暂停并将控制权交还给事件循环。一旦时间充足,_sleep_watcher(...) 将 Future 标记为完成,并通过退出其无限 while 循环而完成。鉴于这个辅助任务在事件循环的每个周期中只被调用一次,您会正确地注意到这个异步睡眠将睡眠_至少_三秒,而不是正好三秒。请注意,asyncio.sleep 也是如此。

class YieldToEventLoop:
    def __await__(self):
        yield

async def _sleep_watcher(future, time_to_wake):
    while True:
        if time.time() >= time_to_wake:
            # This marks the future as done.
            future.set_result(None)
            break
        else:
            await YieldToEventLoop()

这是完整程序的输出

$ python custom-async-sleep.py
Beginning asynchronous sleep at time: 14:52:22.
I like work. Work work.
I like work. Work work.
I like work. Work work.
Done asynchronous sleep at time: 14:52:25.

您可能会觉得这种异步睡眠的实现过于复杂。嗯,确实如此。这个例子旨在通过一个简单的例子展示 Future 的多功能性,以便可以模仿它来满足更复杂的需求。作为参考,您可以在没有 Future 的情况下实现它,如下所示

async def simpler_async_sleep(seconds):
    time_to_wake = time.time() + seconds
    while True:
        if time.time() >= time_to_wake:
            return
        else:
            await YieldToEventLoop()

但目前就这些了。希望您已准备好更自信地投入异步编程,或查阅 文档的其余部分 中的高级主题。