asyncio
的概念性概述¶
这篇 HOWTO 文章旨在帮助您建立一个关于 asyncio
基本工作原理的扎实心智模型,帮助您理解推荐模式的“何为”和“为何”。
您可能对一些关键的 asyncio
概念感到好奇。通过阅读本文,您将能够轻松回答以下问题:
当一个对象被 awaited 时,幕后发生了什么?
asyncio
如何区分不需要 CPU 时间的任务(例如网络请求或文件读取)和需要 CPU 时间的任务(例如计算 n 阶乘)?如何编写一个操作的异步变体,例如异步睡眠或数据库请求。
参见
这篇 HOWTO 文章的灵感来源:Alexander Nordin 的 指南。
Python 核心团队成员 Łukasz Langa 创建的关于
asyncio
的深入 YouTube 视频教程系列。500 行或更少代码:使用 asyncio 协程实现的网络爬虫,作者 A. Jesse Jiryu Davis 和 Guido van Rossum。
概念性概述第一部分:高层¶
在第一部分中,我们将介绍 asyncio
的主要高层构建块:事件循环、协程函数、协程对象、任务和 await
。
事件循环¶
asyncio
中的一切都相对于事件循环发生。它是演出的明星。它就像一个管弦乐队的指挥。它在幕后管理资源。一些权力明确地授予给它,但它完成工作的许多能力来自于其工蜂的尊重和合作。
更技术性地说,事件循环包含了一组要运行的作业。有些作业是您直接添加的,有些是由 asyncio
间接添加的。事件循环从其积压的工作中取出一个作业并调用它(或“将控制权交给它”),类似于调用一个函数,然后该作业运行。一旦它暂停或完成,它就会将控制权返回给事件循环。事件循环然后从其池中选择另一个作业并调用它。您可以**大致**将作业集合视为一个队列:作业被添加,然后一次处理一个,通常(但不总是)按顺序进行。这个过程无限重复,事件循环无休止地循环。如果没有更多待执行的作业,事件循环会足够聪明地休息,避免不必要地浪费 CPU 周期,并在有更多工作要做时返回。
有效的执行依赖于作业之间的良好共享和协作;一个贪婪的作业可能会霸占控制权并让其他作业挨饿,从而使整体的事件循环方法变得相当无用。
import asyncio
# This creates an event loop and indefinitely cycles through
# its collection of jobs.
event_loop = asyncio.new_event_loop()
event_loop.run_forever()
异步函数和协程¶
这是一个基本的、无聊的 Python 函数
def hello_printer():
print(
"Hi, I am a lowly, simple printer, though I have all I "
"need in life -- \nfresh paper and my dearly beloved octopus "
"partner in crime."
)
调用常规函数会执行其逻辑或主体
>>> hello_printer()
Hi, I am a lowly, simple printer, though I have all I need in life --
fresh paper and my dearly beloved octopus partner in crime.
async def,与普通的 def
不同,使其成为一个异步函数(或“协程函数”)。调用它会创建并返回一个 协程 对象。
async def loudmouth_penguin(magic_number: int):
print(
"I am a super special talking penguin. Far cooler than that printer. "
f"By the way, my lucky number is: {magic_number}."
)
调用异步函数 loudmouth_penguin
并不会执行 print 语句;相反,它会创建一个协程对象
>>> loudmouth_penguin(magic_number=3)
<coroutine object loudmouth_penguin at 0x104ed2740>
术语“协程函数”和“协程对象”经常被混淆为协程。这可能会令人困惑!在本文中,协程特指协程对象,或者更确切地说,是 types.CoroutineType
的实例(原生协程)。请注意,协程也可以作为 collections.abc.Coroutine
的实例存在——这是一个对类型检查很重要的区别。
协程表示函数的主体或逻辑。协程必须显式启动;同样,仅仅创建协程并不会启动它。值得注意的是,协程可以在函数主体内的各个点暂停和恢复。这种暂停和恢复的能力正是实现异步行为的关键!
协程和协程函数是通过利用 生成器 和 生成器函数 的功能构建的。回想一下,生成器函数是一个 yield
的函数,就像这个一样
def get_random_number():
# This would be a bad random number generator!
print("Hi")
yield 1
print("Hello")
yield 7
print("Howdy")
yield 4
...
类似于协程函数,调用生成器函数不会运行它。相反,它会创建一个生成器对象
>>> get_random_number()
<generator object get_random_number at 0x1048671c0>
您可以通过使用内置函数 next()
来进入生成器的下一个 yield
。换句话说,生成器运行,然后暂停。例如:
>>> generator = get_random_number()
>>> next(generator)
Hi
1
>>> next(generator)
Hello
7
任务¶
粗略地说,任务 是与事件循环绑定的协程(而不是协程函数)。任务还维护一个回调函数列表,当我们讨论 await
时,它们的重要性将变得清晰。推荐创建任务的方式是通过 asyncio.create_task()
。
创建任务会自动安排其执行(通过向事件循环的待办事项列表,即作业集合,添加一个回调来运行它)。
由于(在每个线程中)只有一个事件循环,asyncio
会为您处理将任务与事件循环关联起来。因此,无需指定事件循环。
coroutine = loudmouth_penguin(magic_number=5)
# This creates a Task object and schedules its execution via the event loop.
task = asyncio.create_task(coroutine)
之前,我们手动创建了事件循环并将其设置为永远运行。实际上,建议使用(也常见)asyncio.run()
,它负责管理事件循环并确保提供的协程在继续之前完成。例如,许多异步程序遵循这种设置
import asyncio
async def main():
# Perform all sorts of wacky, wild asynchronous things...
...
if __name__ == "__main__":
asyncio.run(main())
# The program will not reach the following print statement until the
# coroutine main() finishes.
print("coroutine main() is done!")
重要的是要意识到任务本身并没有添加到事件循环中,只有对任务的回调被添加到事件循环中。如果事件循环调用任务对象之前它就被垃圾回收了,这将变得很重要。例如,考虑这个程序
1async def hello():
2 print("hello!")
3
4async def main():
5 asyncio.create_task(hello())
6 # Other asynchronous instructions which run for a while
7 # and cede control to the event loop...
8 ...
9
10asyncio.run(main())
因为第 5 行创建的任务对象没有引用,所以它_可能_在事件循环调用它之前被垃圾回收。协程 main()
中的后续指令将控制权交还给事件循环,以便它可以调用其他作业。当事件循环最终尝试运行任务时,它可能会失败并发现任务对象不存在!即使协程保留了对任务的引用但在该任务完成之前协程完成,也可能发生这种情况。当协程退出时,局部变量超出作用域并可能被垃圾回收。asyncio
和 Python 的垃圾回收器在实践中会非常努力地确保这种情况不会发生。但这并不是粗心大意的理由!
await¶
await
是一个 Python 关键字,通常以两种不同的方式使用
await task
await coroutine
在一个关键方面,await
的行为取决于被等待对象的类型。
等待一个任务将把控制权从当前任务或协程移交给事件循环。在放弃控制权的过程中,会发生一些重要的事情。我们将使用以下代码示例进行说明
async def plant_a_tree():
dig_the_hole_task = asyncio.create_task(dig_the_hole())
await dig_the_hole_task
# Other instructions associated with planting a tree.
...
在这个例子中,想象事件循环已将控制权传递给协程 plant_a_tree()
的开始。如上所示,协程创建了一个任务,然后等待它。await dig_the_hole_task
指令将一个回调(它将恢复 plant_a_tree()
)添加到 dig_the_hole_task
对象的Callbacks列表中。然后,该指令将控制权交给事件循环。一段时间后,事件循环将控制权传递给 dig_the_hole_task
,任务将完成它需要做的任何事情。一旦任务完成,它将将其各种回调添加到事件循环中,在这种情况下,是调用以恢复 plant_a_tree()
。
一般来说,当被等待的任务 (dig_the_hole_task
) 完成时,原始任务或协程 (plant_a_tree()
) 被添加回事件循环的待办事项列表以恢复。
这是一个基本但可靠的心智模型。在实践中,控制权的移交稍微复杂一些,但不多。在第二部分中,我们将详细介绍实现这一点所需的细节。
与任务不同,等待协程不会将控制权交还给事件循环! 如果先将协程包装在一个任务中,然后等待该任务,则会交出控制权。await coroutine
的行为实际上与调用一个普通的同步 Python 函数相同。考虑以下程序
import asyncio
async def coro_a():
print("I am coro_a(). Hi!")
async def coro_b():
print("I am coro_b(). I sure hope no one hogs the event loop...")
async def main():
task_b = asyncio.create_task(coro_b())
num_repeats = 3
for _ in range(num_repeats):
await coro_a()
await task_b
asyncio.run(main())
协程 main()
中的第一条语句创建了 task_b
并通过事件循环将其调度执行。然后,coro_a()
被重复等待。控制权从未交还给事件循环,这就是为什么我们在 coro_b()
的输出之前看到所有三个 coro_a()
调用的输出
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_b(). I sure hope no one hogs the event loop...
如果我们将 await coro_a()
更改为 await asyncio.create_task(coro_a())
,行为就会改变。协程 main()
在该语句处将控制权交回给事件循环。事件循环然后继续处理其积压的工作,调用 task_b
,然后调用包装 coro_a()
的任务,最后恢复协程 main()
。
I am coro_b(). I sure hope no one hogs the event loop...
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!
这种 await coroutine
的行为可能会让很多人感到困惑!这个例子强调了仅仅使用 await coroutine
如何可能无意中霸占其他任务的控制权,从而有效地使事件循环停滞。asyncio.run()
可以通过 debug=True
标志帮助您检测此类情况,该标志启用 调试模式。除其他外,它会记录任何独占执行 100 毫秒或更长时间的协程。
这种设计有意牺牲了一些关于 await
用法的概念清晰性,以换取性能提升。每次等待一个任务时,控制权都需要沿着调用栈一直传递到事件循环。这听起来可能微不足道,但在一个包含许多 await
语句和深层调用栈的大型程序中,这种开销可能会导致显著的性能拖累。
概念性概述第二部分:核心细节¶
第二部分详细介绍了 asyncio
用于管理控制流的机制。这就是奇迹发生的地方。通过本节,您将了解 await
在幕后做了什么,以及如何创建自己的异步操作符。
协程的内部工作原理¶
asyncio
利用四个组件来传递控制权。
coroutine.send(arg)
是用于启动或恢复协程的方法。如果协程已暂停并正在恢复,则参数 arg
将作为最初暂停它的 yield
语句的返回值发送进去。如果协程是第一次使用(而不是恢复),arg
必须是 None
。
1class Rock:
2 def __await__(self):
3 value_sent_in = yield 7
4 print(f"Rock.__await__ resuming with value: {value_sent_in}.")
5 return value_sent_in
6
7async def main():
8 print("Beginning coroutine main().")
9 rock = Rock()
10 print("Awaiting rock...")
11 value_from_rock = await rock
12 print(f"Coroutine received value: {value_from_rock} from rock.")
13 return 23
14
15coroutine = main()
16intermediate_result = coroutine.send(None)
17print(f"Coroutine paused and returned intermediate value: {intermediate_result}.")
18
19print(f"Resuming coroutine and sending in value: 42.")
20try:
21 coroutine.send(42)
22except StopIteration as e:
23 returned_value = e.value
24print(f"Coroutine main() finished and provided value: {returned_value}.")
yield,像往常一样,暂停执行并将控制权返回给调用者。在上面的示例中,第 3 行的 yield
由第 11 行的 ... = await rock
调用。更广泛地说,await
调用给定对象的 __await__()
方法。await
还做了一件非常特殊的事情:它将接收到的任何 yield
传播(或“传递”)到调用链的上方。在本例中,即回到第 16 行的 ... = coroutine.send(None)
。
协程通过第 21 行的 coroutine.send(42)
调用恢复。协程从第 3 行 yield
(或暂停)的地方继续执行,并执行其主体中剩余的语句。当协程完成时,它会引发一个 StopIteration
异常,返回值为 value
属性。
该代码片段产生以下输出
Beginning coroutine main().
Awaiting rock...
Coroutine paused and returned intermediate value: 7.
Resuming coroutine and sending in value: 42.
Rock.__await__ resuming with value: 42.
Coroutine received value: 42 from rock.
Coroutine main() finished and provided value: 23.
值得在此处暂停片刻,确保您理解了控制流和值传递的各种方式。此处涵盖了许多重要概念,确保您的理解牢固是值得的。
从协程中 yield
(或有效放弃控制权)的唯一方法是 await
一个在其 __await__
方法中 yield
的对象。这可能听起来很奇怪。您可能会想
1. 那直接在协程函数内部的
yield
呢?协程函数会变成一个 异步生成器函数,这是一个完全不同的概念。2. 那在协程函数内部使用 yield from 到一个(普通)生成器呢?那会导致错误:
SyntaxError: yield from not allowed in a coroutine.
这是为了简洁而故意设计的——只强制使用协程的一种方式。最初yield
也被禁止,但为了允许异步生成器而被重新接受。尽管如此,yield from
和await
实际上做的是同一件事。
期货¶
期货 是一种旨在表示计算状态和结果的对象。这个术语暗示着未来会发生或尚未发生的事情,而该对象就是一种关注该事物的方式。
一个 Future 有几个重要的属性。一个是它的状态,可以是“pending”(待定)、“cancelled”(已取消)或“done”(已完成)。另一个是它的结果,在状态转换为“done”时设置。与协程不同,Future 不代表实际要完成的计算;相反,它代表该计算的状态和结果,有点像一个状态灯(红、黄或绿)或指示器。
asyncio.Task
继承自 asyncio.Future
,以获得这些各种功能。上一节说任务存储了一个回调列表,这并不完全准确。实际上是 Future
类实现了这种逻辑,而 Task
继承了它。
Future 也可以直接使用(而不是通过任务)。任务在其协程完成时将自己标记为已完成。Future 更具通用性,当您说完成时就会被标记为完成。通过这种方式,它们是灵活的接口,让您可以为等待和恢复设置自己的条件。
一个自制的 asyncio.sleep¶
我们将通过一个示例,说明您如何利用 Future 创建自己的异步睡眠 (async_sleep
) 变体,它模仿 asyncio.sleep()
。
此代码片段向事件循环注册了一些任务,然后等待由 asyncio.create_task
创建的任务,该任务包装了 async_sleep(3)
协程。我们希望该任务仅在三秒钟过去后完成,但不能阻止其他任务运行。
async def other_work():
print("I like work. Work work.")
async def main():
# Add a few other tasks to the event loop, so there's something
# to do while asynchronously sleeping.
work_tasks = [
asyncio.create_task(other_work()),
asyncio.create_task(other_work()),
asyncio.create_task(other_work())
]
print(
"Beginning asynchronous sleep at time: "
f"{datetime.datetime.now().strftime("%H:%M:%S")}."
)
await asyncio.create_task(async_sleep(3))
print(
"Done asynchronous sleep at time: "
f"{datetime.datetime.now().strftime("%H:%M:%S")}."
)
# asyncio.gather effectively awaits each task in the collection.
await asyncio.gather(*work_tasks)
下面,我们使用一个 Future 来实现对该任务何时被标记为完成的自定义控制。如果从未调用 future.set_result()
(负责将该 Future 标记为完成的方法),那么该任务将永远不会完成。我们还借助了另一个任务,我们稍后会看到,它将监控已过去的时间,并相应地调用 future.set_result()
。
async def async_sleep(seconds: float):
future = asyncio.Future()
time_to_wake = time.time() + seconds
# Add the watcher-task to the event loop.
watcher_task = asyncio.create_task(_sleep_watcher(future, time_to_wake))
# Block until the future is marked as done.
await future
下面,我们使用一个相当简单的 YieldToEventLoop()
对象,从其 __await__
方法中 yield
,将控制权交还给事件循环。这实际上与调用 asyncio.sleep(0)
相同,但这种方法提供了更高的清晰度,更不用说在展示如何实现它时使用 asyncio.sleep
有点作弊!
像往常一样,事件循环遍历其任务,将控制权交给它们,并在它们暂停或完成时收回控制权。watcher_task
(运行协程 _sleep_watcher(...)
)将在事件循环的每个完整周期中被调用一次。每次恢复时,它都会检查时间,如果时间不足,它将再次暂停并将控制权交还给事件循环。一旦时间充足,_sleep_watcher(...)
将 Future 标记为完成,并通过退出其无限 while
循环而完成。鉴于这个辅助任务在事件循环的每个周期中只被调用一次,您会正确地注意到这个异步睡眠将睡眠_至少_三秒,而不是正好三秒。请注意,asyncio.sleep
也是如此。
class YieldToEventLoop:
def __await__(self):
yield
async def _sleep_watcher(future, time_to_wake):
while True:
if time.time() >= time_to_wake:
# This marks the future as done.
future.set_result(None)
break
else:
await YieldToEventLoop()
这是完整程序的输出
$ python custom-async-sleep.py
Beginning asynchronous sleep at time: 14:52:22.
I like work. Work work.
I like work. Work work.
I like work. Work work.
Done asynchronous sleep at time: 14:52:25.
您可能会觉得这种异步睡眠的实现过于复杂。嗯,确实如此。这个例子旨在通过一个简单的例子展示 Future 的多功能性,以便可以模仿它来满足更复杂的需求。作为参考,您可以在没有 Future 的情况下实现它,如下所示
async def simpler_async_sleep(seconds):
time_to_wake = time.time() + seconds
while True:
if time.time() >= time_to_wake:
return
else:
await YieldToEventLoop()
但目前就这些了。希望您已准备好更自信地投入异步编程,或查阅 文档的其余部分
中的高级主题。