使用 asyncio 进行开发

异步编程不同于经典的“顺序”编程。

此页面列出了常见的错误和陷阱,并说明如何避免它们。

调试模式

默认情况下,asyncio 在生产模式下运行。为了简化开发,asyncio 具有调试模式

有几种方法可以启用 asyncio 调试模式

除了启用调试模式之外,还应考虑

  • asyncio 记录器 的日志级别设置为 logging.DEBUG,例如,可以在应用程序启动时运行以下代码片段

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模块以显示 ResourceWarning 警告。一种方法是使用 -W default 命令行选项。

当调试模式启用时

  • asyncio 检查 未等待的协程 并记录它们;这减轻了“忘记等待”的陷阱。

  • 许多非线程安全的 asyncio API(例如 loop.call_soon()loop.call_at() 方法)如果从错误的线程调用,则会引发异常。

  • 如果执行 I/O 操作需要花费太长时间,则会记录 I/O 选择器的执行时间。

  • 记录执行时间超过 100 毫秒的回调。可以使用 loop.slow_callback_duration 属性设置被视为“慢”的最小执行持续时间(以秒为单位)。

并发和多线程

事件循环在某个线程(通常是主线程)中运行,并在其线程中执行所有回调和任务。当任务在事件循环中运行时,其他任务不能在同一线程中运行。当任务执行 await 表达式时,正在运行的任务将被挂起,事件循环将执行下一个任务。

要从另一个操作系统线程安排 回调,应使用 loop.call_soon_threadsafe() 方法。示例

loop.call_soon_threadsafe(callback, *args)

几乎所有 asyncio 对象都不是线程安全的,除非有代码从任务或回调外部使用它们,否则通常不会出现问题。如果需要此类代码调用底层 asyncio API,则应使用 loop.call_soon_threadsafe() 方法,例如

loop.call_soon_threadsafe(fut.cancel)

要从不同的操作系统线程安排协程对象,应使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future 来访问结果

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

要处理信号,事件循环必须在主线程中运行。

可以使用 loop.run_in_executor() 方法和 concurrent.futures.ThreadPoolExecutor 在不同的操作系统线程中执行阻塞代码,而不会阻塞事件循环所在的 OS 线程。

目前无法直接从其他进程(例如使用 multiprocessing 启动的进程)调度协程或回调。 事件循环方法 部分列出了可以在不阻塞事件循环的情况下从管道读取和监视文件描述符的 API。此外,asyncio 的 子进程 API 提供了一种从事件循环启动进程并与其通信的方法。最后,上述 loop.run_in_executor() 方法还可以与 concurrent.futures.ProcessPoolExecutor 一起使用,以便在其他进程中执行代码。

运行阻塞代码

不应直接调用阻塞(CPU 绑定)代码。例如,如果某个函数执行 1 秒的 CPU 密集型计算,则所有并发 asyncio 任务和 IO 操作都会延迟 1 秒。

可以使用执行器在其他线程甚至其他进程中运行任务,以避免使用事件循环阻塞 OS 线程。有关更多详细信息,请参阅 loop.run_in_executor() 方法。

日志记录

asyncio 使用 logging 模块,所有日志记录都通过 "asyncio" 记录器执行。

默认日志级别为 logging.INFO,可以轻松调整

logging.getLogger("asyncio").setLevel(logging.WARNING)

网络日志记录可能会阻塞事件循环。建议使用单独的线程来处理日志或使用非阻塞 IO。例如,请参阅 处理会阻塞的处理程序

检测从未等待过的协程

当协程函数被调用,但未等待(例如 coro() 而不是 await coro())或协程未通过 asyncio.create_task() 调度,asyncio 将发出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

输出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

调试模式下的输出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的修复方法是等待协程或调用 asyncio.create_task() 函数

async def main():
    await test()

检测从不检索的异常

如果调用了 Future.set_exception(),但从未等待 Future 对象,则该异常将永远不会传播到用户代码。在这种情况下,当 Future 对象被垃圾回收时,asyncio 将发出日志消息。

未处理异常的示例

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

输出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

启用调试模式 以获取创建任务时的回溯

asyncio.run(main(), debug=True)

调试模式下的输出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed