协程和任务

本节概述了用于处理协程和任务的高级 asyncio API。

协程

源代码: Lib/asyncio/coroutines.py


使用 async/await 语法声明的协程是编写 asyncio 应用程序的首选方法。例如,以下代码片段会打印 “hello”,等待 1 秒,然后打印 “world”

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

请注意,简单地调用协程不会将其安排为执行

>>> main()
<coroutine object main at 0x1053bb7c8>

为了实际运行协程,asyncio 提供了以下机制

  • asyncio.run() 函数用于运行顶层入口点 “main()” 函数(请参见上面的示例。)

  • 等待协程。以下代码片段将在等待 1 秒后打印 “hello”,然后在等待另外 2 秒后打印 “world”

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    预期输出

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • asyncio.create_task() 函数用于将协程作为 asyncio 任务并发运行。

    让我们修改上面的示例并并发运行两个 say_after 协程

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    请注意,预期输出现在表明该代码片段比以前快 1 秒运行

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    
  • asyncio.TaskGroup 类提供了 create_task() 更现代的替代方案。使用此 API,上一个示例变为

    async def main():
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(
                say_after(1, 'hello'))
    
            task2 = tg.create_task(
                say_after(2, 'world'))
    
            print(f"started at {time.strftime('%X')}")
    
        # The await is implicit when the context manager exits.
    
        print(f"finished at {time.strftime('%X')}")
    

    时间和输出应与以前的版本相同。

    3.11 版本新增: asyncio.TaskGroup

可等待对象

如果一个对象可以在 await 表达式中使用,我们称其为可等待对象。许多 asyncio API 都被设计为接受可等待对象。

主要有三种类型的可等待对象:协程任务Future

协程

Python 协程是可等待的,因此可以从其他协程中等待

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()  # will raise a "RuntimeWarning".

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在此文档中,术语“协程”可以用于两个密切相关的概念

  • 一个协程函数:一个 async def 函数;

  • 一个协程对象:调用协程函数返回的对象。

任务

任务用于并发调度协程。

当使用诸如 asyncio.create_task() 之类的函数将协程包装到 Task 中时,协程会自动安排为立即运行

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Future

Future 是一个特殊的低级可等待对象,它表示异步操作的最终结果

当一个 Future 对象被等待时,这意味着协程将等待,直到 Future 在其他地方被解析。

asyncio 中的 Future 对象是必需的,以便允许基于回调的代码与 async/await 一起使用。

通常无需在应用程序级别的代码中创建 Future 对象。

Future 对象,有时由库和某些 asyncio API 公开,可以被等待

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回 Future 对象的低级函数的一个很好的例子是 loop.run_in_executor()

创建任务

源代码: Lib/asyncio/tasks.py


asyncio.create_task(coro, *, name=None, context=None)

coro 协程包装到 Task 中并安排其执行。返回 Task 对象。

如果 name 不是 None,则使用 Task.set_name() 将其设置为任务的名称。

可选的仅关键字 context 参数允许为 coro 指定自定义的 contextvars.Context 来运行。如果没有提供 context,则创建当前上下文的副本。

该任务在 get_running_loop() 返回的循环中执行,如果当前线程中没有正在运行的循环,则引发 RuntimeError

注意

asyncio.TaskGroup.create_task() 是一种利用结构化并发的新替代方案;它允许等待一组具有强大安全保证的相关任务。

重要

保存对此函数结果的引用,以避免任务在执行过程中消失。事件循环仅保留对任务的弱引用。没有在其他地方引用的任务可能会在任何时候被垃圾回收,甚至在完成之前。对于可靠的“fire-and-forget”后台任务,请将它们收集到集合中

background_tasks = set()

for i in range(10):
    task = asyncio.create_task(some_coro(param=i))

    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)

    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

3.7 版本新增。

在 3.8 版本中更改: 添加了 name 参数。

在 3.11 版本中更改: 添加了 context 参数。

任务取消

可以轻松安全地取消任务。取消任务后,将在下次机会在任务中引发 asyncio.CancelledError

建议协程使用 try/finally 块来可靠地执行清理逻辑。如果显式捕获了 asyncio.CancelledError,则通常应在清理完成后传播它。asyncio.CancelledError 直接继承 BaseException,因此大多数代码不需要知道它。

asyncio.TaskGroupasyncio.timeout() 这样启用结构化并发的 asyncio 组件,其内部实现使用了取消机制,如果协程吞噬了 asyncio.CancelledError,可能会出现异常行为。同样,用户代码通常不应调用 uncancel。但是,在确实需要抑制 asyncio.CancelledError 的情况下,也必须调用 uncancel() 以完全移除取消状态。

任务组

任务组结合了任务创建 API 以及一种方便可靠的方式来等待组中的所有任务完成。

class asyncio.TaskGroup

一个 异步上下文管理器,持有一组任务。可以使用 create_task() 将任务添加到组中。当上下文管理器退出时,将等待所有任务完成。

3.11 版本新增。

create_task(coro, *, name=None, context=None)

在此任务组中创建一个任务。签名与 asyncio.create_task() 的签名匹配。如果任务组处于非活动状态(例如,尚未进入、已经完成或正在关闭过程中),我们将关闭给定的 coro

在 3.13 版本中更改: 如果任务组未处于活动状态,则关闭给定的协程。

示例

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print(f"Both tasks have completed now: {task1.result()}, {task2.result()}")

async with 语句将等待组中的所有任务完成。在等待时,仍然可以将新任务添加到组中(例如,通过将 tg 传递到其中一个协程并在该协程中调用 tg.create_task())。一旦最后一个任务完成并且 async with 块退出,则无法将新任务添加到组中。

当组中的任何一个任务首次因 asyncio.CancelledError 之外的异常而失败时,组中其余的任务将被取消。此时无法再向组中添加任何新任务。此时,如果 async with 语句的主体仍处于活动状态(即,尚未调用 __aexit__()),则直接包含 async with 语句的任务也将被取消。产生的 asyncio.CancelledError 将中断 await,但它不会从包含的 async with 语句中冒泡出来。

一旦所有任务完成,如果有任何任务因 asyncio.CancelledError 之外的异常而失败,这些异常将合并到 ExceptionGroupBaseExceptionGroup 中(视情况而定;请参阅其文档),然后引发该异常。

有两个基本异常会被特殊处理:如果任何任务因 KeyboardInterruptSystemExit 而失败,则任务组仍会取消其余的任务并等待它们,但随后将重新引发初始的 KeyboardInterruptSystemExit,而不是 ExceptionGroupBaseExceptionGroup

如果 async with 语句的主体因异常而退出(因此调用了带有异常集的 __aexit__()),则这被视为与其中一个任务失败相同:其余任务将被取消并等待,并且非取消异常将被分组到异常组中并引发。传递到 __aexit__() 的异常(除非它是 asyncio.CancelledError)也会包含在异常组中。对于 KeyboardInterruptSystemExit,也会像上一段中那样进行特殊处理。

任务组会小心地不将用于“唤醒”其 __aexit__() 的内部取消与由其他方对其正在运行的任务发出的取消请求混淆。特别是,当一个任务组在语法上嵌套在另一个任务组中,并且两者同时在其子任务之一中遇到异常时,内部任务组将处理其异常,然后外部任务组将接收另一个取消并处理其自己的异常。

在任务组在外部被取消并且还必须引发 ExceptionGroup 的情况下,它将调用父任务的 cancel() 方法。这确保将在下一个 await 处引发 asyncio.CancelledError,这样取消就不会丢失。

任务组会保留 asyncio.Task.cancelling() 报告的取消计数。

在 3.13 版本中更改: 改进了对同时发生的内部和外部取消的处理,并正确保留了取消计数。

终止任务组

虽然标准库本身不支持终止任务组,但可以通过将引发异常的任务添加到任务组并忽略引发的异常来实现终止。

import asyncio
from asyncio import TaskGroup

class TerminateTaskGroup(Exception):
    """Exception raised to terminate a task group."""

async def force_terminate_task_group():
    """Used to force termination of a task group."""
    raise TerminateTaskGroup()

async def job(task_id, sleep_time):
    print(f'Task {task_id}: start')
    await asyncio.sleep(sleep_time)
    print(f'Task {task_id}: done')

async def main():
    try:
        async with TaskGroup() as group:
            # spawn some tasks
            group.create_task(job(1, 0.5))
            group.create_task(job(2, 1.5))
            # sleep for 1 second
            await asyncio.sleep(1)
            # add an exception-raising task to force the group to terminate
            group.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass

asyncio.run(main())

预期输出

Task 1: start
Task 2: start
Task 1: done

休眠

coroutine asyncio.sleep(delay, result=None)

阻塞 *delay* 秒。

如果提供了 *result*,则当协程完成时,它将返回给调用者。

sleep() 总是会挂起当前任务,允许其他任务运行。

将延迟设置为 0 提供了一种优化路径,允许其他任务运行。长时间运行的函数可以使用此方法来避免阻塞事件循环的整个函数调用持续时间。

每秒显示当前日期 5 秒的协程示例

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

在 3.10 版本中更改: 删除了 *loop* 参数。

在 3.13 版本中更改: 如果 *delay* 为 nan,则引发 ValueError

并发运行任务

awaitable asyncio.gather(*aws, return_exceptions=False)

并发运行 aws 序列中的 可等待对象

如果 aws 中的任何可等待对象是协程,它将自动计划为任务。

如果所有可等待对象都成功完成,结果将是返回值的聚合列表。结果值的顺序与 aws 中可等待对象的顺序相对应。

如果 return_exceptionsFalse (默认值),则第一个引发的异常会立即传播到等待 gather() 的任务。aws 序列中的其他可等待对象不会被取消,并将继续运行。

如果 return_exceptionsTrue,则异常的处理方式与成功结果相同,并聚合在结果列表中。

如果 gather()取消,则所有已提交的(尚未完成的)可等待对象也会被取消

如果来自 aws 序列的任何 Task 或 Future 被取消,则将其视为引发 CancelledError – 在这种情况下,gather() 调用不会被取消。这是为了防止取消一个已提交的 Task/Future 导致其他 Task/Future 被取消。

注意

创建和并发运行任务并等待其完成的新替代方案是 asyncio.TaskGroup。对于调度子任务的嵌套,TaskGroup 提供了比 gather 更强的安全保证:如果一个任务(或子任务,即由任务调度的任务)引发异常,TaskGroup 将取消剩余的计划任务,而 gather 则不会。

示例

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

注意

如果 return_exceptions 为 false,在 gather() 被标记为完成后取消 gather() 不会取消任何已提交的可等待对象。例如,gather 可以在将异常传播给调用者后被标记为完成,因此,在从 gather 捕获异常(由其中一个可等待对象引发)后调用 gather.cancel() 不会取消任何其他可等待对象。

在 3.7 版本中更改:如果 gather 本身被取消,则无论 return_exceptions 如何,都会传播取消。

在 3.10 版本中更改: 删除了 *loop* 参数。

自 3.10 版本起弃用:如果未提供位置参数,或者并非所有位置参数都是类似 Future 的对象,并且没有正在运行的事件循环,则会发出弃用警告。

急切任务工厂

asyncio.eager_task_factory(loop, coro, *, name=None, context=None)

用于急切任务执行的任务工厂。

当使用此工厂时(通过 loop.set_task_factory(asyncio.eager_task_factory)),协程在 Task 构造期间开始同步执行。只有当任务阻塞时,才会将其调度到事件循环上。这可以提高性能,因为避免了同步完成的协程的循环调度开销。

一个常见示例是使用缓存或记忆化来避免尽可能实际的 I/O 的协程。

注意

立即执行协程是一种语义更改。如果协程返回或引发异常,则永远不会将任务调度到事件循环。如果协程执行阻塞,则将任务调度到事件循环。此更改可能会对现有应用程序引入行为更改。例如,应用程序的任务执行顺序可能会更改。

在 3.12 版本中添加。

asyncio.create_eager_task_factory(custom_task_constructor)

创建一个急切任务工厂,类似于 eager_task_factory(),在创建新任务时使用提供的 custom_task_constructor 而不是默认的 Task

custom_task_constructor 必须是具有与 Task.__init__ 签名匹配的签名的可调用对象。该可调用对象必须返回一个与 asyncio.Task 兼容的对象。

此函数返回一个可调用对象,旨在通过 loop.set_task_factory(factory)) 用作事件循环的任务工厂。

在 3.12 版本中添加。

防止取消

awaitableasyncio.shield(aw)

保护一个 可等待对象 不被 取消

如果 aw 是协程,它会自动作为 Task 进行调度。

语句

task = asyncio.create_task(something())
res = await shield(task)

等价于

res = await something()

不同之处在于,如果包含它的协程被取消,则在 something() 中运行的 Task 不会被取消。从 something() 的角度来看,取消并未发生。尽管其调用者仍然被取消,因此 “await” 表达式仍然会引发 CancelledError

如果 something() 通过其他方式(即从其内部)取消,也会取消 shield()

如果希望完全忽略取消(不建议),则 shield() 函数应与 try/except 子句结合使用,如下所示

task = asyncio.create_task(something())
try:
    res = await shield(task)
except CancelledError:
    res = None

重要

保存传递给此函数的任务的引用,以避免任务在执行过程中消失。事件循环只保留对任务的弱引用。未在其他地方引用的任务可能随时被垃圾回收,甚至在完成之前。

在 3.10 版本中更改: 删除了 *loop* 参数。

自 3.10 版本起弃用:如果 aw 不是类似 Future 的对象,并且没有正在运行的事件循环,则会发出弃用警告。

超时

asyncio.timeout(delay)

返回一个 异步上下文管理器,可用于限制等待某些事物所花费的时间。

delay 可以是 None,也可以是浮点数/整数的等待秒数。如果 delayNone,则不会应用时间限制;如果上下文管理器创建时延迟未知,这可能会很有用。

在任何一种情况下,都可以使用 Timeout.reschedule() 在创建后重新调度上下文管理器。

示例

async def main():
    async with asyncio.timeout(10):
        await long_running_task()

如果 long_running_task 的完成时间超过 10 秒,则上下文管理器将取消当前任务并内部处理产生的 asyncio.CancelledError,将其转换为可以捕获和处理的 TimeoutError

注意

asyncio.timeout() 上下文管理器是将 asyncio.CancelledError 转换为 TimeoutError 的原因,这意味着 TimeoutError 只能在上下文管理器外部捕获。

捕获 TimeoutError 的示例

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

asyncio.timeout() 生成的上下文管理器可以重新调度到不同的截止日期并进行检查。

class asyncio.Timeout(when)

一个用于取消过期协程的 异步上下文管理器

when 应该是一个绝对时间,表示上下文应该超时的时刻,以事件循环的时钟为准。

  • 如果 whenNone,则永远不会触发超时。

  • 如果 when < loop.time(),则超时将在事件循环的下一次迭代时触发。

when() float | None

返回当前截止时间,如果当前截止时间未设置,则返回 None

reschedule(when: float | None)

重新安排超时。

expired() bool

返回上下文管理器是否已超过其截止时间(已过期)。

示例

async def main():
    try:
        # We do not know the timeout when starting, so we pass ``None``.
        async with asyncio.timeout(None) as cm:
            # We know the timeout now, so we reschedule it.
            new_deadline = get_running_loop().time() + 10
            cm.reschedule(new_deadline)

            await long_running_task()
    except TimeoutError:
        pass

    if cm.expired():
        print("Looks like we haven't finished on time.")

超时上下文管理器可以安全地嵌套使用。

3.11 版本新增。

asyncio.timeout_at(when)

asyncio.timeout() 类似,不同之处在于 when 是停止等待的绝对时间,或 None

示例

async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

3.11 版本新增。

coroutine asyncio.wait_for(aw, timeout)

等待 aw 可等待对象 在超时时间内完成。

如果 aw 是协程,它会自动作为 Task 进行调度。

timeout 可以是 None,也可以是浮点数或整数,表示等待的秒数。如果 timeoutNone,则会阻塞直到 future 完成。

如果发生超时,它会取消任务并引发 TimeoutError

为了避免任务被 取消,请将其包装在 shield() 中。

该函数将等待直到 future 真正被取消,因此总等待时间可能超过 timeout。如果在取消期间发生异常,则会传播该异常。

如果等待被取消,则 future aw 也会被取消。

示例

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版本中更改: aw 因超时而被取消时,wait_for 会等待 aw 被取消。以前,它会立即引发 TimeoutError

在 3.10 版本中更改: 删除了 *loop* 参数。

在 3.11 版本中更改: 引发 TimeoutError 而不是 asyncio.TimeoutError

等待原语

coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

并发运行 aws 可迭代对象中的 FutureTask 实例,并阻塞直到 return_when 指定的条件成立。

aws 可迭代对象不能为空。

返回两组 Task/Future:(done, pending)

用法

done, pending = await asyncio.wait(aws)

timeout (浮点数或整数),如果指定,可用于控制返回前等待的最大秒数。

请注意,此函数不会引发 TimeoutError。当发生超时时未完成的 Future 或 Task 只会返回到第二个集合中。

return_when 指示此函数应何时返回。它必须是以下常量之一

常量

描述

asyncio.FIRST_COMPLETED

当任何 future 完成或被取消时,该函数将返回。

asyncio.FIRST_EXCEPTION

当任何 future 通过引发异常完成时,该函数将返回。如果没有 future 引发异常,则它等效于 ALL_COMPLETED

asyncio.ALL_COMPLETED

当所有 future 完成或被取消时,该函数将返回。

wait_for() 不同,当发生超时时,wait() 不会取消 future。

在 3.10 版本中更改: 删除了 *loop* 参数。

在 3.11 版本中更改: 禁止直接将协程对象传递给 wait()

在 3.12 版本中更改: 添加了对生成器产生任务的支持。

asyncio.as_completed(aws, *, timeout=None)

并发运行 aws 可迭代对象中的 可等待对象。返回的对象可以迭代以获取可等待对象完成时的结果。

as_completed() 返回的对象可以作为 异步迭代器 或普通 迭代器 进行迭代。当使用异步迭代时,如果原始提供的可等待对象是任务或 future,则会产生这些对象。这使得将先前计划的任务与其结果关联起来变得容易。例如

ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]

async for earliest_connect in as_completed(tasks):
    # earliest_connect is done. The result can be obtained by
    # awaiting it or calling earliest_connect.result()
    reader, writer = await earliest_connect

    if earliest_connect is ipv6_connect:
        print("IPv6 connection established.")
    else:
        print("IPv4 connection established.")

在异步迭代期间,将为提供的不是任务或 future 的可等待对象生成隐式创建的任务。

当用作普通迭代器时,每次迭代都会产生一个新的协程,该协程返回下一个已完成的可等待对象的结果或引发异常。此模式与低于 3.13 的 Python 版本兼容

ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]

for next_connect in as_completed(tasks):
    # next_connect is not one of the original task objects. It must be
    # awaited to obtain the result value or raise the exception of the
    # awaitable that finishes next.
    reader, writer = await next_connect

如果在所有可等待对象完成之前发生超时,则会引发 TimeoutError。这由异步迭代期间的 async for 循环或普通迭代期间产生的协程引发。

在 3.10 版本中更改: 删除了 *loop* 参数。

自 3.10 版本起弃用: 如果 aws 可迭代对象中并非所有可等待对象都是 Future 类对象,并且没有正在运行的事件循环,则会发出弃用警告。

在 3.12 版本中更改: 添加了对生成器产生任务的支持。

在 3.13 版本中更改: 现在可以将结果用作 异步迭代器 或普通 迭代器 (以前仅是普通迭代器)。

在线程中运行

coroutine asyncio.to_thread(func, /, *args, **kwargs)

在单独的线程中异步运行函数 func

为此函数提供的任何 *args 和 **kwargs 都会直接传递给 func。此外,当前的 contextvars.Context 也会传播,允许从事件循环线程访问单独线程中的上下文变量。

返回一个协程,可以等待以获取 func 的最终结果。

此协程函数主要用于执行 IO 密集型函数/方法,如果它们在主线程中运行,则会阻塞事件循环。例如

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

在任何协程中直接调用 blocking_io() 都会在其持续时间内阻塞事件循环,从而导致额外的 1 秒运行时间。相反,通过使用 asyncio.to_thread(),我们可以在单独的线程中运行它,而不会阻塞事件循环。

注意

由于 GILasyncio.to_thread() 通常只能用于使 IO 密集型函数变为非阻塞。但是,对于释放 GIL 的扩展模块或没有 GIL 的其他 Python 实现,asyncio.to_thread() 也可以用于 CPU 密集型函数。

在 3.9 版本中添加。

从其他线程调度

asyncio.run_coroutine_threadsafe(coro, loop)

将协程提交到给定的事件循环中。线程安全。

返回一个 concurrent.futures.Future,用于等待来自另一个操作系统线程的结果。

此函数旨在从事件循环正在运行的操作系统线程之外的不同线程调用。例如:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在协程中引发了异常,则会通知返回的 Future。它也可以用于取消事件循环中的任务。

try:
    result = future.result(timeout)
except TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

请参阅文档的 并发和多线程 部分。

与其他的 asyncio 函数不同,此函数需要显式传递 loop 参数。

在版本 3.5.1 中添加。

自省

asyncio.current_task(loop=None)

返回当前正在运行的 Task 实例,如果没有任务正在运行,则返回 None

如果 loopNone,则会使用 get_running_loop() 获取当前循环。

3.7 版本新增。

asyncio.all_tasks(loop=None)

返回循环运行的尚未完成的 Task 对象的集合。

如果 loopNone,则会使用 get_running_loop() 获取当前循环。

3.7 版本新增。

asyncio.iscoroutine(obj)

如果 obj 是一个协程对象,则返回 True

在版本 3.4 中添加。

任务对象

class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False)

一个 类似于 Future 的对象,它运行一个 Python 协程。不是线程安全的。

任务用于在事件循环中运行协程。如果一个协程等待 Future,则任务会暂停协程的执行,并等待 Future 完成。当 Future 完成 时,包装的协程的执行将恢复。

事件循环使用协作调度:一个事件循环一次运行一个任务。当一个任务等待 Future 完成时,事件循环会运行其他任务、回调或执行 IO 操作。

使用高级的 asyncio.create_task() 函数创建任务,或使用低级的 loop.create_task()ensure_future() 函数。不建议手动实例化任务。

要取消正在运行的任务,请使用 cancel() 方法。调用它将导致任务向包装的协程抛出 CancelledError 异常。如果协程在取消期间正在等待 Future 对象,则该 Future 对象将被取消。

cancelled() 可用于检查任务是否已取消。如果包装的协程没有抑制 CancelledError 异常并且实际上被取消了,则该方法返回 True

asyncio.TaskFuture 继承了除 Future.set_result()Future.set_exception() 之外的所有 API。

可选的仅限关键字的 context 参数允许为要运行的 coro 指定自定义 contextvars.Context 。 如果未提供 context,则 Task 会复制当前上下文,然后在复制的上下文中运行其协程。

可选的仅限关键字的 eager_start 参数允许在任务创建时立即开始执行 asyncio.Task 。 如果设置为 True 并且事件循环正在运行,则任务将立即开始执行协程,直到协程首次阻塞为止。 如果协程返回或引发而没有阻塞,则任务将尽早完成,并将跳过调度到事件循环。

在 3.7 版本中更改: 添加了对 contextvars 模块的支持。

在 3.8 版本中更改: 添加了 name 参数。

在 3.10 版本中已弃用: 如果未指定 loop 并且没有正在运行的事件循环,则会发出弃用警告。

在 3.11 版本中更改: 添加了 context 参数。

在 3.12 版本中更改: 添加了 eager_start 参数。

done()

如果任务已完成,则返回 True

当包装的协程返回值、引发异常或任务被取消时,任务即被视为完成

result()

返回任务的结果。

如果任务已完成,则返回包装的协程的结果(或者,如果协程引发了异常,则会重新引发该异常。)

如果任务已取消,则此方法会引发 CancelledError 异常。

如果任务的结果尚不可用,则此方法会引发 InvalidStateError 异常。

exception()

返回任务的异常。

如果包装的协程引发了异常,则返回该异常。 如果包装的协程正常返回,则此方法返回 None

如果任务已取消,则此方法会引发 CancelledError 异常。

如果任务尚未完成,则此方法会引发 InvalidStateError 异常。

add_done_callback(callback, *, context=None)

添加一个回调函数,以便在任务完成时运行。

此方法仅应在低级基于回调的代码中使用。

有关更多详细信息,请参阅 Future.add_done_callback() 的文档。

remove_done_callback(callback)

从回调列表删除 callback

此方法仅应在低级基于回调的代码中使用。

有关更多详细信息,请参阅 Future.remove_done_callback() 的文档。

get_stack(*, limit=None)

返回此任务的堆栈帧列表。

如果包装的协程尚未完成,则返回其挂起的堆栈。 如果协程已成功完成或已取消,则返回一个空列表。 如果协程因异常而终止,则返回回溯帧列表。

帧始终按从最旧到最新的顺序排列。

对于挂起的协程,仅返回一个堆栈帧。

可选的 limit 参数设置返回的最大帧数;默认情况下,返回所有可用的帧。返回的列表的顺序取决于返回的是堆栈还是回溯:返回堆栈的最新帧,但返回回溯的最旧帧。(这与 traceback 模块的行为一致。)

print_stack(*, limit=None, file=None)

打印此任务的堆栈或回溯。

这会产生类似于 traceback 模块的输出,用于通过 get_stack() 检索的帧。

limit 参数直接传递给 get_stack()

file 参数是写入输出的 I/O 流;默认情况下,输出写入 sys.stdout

get_coro()

返回 Task 封装的协程对象。

注意

对于已经提前完成的任务,这将返回 None。请参阅 Eager Task Factory

在 3.8 版本中添加。

在 3.12 版本中更改: 新添加的提前任务执行意味着结果可能为 None

get_context()

返回与任务关联的 contextvars.Context 对象。

在 3.12 版本中添加。

get_name()

返回任务的名称。

如果未显式为任务分配名称,则默认的 asyncio 任务实现会在实例化期间生成默认名称。

在 3.8 版本中添加。

set_name(value)

设置任务的名称。

value 参数可以是任何对象,然后将其转换为字符串。

在默认的任务实现中,该名称将在任务对象的 repr() 输出中可见。

在 3.8 版本中添加。

cancel(msg=None)

请求取消任务。

这会安排在事件循环的下一个周期中将 CancelledError 异常抛入封装的协程中。

然后,协程有机会清理甚至通过使用 try … … except CancelledErrorfinally 块来抑制异常来拒绝该请求。因此,与 Future.cancel() 不同,Task.cancel() 不保证任务将被取消,尽管完全抑制取消并不常见,并且不鼓励这样做。如果协程仍然决定抑制取消,则除了捕获异常外,还需要调用 Task.uncancel()

在 3.9 版本中更改: 添加了 msg 参数。

在 3.11 版本中更改: msg 参数从已取消的任务传播到其等待者。

以下示例说明了协程如何拦截取消请求

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

如果任务已取消,则返回 True

当使用 cancel() 请求取消,并且封装的协程传播了抛入其中的 CancelledError 异常时,该任务才被视为取消

uncancel()

减少对此任务的取消请求计数。

返回剩余的取消请求数。

请注意,一旦取消的任务执行完成,进一步调用 uncancel() 将无效。

3.11 版本新增。

此方法由 asyncio 的内部使用,不希望最终用户代码使用。特别是,如果任务成功地被取消,这将允许结构化并发的元素(如 任务组asyncio.timeout())继续运行,从而将取消隔离到相应的结构化块。例如

async def make_request_with_timeout():
    try:
        async with asyncio.timeout(1):
            # Structured block affected by the timeout:
            await make_request()
            await make_another_request()
    except TimeoutError:
        log("There was a timeout")
    # Outer code not affected by the timeout:
    await unrelated_code()

虽然由于超时,make_request()make_another_request() 的块可能会被取消,但即使在超时的情况下,unrelated_code() 也应继续运行。这是通过 uncancel() 实现的。TaskGroup 上下文管理器以类似的方式使用 uncancel()

如果最终用户代码由于某种原因通过捕获 CancelledError 来抑制取消,则需要调用此方法来删除取消状态。

当此方法将取消计数减少到零时,该方法会检查之前的 cancel() 调用是否已安排将 CancelledError 抛入任务中。如果尚未抛出,则会取消该安排(通过重置内部 _must_cancel 标志)。

在 3.13 版本中更改: 更改为在达到零时撤销挂起的取消请求。

cancelling()

返回此任务的挂起取消请求数,即 cancel() 的调用次数减去 uncancel() 的调用次数。

请注意,如果此数字大于零,但任务仍在执行,则 cancelled() 仍将返回 False。这是因为可以通过调用 uncancel() 来降低此数字,这可能会导致任务在取消请求降至零后最终不会被取消。

此方法由 asyncio 的内部使用,不希望最终用户代码使用。有关详细信息,请参阅 uncancel()

3.11 版本新增。