使用 asyncio 开发

异步编程与经典的“顺序”编程不同。

本页列出了常见的错误和陷阱,并解释了如何避免它们。

调试模式

默认情况下,asyncio 在生产模式下运行。 为了方便开发,asyncio 具有调试模式

有几种方法可以启用 asyncio 调试模式

除了启用调试模式,还应考虑

  • asyncio 记录器的日志级别设置为 logging.DEBUG,例如,以下代码片段可以在应用程序启动时运行

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模块以显示 ResourceWarning 警告。 一种方法是使用 -W default 命令行选项。

启用调试模式后

  • asyncio 检查 未等待的协程 并记录它们; 这减轻了“忘记等待”的陷阱。

  • 许多非线程安全的 asyncio API(例如 loop.call_soon()loop.call_at() 方法)如果从错误的线程调用,则会引发异常。

  • 如果 I/O 选择器的执行时间过长而无法执行 I/O 操作,则会记录该执行时间。

  • 记录执行时间超过 100 毫秒的回调。loop.slow_callback_duration 属性可用于设置被认为是“慢”的最小执行持续时间(以秒为单位)。

并发与多线程

事件循环在一个线程(通常是主线程)中运行,并在其线程中执行所有回调和任务。当任务在事件循环中运行时,同一线程中不能运行其他任务。当任务执行 await 表达式时,正在运行的任务将挂起,并且事件循环将执行下一个任务。

要从另一个操作系统线程调度回调,应使用 loop.call_soon_threadsafe() 方法。 示例

loop.call_soon_threadsafe(callback, *args)

几乎所有的 asyncio 对象都不是线程安全的,这通常不是问题,除非有代码从任务或回调之外使用它们。如果需要此类代码来调用底层 asyncio API,则应使用 loop.call_soon_threadsafe() 方法,例如

loop.call_soon_threadsafe(fut.cancel)

要从不同的操作系统线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。 它返回一个 concurrent.futures.Future 以访问结果

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

要处理信号,事件循环必须在主线程中运行。

loop.run_in_executor() 方法可以与 concurrent.futures.ThreadPoolExecutor 一起使用,以在不同的操作系统线程中执行阻塞代码,而不会阻塞事件循环运行所在的操作系统线程。

目前没有办法直接从不同的进程(例如使用 multiprocessing 启动的进程)调度协程或回调。事件循环方法 部分列出了可以从管道读取和监视文件描述符而不会阻塞事件循环的 API。 此外,asyncio 的 子进程 API 提供了一种从事件循环启动进程并与其通信的方法。 最后,上述 loop.run_in_executor() 方法也可以与 concurrent.futures.ProcessPoolExecutor 一起使用,以在不同的进程中执行代码。

运行阻塞代码

不应直接调用阻塞(CPU 密集型)代码。例如,如果一个函数执行 1 秒的 CPU 密集型计算,所有并发的 asyncio 任务和 IO 操作将延迟 1 秒。

可以使用执行器在不同的线程甚至不同的进程中运行任务,以避免阻塞具有事件循环的操作系统线程。 有关更多详细信息,请参阅 loop.run_in_executor() 方法。

日志记录

asyncio 使用 logging 模块,所有日志记录都通过 "asyncio" 记录器执行。

默认日志级别为 logging.INFO,可以轻松调整

logging.getLogger("asyncio").setLevel(logging.WARNING)

网络日志记录会阻塞事件循环。建议使用单独的线程来处理日志或使用非阻塞 IO。例如,请参阅处理阻塞的处理程序

检测从不等待的协程

当调用协程函数但未等待时(例如,coro() 而不是 await coro()),或者未使用 asyncio.create_task() 调度协程时,asyncio 将发出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

输出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

调试模式下的输出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的修复方法是等待协程或调用 asyncio.create_task() 函数

async def main():
    await test()

检测从未检索的异常

如果调用了 Future.set_exception(),但从未等待 Future 对象,则异常将永远不会传播到用户代码。 在这种情况下,当垃圾回收 Future 对象时,asyncio 将发出一条日志消息。

未处理的异常示例

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

输出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

启用调试模式 以获取创建任务的回溯

asyncio.run(main(), debug=True)

调试模式下的输出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed