使用 asyncio 进行开发

异步编程与经典的“顺序”编程不同。

本页列出了常见的错误和陷阱,并解释了如何避免它们。

调试模式

默认情况下,asyncio 以生产模式运行。为了方便开发,asyncio 具有一个 调试模式

有几种方法可以启用 asyncio 调试模式

除了启用调试模式,还请考虑

  • asyncio 记录器 的日志级别设置为 logging.DEBUG,例如,以下代码片段可以在应用程序启动时运行

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模块以显示 ResourceWarning 警告。一种方法是使用 -W default 命令行选项。

当调试模式启用时

  • 许多非线程安全的 asyncio API(例如 loop.call_soon()loop.call_at() 方法)如果在错误的线程中调用,则会引发异常。

  • 如果 I/O 选择器执行 I/O 操作花费太长时间,则会记录其执行时间。

  • 执行时间超过 100 毫秒的回调将被记录。 loop.slow_callback_duration 属性可用于设置被认为是“慢”的最小执行持续时间(以秒为单位)。

并发和多线程

事件循环在一个线程中运行(通常是主线程),并在其线程中执行所有回调和任务。当一个任务在事件循环中运行时,同一线程中的其他任务不能运行。当一个任务执行 await 表达式时,正在运行的任务被挂起,事件循环执行下一个任务。

要从另一个操作系统线程调度 回调,应使用 loop.call_soon_threadsafe() 方法。示例

loop.call_soon_threadsafe(callback, *args)

几乎所有的 asyncio 对象都不是线程安全的,这通常不是问题,除非有代码从任务或回调之外使用它们。如果需要此类代码调用低级 asyncio API,则应使用 loop.call_soon_threadsafe() 方法,例如

loop.call_soon_threadsafe(fut.cancel)

要从不同的操作系统线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future 来访问结果

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

为了处理信号,事件循环必须在主线程中运行。

可以使用 loop.run_in_executor() 方法与 concurrent.futures.ThreadPoolExecutorInterpreterPoolExecutor 结合使用,以在不同的操作系统线程中执行阻塞代码,而不会阻塞事件循环运行的操作系统线程。

目前无法直接从不同的进程(例如使用 multiprocessing 启动的进程)调度协程或回调。 事件循环方法 部分列出了可以在不阻塞事件循环的情况下从管道读取和监视文件描述符的 API。此外,asyncio 的 子进程 API 提供了一种从事件循环启动进程并与之通信的方法。最后,上述 loop.run_in_executor() 方法也可以与 concurrent.futures.ProcessPoolExecutor 结合使用,以在不同的进程中执行代码。

运行阻塞代码

阻塞(CPU密集型)代码不应直接调用。例如,如果一个函数执行 1 秒的 CPU密集型计算,所有并发的 asyncio 任务和 IO 操作都将延迟 1 秒。

执行器可用于在不同的线程中运行任务,包括在不同的解释器中,甚至在不同的进程中,以避免阻塞事件循环所在的操作系统线程。有关更多详细信息,请参阅 loop.run_in_executor() 方法。

日志记录

asyncio 使用 logging 模块,所有日志记录都通过 "asyncio" 记录器执行。

默认日志级别为 logging.INFO,可以轻松调整

logging.getLogger("asyncio").setLevel(logging.WARNING)

网络日志记录可能会阻塞事件循环。建议使用单独的线程处理日志或使用非阻塞 IO。例如,请参阅 处理阻塞处理程序

检测从未被等待的协程

当调用协程函数但未等待时(例如 coro() 而不是 await coro()),或者协程未通过 asyncio.create_task() 调度时,asyncio 将发出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

输出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

调试模式下的输出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的解决方法是等待协程或调用 asyncio.create_task() 函数

async def main():
    await test()

检测从未被检索的异常

如果调用了 Future.set_exception() 但从未等待 Future 对象,则该异常将永远不会传播到用户代码。在这种情况下,当 Future 对象被垃圾回收时,asyncio 将发出一条日志消息。

未处理异常的示例

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

输出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

启用调试模式 以获取任务创建位置的追溯

asyncio.run(main(), debug=True)

调试模式下的输出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed