协程和任务

本节概述了用于协程和任务的高级 asyncio API。

协程

源代码: Lib/asyncio/coroutines.py


使用 async/await 语法声明的协程是编写 asyncio 应用程序的首选方式。例如,以下代码片段会打印“hello”,等待 1 秒,然后打印“world”

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

请注意,简单地调用协程并不会将其调度为执行

>>> main()
<coroutine object main at 0x1053bb7c8>

要实际运行协程,asyncio 提供了以下机制

  • asyncio.run() 函数用于运行顶层入口点“main()”函数(参见上面的示例)。

  • 等待协程。以下代码片段将等待 1 秒后打印“hello”,然后等待**再** 2 秒后打印“world”

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    预期输出

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • asyncio.create_task() 函数用于将协程作为 asyncio 任务并发运行。

    让我们修改上面的示例,**并发**运行两个 say_after 协程

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    请注意,预期输出现在显示该代码片段比之前快了 1 秒

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    
  • asyncio.TaskGroup 类提供了 create_task() 的更现代的替代方案。使用此 API,上一个示例变为

    async def main():
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(
                say_after(1, 'hello'))
    
            task2 = tg.create_task(
                say_after(2, 'world'))
    
            print(f"started at {time.strftime('%X')}")
    
        # The await is implicit when the context manager exits.
    
        print(f"finished at {time.strftime('%X')}")
    

    时间和输出应与上一个版本相同。

    3.11 版本新增:asyncio.TaskGroup

可等待对象

如果一个对象可以在 await 表达式中使用,我们称之为**可等待**对象。许多 asyncio API 都设计为接受可等待对象。

**可等待**对象主要有三种类型:**协程**、**任务**和**Future**。

协程

Python 协程是**可等待对象**,因此可以从其他协程中等待

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()  # will raise a "RuntimeWarning".

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在此文档中,“协程”一词可用于两个密切相关的概念

  • *协程函数*:一个 async def 函数;

  • *协程对象*:调用*协程函数*返回的对象。

任务

**任务**用于**并发**调度协程。

当协程被 asyncio.create_task() 等函数封装成**任务**时,协程会自动调度并尽快运行

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Future

Future 是一种特殊的**低级**可等待对象,表示异步操作的**最终结果**。

当一个 Future 对象被*等待*时,意味着协程将等待 Future 在其他地方得到解决。

asyncio 中的 Future 对象是为了允许基于回调的代码与 async/await 一起使用。

通常**不需要**在应用程序级别代码中创建 Future 对象。

Future 对象,有时由库和一些 asyncio API 公开,可以被等待

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一个返回 Future 对象的低级函数的好例子是 loop.run_in_executor()

创建任务

源代码: Lib/asyncio/tasks.py


asyncio.create_task(coro, *, name=None, context=None, eager_start=None, **kwargs)

将 *coro* 协程封装到 Task 中并调度其执行。返回 Task 对象。

完整的函数签名与 Task 构造函数(或工厂)的签名基本相同——此函数的所有关键字参数都传递给该接口。

一个可选的仅限关键字参数 *context* 允许为 *coro* 指定一个自定义的 contextvars.Context 来运行。如果未提供 *context*,则创建当前上下文的副本。

一个可选的仅限关键字参数 *eager_start* 允许指定任务是否应在调用 create_task 期间急切执行,或稍后调度。如果未传递 *eager_start*,则将使用 loop.set_task_factory() 设置的模式。

任务在 get_running_loop() 返回的循环中执行,如果当前线程中没有运行的循环,则会引发 RuntimeError

备注

asyncio.TaskGroup.create_task() 是一种新的替代方案,利用了结构化并发;它允许以强大的安全保证等待一组相关任务完成。

重要

保存此函数结果的引用,以避免任务在执行中途消失。事件循环只对任务保持弱引用。未在其他地方引用的任务可能随时被垃圾回收,即使在完成之前也是如此。对于可靠的“即发即弃”后台任务,将它们收集到一个集合中

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

在 3.7 版本加入。

3.8 版本变化:新增了 *name* 参数。

3.11 版本变化:新增了 *context* 参数。

3.14 版本变化:通过传递所有 *kwargs* 增加了 *eager_start* 参数。

任务取消

任务可以轻松安全地取消。当任务被取消时,asyncio.CancelledError 将在下次机会在任务中引发。

建议协程使用 try/finally 块来健壮地执行清理逻辑。如果显式捕获到 asyncio.CancelledError,则在清理完成后通常应将其传播。asyncio.CancelledError 直接继承自 BaseException,因此大多数代码无需了解它。

支持结构化并发的 asyncio 组件,如 asyncio.TaskGroupasyncio.timeout(),内部使用取消来实现,如果协程吞噬了 asyncio.CancelledError,可能会出现异常行为。类似地,用户代码通常不应该调用 uncancel。但是,在真正需要抑制 asyncio.CancelledError 的情况下,还需要调用 uncancel() 以完全移除取消状态。

任务组

任务组结合了任务创建 API 和一种方便可靠的方式来等待组中的所有任务完成。

class asyncio.TaskGroup

一个包含任务组的 异步上下文管理器。任务可以使用 create_task() 添加到组中。当上下文管理器退出时,所有任务都会被等待。

在 3.11 版本中新增。

create_task(coro, *, name=None, context=None, eager_start=None, **kwargs)

在此任务组中创建一个任务。签名与 asyncio.create_task() 的签名匹配。如果任务组不活跃(例如,尚未进入,已完成,或正在关闭),我们将关闭给定的 coro

3.13 版本变化:如果任务组不活跃,则关闭给定的协程。

3.14 版本变化:将所有 *kwargs* 传递给 loop.create_task()

示例

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print(f"Both tasks have completed now: {task1.result()}, {task2.result()}")

async with 语句将等待组中的所有任务完成。在等待期间,仍可以向组中添加新任务(例如,通过将 tg 传递给其中一个协程并在该协程中调用 tg.create_task())。一旦最后一个任务完成并且 async with 块退出,就不能再向组中添加新任务。

当组中任何任务因 asyncio.CancelledError 之外的异常失败时,其余任务将被取消。此后不能再向组中添加任务。此时,如果 async with 语句的主体仍处于活动状态(即,__aexit__() 尚未被调用),则直接包含 async with 语句的任务也将被取消。由此产生的 asyncio.CancelledError 将中断 await,但它不会冒出包含的 async with 语句。

一旦所有任务完成,如果任何任务因 asyncio.CancelledError 之外的异常失败,这些异常将被组合到 ExceptionGroupBaseExceptionGroup 中(根据需要;参见其文档),然后引发。

两个基本异常被特殊处理:如果任何任务因 KeyboardInterruptSystemExit 而失败,任务组仍然会取消其余任务并等待它们,但随后会重新引发最初的 KeyboardInterruptSystemExit,而不是 ExceptionGroupBaseExceptionGroup

如果 async with 语句的主体因异常而退出(因此 __aexit__() 被调用并设置了异常),这与其中一个任务失败的情况相同:其余任务被取消并等待,非取消异常被分组到一个异常组中并引发。传递给 __aexit__() 的异常,除非它是 asyncio.CancelledError,也包含在异常组中。对于 KeyboardInterruptSystemExit,与上一段相同地进行了特殊处理。

任务组会小心地不将用于“唤醒”其 __aexit__() 的内部取消与由其他方发出的对其正在运行的任务的取消请求混淆。特别是,当一个任务组在语法上嵌套在另一个任务组中,并且两者都同时在其子任务之一中遇到异常时,内部任务组将处理其异常,然后外部任务组将收到另一个取消并处理其自身的异常。

在任务组被外部取消并且还必须引发 ExceptionGroup 的情况下,它将调用父任务的 cancel() 方法。这确保了在下一个 await 处将引发 asyncio.CancelledError,因此取消不会丢失。

任务组保留了 asyncio.Task.cancelling() 报告的取消计数。

3.13 版本变化:改进了同时进行的内部和外部取消处理,并正确保留了取消计数。

终止任务组

虽然标准库本身不支持终止任务组,但可以通过向任务组添加一个引发异常的任务并忽略所引发的异常来实现终止

import asyncio
from asyncio import TaskGroup

class TerminateTaskGroup(Exception):
    """Exception raised to terminate a task group."""

async def force_terminate_task_group():
    """Used to force termination of a task group."""
    raise TerminateTaskGroup()

async def job(task_id, sleep_time):
    print(f'Task {task_id}: start')
    await asyncio.sleep(sleep_time)
    print(f'Task {task_id}: done')

async def main():
    try:
        async with TaskGroup() as group:
            # spawn some tasks
            group.create_task(job(1, 0.5))
            group.create_task(job(2, 1.5))
            # sleep for 1 second
            await asyncio.sleep(1)
            # add an exception-raising task to force the group to terminate
            group.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass

asyncio.run(main())

预期输出

Task 1: start
Task 2: start
Task 1: done

休眠

async asyncio.sleep(delay, result=None)

阻塞 *delay* 秒。

如果提供了 *result*,它将在协程完成时返回给调用者。

sleep() 总是暂停当前任务,允许其他任务运行。

将延迟设置为 0 提供了一种优化路径,以允许其他任务运行。这可以由长时间运行的函数使用,以避免在函数调用期间完全阻塞事件循环。

协程每秒显示当前日期 5 秒的示例

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

版本 3.10 中已更改: 移除了 loop 参数。

3.13 版本变化:如果 *delay* 是 nan,则引发 ValueError

并发运行任务

awaitable asyncio.gather(*aws, return_exceptions=False)

**并发**运行 *aws* 序列中的可等待对象

如果 *aws* 中的任何可等待对象是协程,它会自动调度为任务。

如果所有可等待对象都成功完成,结果是一个返回值的聚合列表。结果值的顺序与 *aws* 中可等待对象的顺序相对应。

如果 *return_exceptions* 为 False(默认),则第一个引发的异常会立即传播到等待 gather() 的任务。*aws* 序列中的其他可等待对象**不会被取消**,并将继续运行。

如果 *return_exceptions* 为 True,异常将与成功结果一样处理,并聚合到结果列表中。

如果 gather() 被**取消**,则所有已提交的可等待对象(尚未完成的)也会被**取消**。

如果 *aws* 序列中的任何 Task 或 Future 被*取消*,则将其视为引发了 CancelledError —— 在这种情况下,gather() 调用**不会**被取消。这是为了防止一个提交的 Task/Future 的取消导致其他 Task/Future 被取消。

备注

创建和运行任务并等待其完成的新替代方案是 asyncio.TaskGroup。*TaskGroup* 为调度子任务嵌套提供了比 *gather* 更强的安全保证:如果一个任务(或子任务,由任务调度的任务)引发异常,*TaskGroup* 将取消剩余的已调度任务,而 *gather* 不会。

示例

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

备注

如果 *return_exceptions* 为 false,则在 gather() 被标记为完成之后取消它不会取消任何已提交的可等待对象。例如,gather 在将异常传播给调用者后可能会被标记为完成,因此,在从 gather 捕获异常(由其中一个可等待对象引发)后调用 gather.cancel() 不会取消任何其他可等待对象。

3.7 版本变化:如果 *gather* 本身被取消,无论 *return_exceptions* 如何,取消都会传播。

版本 3.10 中已更改: 移除了 loop 参数。

自 3.10 版本弃用:如果未提供位置参数或并非所有位置参数都是类 Future 对象且没有运行中的事件循环,则会发出弃用警告。

急切任务工厂

asyncio.eager_task_factory(loop, coro, *, name=None, context=None)

用于急切任务执行的任务工厂。

当使用此工厂(通过 loop.set_task_factory(asyncio.eager_task_factory))时,协程在 Task 构造期间同步开始执行。任务仅在其阻塞时才在事件循环中调度。这可以提高性能,因为对于同步完成的协程,避免了循环调度的开销。

一个有益的常见示例是协程,它们采用缓存或记忆化来尽可能避免实际的 I/O。

备注

协程的立即执行是一个语义变化。如果协程返回或引发,任务永远不会被调度到事件循环。如果协程执行阻塞,任务将被调度到事件循环。此更改可能会给现有应用程序带来行为变化。例如,应用程序的任务执行顺序可能会发生变化。

3.12 新版功能.

asyncio.create_eager_task_factory(custom_task_constructor)

创建一个急切任务工厂,类似于 eager_task_factory(),在创建新任务时使用提供的 *custom_task_constructor* 而不是默认的 Task

*custom_task_constructor* 必须是**可调用对象**,其签名与 Task.__init__ 的签名匹配。该可调用对象必须返回一个与 asyncio.Task 兼容的对象。

此函数返回一个**可调用对象**,旨在通过 loop.set_task_factory(factory)) 用作事件循环的任务工厂。

3.12 新版功能.

避免取消

awaitable asyncio.shield(aw)

保护可等待对象不被取消

如果 *aw* 是协程,它会自动调度为任务。

语句

task = asyncio.create_task(something())
res = await shield(task)

等价于

res = await something()

但是,如果包含它的协程被取消,则在 something() 中运行的任务不会被取消。从 something() 的角度来看,取消没有发生。尽管其调用者仍然被取消,因此“await”表达式仍然引发 CancelledError

如果 something() 被其他方式取消(即从自身内部取消),那也会取消 shield()

如果希望完全忽略取消(不建议),则应将 shield() 函数与 try/except 子句结合使用,如下所示

task = asyncio.create_task(something())
try:
    res = await shield(task)
except CancelledError:
    res = None

重要

保存传递给此函数的任务的引用,以避免任务在执行中途消失。事件循环只对任务保持弱引用。未在其他地方引用的任务可能随时被垃圾回收,即使在完成之前也是如此。

版本 3.10 中已更改: 移除了 loop 参数。

自 3.10 版本弃用:如果 *aw* 不是类 Future 对象且没有运行中的事件循环,则会发出弃用警告。

超时

asyncio.timeout(delay)

返回一个异步上下文管理器,可用于限制等待时间。

*delay* 可以是 None,也可以是一个浮点数/整数的秒数。如果 *delay* 是 None,则不应用时间限制;当创建上下文管理器时延迟未知时,这可能很有用。

在任何一种情况下,上下文管理器都可以在创建后使用 Timeout.reschedule() 重新调度。

示例

async def main():
    async with asyncio.timeout(10):
        await long_running_task()

如果 long_running_task 耗时超过 10 秒才能完成,上下文管理器将取消当前任务并在内部处理由此产生的 asyncio.CancelledError,将其转换为可被捕获和处理的 TimeoutError

备注

asyncio.timeout() 上下文管理器将 asyncio.CancelledError 转换为 TimeoutError,这意味着 TimeoutError 只能在上下文管理器**外部**捕获。

捕获 TimeoutError 的示例

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

asyncio.timeout() 生成的上下文管理器可以重新调度到不同的截止日期并进行检查。

class asyncio.Timeout(when)

一个用于取消过期协程的 异步上下文管理器

when 应该是一个绝对时间,上下文应该在该时间超时,由事件循环的时钟测量

  • 如果 whenNone,则超时永远不会触发。

  • 如果 when < loop.time(),则超时将在事件循环的下一次迭代中触发。

when() float | None

返回当前截止日期,如果未设置当前截止日期,则返回 None

reschedule(when: float | None)

重新调度超时。

expired() bool

返回上下文管理器是否已超出其截止日期(已过期)。

示例

async def main():
    try:
        # We do not know the timeout when starting, so we pass ``None``.
        async with asyncio.timeout(None) as cm:
            # We know the timeout now, so we reschedule it.
            new_deadline = get_running_loop().time() + 10
            cm.reschedule(new_deadline)

            await long_running_task()
    except TimeoutError:
        pass

    if cm.expired():
        print("Looks like we haven't finished on time.")

超时上下文管理器可以安全地嵌套。

在 3.11 版本中新增。

asyncio.timeout_at(when)

类似于 asyncio.timeout(),只是 *when* 是停止等待的绝对时间,或 None

示例

async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

在 3.11 版本中新增。

async asyncio.wait_for(aw, timeout)

等待 *aw* 可等待对象在超时内完成。

如果 *aw* 是协程,它会自动调度为任务。

*timeout* 可以是 None,也可以是等待的浮点数或整数秒数。如果 *timeout* 是 None,则阻塞直到 future 完成。

如果发生超时,它将取消任务并引发 TimeoutError

为了避免任务取消,请将其封装在 shield() 中。

函数将一直等待直到 future 实际被取消,因此总等待时间可能超过 *timeout*。如果在取消期间发生异常,它将被传播。

如果等待被取消,future *aw* 也会被取消。

示例

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

3.7 版本变化:当 *aw* 因超时而被取消时,wait_for 会等待 *aw* 被取消。以前,它会立即引发 TimeoutError

版本 3.10 中已更改: 移除了 loop 参数。

3.11 版本变化:引发 TimeoutError 而不是 asyncio.TimeoutError

等待原语

async asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

并发运行 *aws* 可迭代对象中的 FutureTask 实例,并阻塞直到 *return_when* 指定的条件。

*aws* 可迭代对象不能为空。

返回两组任务/期物:(done, pending)

用法

done, pending = await asyncio.wait(aws)

如果指定了 *timeout*(浮点数或整数),可用于控制返回前的最大等待秒数。

请注意,此函数不会引发 TimeoutError。超时发生时未完成的 Futures 或 Tasks 只会返回在第二个集合中。

*return_when* 指示此函数何时返回。它必须是以下常量之一

常量

描述

asyncio.FIRST_COMPLETED

当任何 future 完成或被取消时,函数将返回。

asyncio.FIRST_EXCEPTION

当任何 future 因引发异常而完成时,函数将返回。如果没有 future 引发异常,则它等同于 ALL_COMPLETED

asyncio.ALL_COMPLETED

当所有 future 完成或被取消时,函数将返回。

wait_for() 不同,wait() 在超时发生时不会取消期物。

版本 3.10 中已更改: 移除了 loop 参数。

3.11 版本变化:直接向 wait() 传递协程对象是被禁止的。

3.12 版本变化:增加了对生成器产生任务的支持。

asyncio.as_completed(aws, *, timeout=None)

并发运行 *aws* 可迭代对象中的可等待对象。返回的对象可以迭代以获取可等待对象完成后的结果。

as_completed() 返回的对象可以作为异步迭代器或普通迭代器进行迭代。当使用异步迭代时,如果提供的可等待对象是任务或期物,则会生成它们。这使得将先前调度的任务与其结果关联起来变得容易。示例

ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]

async for earliest_connect in as_completed(tasks):
    # earliest_connect is done. The result can be obtained by
    # awaiting it or calling earliest_connect.result()
    reader, writer = await earliest_connect

    if earliest_connect is ipv6_connect:
        print("IPv6 connection established.")
    else:
        print("IPv4 connection established.")

在异步迭代期间,对于不是任务或期物的已提供的可等待对象,将生成隐式创建的任务。

当用作普通迭代器时,每次迭代都会产生一个新的协程,该协程返回结果或引发下一个完成的可等待对象的异常。此模式与 Python 3.13 之前的版本兼容

ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]

for next_connect in as_completed(tasks):
    # next_connect is not one of the original task objects. It must be
    # awaited to obtain the result value or raise the exception of the
    # awaitable that finishes next.
    reader, writer = await next_connect

如果在所有可等待对象完成之前发生超时,则会引发 TimeoutError。这由异步迭代期间的 async for 循环或普通迭代期间生成的协程引发。

版本 3.10 中已更改: 移除了 loop 参数。

自 3.10 版本弃用:如果 *aws* 可迭代对象中并非所有可等待对象都是类 Future 对象且没有运行中的事件循环,则会发出弃用警告。

3.12 版本变化:增加了对生成器产生任务的支持。

3.13 版本变化:结果现在既可以用作异步迭代器,也可以用作普通迭代器(以前它只是一个普通迭代器)。

在线程中运行

async asyncio.to_thread(func, /, *args, **kwargs)

在单独的线程中异步运行函数 *func*。

为此函数提供的任何 *args 和 **kwargs 将直接传递给 *func*。此外,当前的 contextvars.Context 将被传播,允许在单独的线程中访问事件循环线程的上下文变量。

返回一个可等待的协程,以获取 *func* 的最终结果。

此协程函数主要用于执行 I/O 密集型函数/方法,否则如果它们在主线程中运行,将阻塞事件循环。例如

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

在任何协程中直接调用 blocking_io() 会在其持续时间内阻塞事件循环,导致额外 1 秒的运行时间。相反,通过使用 asyncio.to_thread(),我们可以在单独的线程中运行它,而不会阻塞事件循环。

备注

由于 GILasyncio.to_thread() 通常只能用于使 I/O 密集型函数非阻塞。但是,对于释放 GIL 的扩展模块或没有 GIL 的替代 Python 实现,asyncio.to_thread() 也可以用于 CPU 密集型函数。

在 3.9 版本中新增。

从其他线程调度

asyncio.run_coroutine_threadsafe(coro, loop)

向给定的事件循环提交一个协程。线程安全。

返回一个 concurrent.futures.Future,用于等待来自另一个操作系统线程的结果。

此函数旨在从与事件循环运行线程不同的操作系统线程中调用。示例

def in_thread(loop: asyncio.AbstractEventLoop) -> None:
    # Run some blocking IO
    pathlib.Path("example.txt").write_text("hello world", encoding="utf8")

    # Create a coroutine
    coro = asyncio.sleep(1, result=3)

    # Submit the coroutine to a given loop
    future = asyncio.run_coroutine_threadsafe(coro, loop)

    # Wait for the result with an optional timeout argument
    assert future.result(timeout=2) == 3

async def amain() -> None:
    # Get the running loop
    loop = asyncio.get_running_loop()

    # Run something in a thread
    await asyncio.to_thread(in_thread, loop)

也可以反向运行。示例

@contextlib.contextmanager
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]:
    loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]()
    stop_event = asyncio.Event()

    async def main() -> None:
        loop_fut.set_result(asyncio.get_running_loop())
        await stop_event.wait()

    with concurrent.futures.ThreadPoolExecutor(1) as tpe:
        complete_fut = tpe.submit(asyncio.run, main())
        for fut in concurrent.futures.as_completed((loop_fut, complete_fut)):
            if fut is loop_fut:
                loop = loop_fut.result()
                try:
                    yield loop
                finally:
                    loop.call_soon_threadsafe(stop_event.set)
            else:
                fut.result()

# Create a loop in another thread
with loop_in_thread() as loop:
    # Create a coroutine
    coro = asyncio.sleep(1, result=3)

    # Submit the coroutine to a given loop
    future = asyncio.run_coroutine_threadsafe(coro, loop)

    # Wait for the result with an optional timeout argument
    assert future.result(timeout=2) == 3

如果在协程中引发异常,则返回的 Future 将收到通知。它还可以用于取消事件循环中的任务

try:
    result = future.result(timeout)
except TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

请参阅文档的并发和多线程部分。

与其他 asyncio 函数不同,此函数需要显式传递 *loop* 参数。

3.5.1 版本新增。

自省

asyncio.current_task(loop=None)

返回当前正在运行的 Task 实例,如果当前没有任务正在运行,则返回 None

如果 *loop* 为 None,则使用 get_running_loop() 获取当前循环。

在 3.7 版本加入。

asyncio.all_tasks(loop=None)

返回由循环运行的尚未完成的 Task 对象的集合。

如果 *loop* 是 None,则使用 get_running_loop() 获取当前循环。

在 3.7 版本加入。

asyncio.iscoroutine(obj)

如果 *obj* 是协程对象,则返回 True

在 3.4 版本加入。

任务对象

class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False)

一个类似Future的对象,运行Python协程。非线程安全。

任务用于在事件循环中运行协程。如果协程等待 Future,任务会暂停协程的执行并等待 Future 完成。当 Future *完成*后,被封装的协程的执行会恢复。

事件循环使用协作调度:一个事件循环一次运行一个任务。当一个任务等待 Future 完成时,事件循环会运行其他任务、回调或执行 I/O 操作。

使用高级 asyncio.create_task() 函数创建任务,或低级 loop.create_task()ensure_future() 函数。不鼓励手动实例化任务。

要取消正在运行的任务,请使用 cancel() 方法。调用它将导致任务向被封装的协程抛出 CancelledError 异常。如果协程在取消期间正在等待 Future 对象,则 Future 对象将被取消。

cancelled() 可用于检查任务是否被取消。如果封装的协程没有抑制 CancelledError 异常并实际被取消,则该方法返回 True

asyncio.Task 继承了 Future 的所有 API,除了 Future.set_result()Future.set_exception()

一个可选的仅限关键字参数 *context* 允许为 *coro* 指定一个自定义的 contextvars.Context 来运行。如果未提供 *context*,则任务复制当前上下文,并在复制的上下文中运行其协程。

可选的仅限关键字参数 *eager_start* 允许在任务创建时急切地启动 asyncio.Task 的执行。如果设置为 True 且事件循环正在运行,任务将立即开始执行协程,直到协程第一次阻塞。如果协程在不阻塞的情况下返回或引发,任务将急切完成并跳过调度到事件循环。

3.7 版本变化:增加了对 contextvars 模块的支持。

3.8 版本变化:新增了 *name* 参数。

自 3.10 版本弃用:如果未指定 *loop* 且没有运行中的事件循环,则会发出弃用警告。

3.11 版本变化:新增了 *context* 参数。

3.12 版本变化:新增了 *eager_start* 参数。

done()

如果任务*完成*,则返回 True

当封装的协程返回一个值、引发一个异常或任务被取消时,任务就*完成*了。

result()

返回任务的结果。

如果任务*完成*,则返回封装协程的结果(如果协程引发了异常,则重新引发该异常)。

如果任务已被*取消*,此方法将引发 CancelledError 异常。

如果任务的结果尚未可用,此方法将引发 InvalidStateError 异常。

exception()

返回任务的异常。

如果封装的协程引发了异常,则返回该异常。如果封装的协程正常返回,此方法返回 None

如果任务已被*取消*,此方法将引发 CancelledError 异常。

如果任务尚未*完成*,此方法将引发 InvalidStateError 异常。

add_done_callback(callback, *, context=None)

添加一个回调函数,当任务*完成*时运行。

此方法仅应在低级基于回调的代码中使用。

有关更多详细信息,请参阅 Future.add_done_callback() 的文档。

remove_done_callback(callback)

从回调列表中移除 callback

此方法仅应在低级基于回调的代码中使用。

有关更多详细信息,请参阅 Future.remove_done_callback() 的文档。

get_stack(*, limit=None)

返回此 Task 的堆栈帧列表。

如果包装的协程尚未完成,则此方法返回其暂停时的堆栈。如果协程已成功完成或已取消,则此方法返回一个空列表。如果协程因异常而终止,则此方法返回回溯帧列表。

帧始终按从最旧到最新的顺序排列。

对于暂停的协程,只返回一个堆栈帧。

可选的 limit 参数设置要返回的最大帧数;默认情况下,返回所有可用帧。返回列表的顺序取决于返回的是堆栈还是回溯:返回堆栈的最新帧,但返回回溯的最旧帧。(这与 traceback 模块的行为匹配。)

print_stack(*, limit=None, file=None)

打印此 Task 的堆栈或回溯。

此方法生成的输出类似于 traceback 模块对 get_stack() 检索到的帧的输出。

limit 参数直接传递给 get_stack()

file 参数是一个 I/O 流,输出将写入其中;默认情况下,输出写入 sys.stdout

get_coro()

返回由 Task 包装的协程对象。

备注

对于已急切完成的 Task,此方法将返回 None。请参阅 急切任务工厂

在 3.8 版本加入。

版本 3.12 中的变更: 新添加的急切任务执行意味着结果可能为 None

get_context()

返回与任务关联的 contextvars.Context 对象。

3.12 新版功能.

get_name()

返回 Task 的名称。

如果未明确为 Task 分配名称,则默认的 asyncio Task 实现会在实例化期间生成一个默认名称。

在 3.8 版本加入。

set_name(value)

设置 Task 的名称。

value 参数可以是任何对象,然后将其转换为字符串。

在默认的 Task 实现中,名称将在 Task 对象的 repr() 输出中可见。

在 3.8 版本加入。

cancel(msg=None)

请求取消任务。

如果任务已 完成已取消,则返回 False,否则返回 True

此方法安排在事件循环的下一个周期中向包装的协程抛出 CancelledError 异常。

然后,协程有机会通过使用 try …… except CancelledError …… finally 块来清理甚至拒绝请求。因此,与 Future.cancel() 不同,Task.cancel() 不能保证任务会被取消,尽管完全抑制取消并不常见,并且强烈不建议这样做。如果协程仍然决定抑制取消,它除了捕获异常外,还需要调用 Task.uncancel()

版本 3.9 中的变更: 添加了 msg 参数。

版本 3.11 中的变更: msg 参数从取消的任务传播到其等待者。

以下示例说明了协程如何截获取消请求

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

如果 Task 已 取消,则返回 True

当使用 cancel() 请求取消并且包装的协程传播了抛入其中的 CancelledError 异常时,Task 就会被 取消

uncancel()

减少对此 Task 的取消请求计数。

返回剩余的取消请求数量。

请注意,一旦已取消任务的执行完成,对 uncancel() 的后续调用将无效。

在 3.11 版本中新增。

此方法由 asyncio 的内部机制使用,不应由最终用户代码使用。特别是,如果 Task 成功取消,这将允许结构化并发的元素(如 Task Groupsasyncio.timeout())继续运行,将取消隔离到相应的结构化块。例如

async def make_request_with_timeout():
    try:
        async with asyncio.timeout(1):
            # Structured block affected by the timeout:
            await make_request()
            await make_another_request()
    except TimeoutError:
        log("There was a timeout")
    # Outer code not affected by the timeout:
    await unrelated_code()

虽然包含 make_request()make_another_request() 的块可能会由于超时而被取消,但即使在超时情况下,unrelated_code() 也应该继续运行。这是通过 uncancel() 实现的。TaskGroup 上下文管理器以类似的方式使用 uncancel()

如果最终用户代码由于某种原因通过捕获 CancelledError 来抑制取消,则需要调用此方法来移除取消状态。

当此方法将取消计数递减到零时,该方法会检查之前的 cancel() 调用是否已安排将 CancelledError 抛入任务中。如果尚未抛出,则该安排将被撤销(通过重置内部 _must_cancel 标志)。

版本 3.13 中的变更: 更改为在达到零时撤销待处理的取消请求。

cancelling()

返回对此 Task 的待处理取消请求数量,即调用 cancel() 的次数减去调用 uncancel() 的次数。

请注意,如果此数字大于零但 Task 仍在执行,cancelled() 仍将返回 False。这是因为此数字可以通过调用 uncancel() 来降低,如果取消请求降至零,这可能导致任务最终不会被取消。

此方法由 asyncio 的内部机制使用,不应由最终用户代码使用。有关更多详细信息,请参阅 uncancel()

在 3.11 版本中新增。