事件循环

源代码: Lib/asyncio/events.py, Lib/asyncio/base_events.py


序言

事件循环是每个 asyncio 应用程序的核心。事件循环运行异步任务和回调,执行网络 IO 操作,并运行子进程。

应用程序开发人员通常应使用高级 asyncio 函数,例如 asyncio.run(),并且很少需要引用循环对象或调用其方法。本节主要针对需要对事件循环行为进行更精细控制的底层代码、库和框架的作者。

获取事件循环

可以使用以下底层函数来获取、设置或创建事件循环

asyncio.get_running_loop()

返回当前操作系统线程中正在运行的事件循环。

如果没有正在运行的事件循环,则引发 RuntimeError

此函数只能从协程或回调中调用。

3.7 版本新增。

asyncio.get_event_loop()

获取当前事件循环。

从协程或回调(例如,使用 call_soon 或类似的 API 调度)调用时,此函数将始终返回正在运行的事件循环。

如果没有设置正在运行的事件循环,该函数将返回 get_event_loop_policy().get_event_loop() 调用的结果。

由于此函数具有相当复杂的行为(尤其是在使用自定义事件循环策略时),因此在协程和回调中,优先使用 get_running_loop() 函数而不是 get_event_loop()

如上所述,考虑使用更高级别的 asyncio.run() 函数,而不是使用这些较低级别的函数手动创建和关闭事件循环。

自 3.12 版本弃用: 如果当前没有事件循环,则会发出弃用警告。在未来的某些 Python 版本中,这将变成错误。

asyncio.set_event_loop(loop)

loop 设置为当前操作系统线程的当前事件循环。

asyncio.new_event_loop()

创建并返回一个新的事件循环对象。

请注意,get_event_loop(), set_event_loop(), 和 new_event_loop() 函数的行为可以通过 设置自定义事件循环策略 来改变。

目录

此文档页面包含以下部分

事件循环方法

事件循环具有以下底层 API

运行和停止循环

loop.run_until_complete(future)

运行直到 futureFuture 的实例)完成。

如果参数是 协程对象,则会隐式地调度为 asyncio.Task 运行。

返回 Future 的结果或引发其异常。

loop.run_forever()

运行事件循环,直到调用 stop()

如果在调用 run_forever() 之前调用了 stop(),则循环将以零超时轮询 I/O 选择器一次,运行所有响应 I/O 事件调度的回调(以及那些已经调度的回调),然后退出。

如果在 run_forever() 运行时调用了 stop(),则循环将运行当前批次的回调然后退出。请注意,在这种情况下,回调调度的新的回调将不会运行;相反,它们将在下次调用 run_forever()run_until_complete() 时运行。

loop.stop()

停止事件循环。

loop.is_running()

如果事件循环当前正在运行,则返回 True

loop.is_closed()

如果事件循环已关闭,则返回 True

loop.close()

关闭事件循环。

调用此函数时,循环不得正在运行。任何挂起的回调都将被丢弃。

此方法清除所有队列并关闭执行器,但不等待执行器完成。

此方法是幂等的且不可逆的。事件循环关闭后,不应调用其他方法。

协程 loop.shutdown_asyncgens()

安排所有当前打开的 异步生成器 对象通过 aclose() 调用关闭。调用此方法后,如果迭代新的异步生成器,事件循环将发出警告。这应该用于可靠地完成所有计划的异步生成器。

请注意,当使用 asyncio.run() 时,无需调用此函数。

示例

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

在 3.6 版本中添加。

协程 loop.shutdown_default_executor(timeout=None)

安排关闭默认执行器,并等待它加入 ThreadPoolExecutor 中的所有线程。一旦调用此方法,使用 loop.run_in_executor() 的默认执行器将引发 RuntimeError

timeout 参数指定执行器完成加入所允许的时间量(以 float 秒为单位)。 使用默认值 None 时,允许执行器无限量的时间。

如果达到 timeout,则会发出 RuntimeWarning,并且默认执行器会终止,而无需等待其线程完成加入。

注意

使用 asyncio.run() 时不要调用此方法,因为后者会自动处理默认执行器的关闭。

在 3.9 版本中添加。

在 3.12 版本中更改: 添加了 timeout 参数。

调度回调

loop.call_soon(callback, *args, context=None)

安排 callback 回调 在事件循环的下一次迭代中与 args 参数一起调用。

返回 asyncio.Handle 的实例,该实例可以稍后用于取消回调。

回调按照注册的顺序调用。每个回调将精确调用一次。

可选的仅关键字 context 参数指定 callback 在其中运行的自定义 contextvars.Context。如果未提供 context,回调将使用当前上下文。

call_soon_threadsafe() 不同,此方法不是线程安全的。

loop.call_soon_threadsafe(callback, *args, context=None)

call_soon() 的线程安全变体。从另一个线程调度回调时,必须使用此函数,因为 call_soon() 不是线程安全的。

如果在已关闭的循环上调用,则引发 RuntimeError。这可能会在主应用程序关闭时在辅助线程上发生。

请参阅文档的 并发和多线程 部分。

在 3.7 版本中更改: 添加了仅关键字 context 参数。有关详细信息,请参阅 PEP 567

注意

大多数 asyncio 调度函数不允许传递关键字参数。为此,请使用 functools.partial()

# will schedule "print("Hello", flush=True)"
loop.call_soon(
    functools.partial(print, "Hello", flush=True))

使用 partial 对象通常比使用 lambdas 更方便,因为 asyncio 可以在调试和错误消息中更好地呈现 partial 对象。

调度延迟回调

事件循环提供了用于调度回调函数在未来某个时间点调用的机制。事件循环使用单调时钟来跟踪时间。

loop.call_later(delay, callback, *args, context=None)

安排在给定的 delay 秒数(可以是整数或浮点数)后调用callback

返回一个 asyncio.TimerHandle 的实例,该实例可用于取消回调。

callback 将被精确调用一次。如果两个回调安排在完全相同的时间执行,它们的调用顺序是未定义的。

可选的位置参数 args 将在调用时传递给回调。如果希望使用关键字参数调用回调,请使用 functools.partial()

可选的仅关键字参数 context 允许为 callback 指定一个自定义的 contextvars.Context 来运行。当没有提供 context 时,将使用当前上下文。

3.7 版本更改: 添加了仅关键字参数 context。有关更多详细信息,请参阅 PEP 567

3.8 版本更改: 在 Python 3.7 及更早版本中使用默认事件循环实现时,delay 不能超过一天。这个问题已在 Python 3.8 中得到修复。

loop.call_at(when, callback, *args, context=None)

安排在给定的绝对时间戳 when(整数或浮点数)调用 callback,使用与 loop.time() 相同的时间参考。

此方法的行为与 call_later() 相同。

返回一个 asyncio.TimerHandle 的实例,该实例可用于取消回调。

3.7 版本更改: 添加了仅关键字参数 context。有关更多详细信息,请参阅 PEP 567

3.8 版本更改: 在 Python 3.7 及更早版本中使用默认事件循环实现时,when 和当前时间之间的差值不能超过一天。这个问题已在 Python 3.8 中得到修复。

loop.time()

根据事件循环的内部单调时钟,返回当前时间,以 float 值表示。

注意

3.8 版本更改: 在 Python 3.7 及更早版本中,超时(相对 delay 或绝对 when)不应超过一天。这个问题已在 Python 3.8 中得到修复。

另请参阅

asyncio.sleep() 函数。

创建 Future 和 Task

loop.create_future()

创建一个附加到事件循环的 asyncio.Future 对象。

这是在 asyncio 中创建 Future 的首选方法。这允许第三方事件循环提供 Future 对象的替代实现(具有更好的性能或检测能力)。

3.5.2 版本新增。

loop.create_task(coro, *, name=None, context=None)

安排执行 协程 coro。返回一个 Task 对象。

第三方事件循环可以使用自己的 Task 子类来实现互操作性。在这种情况下,结果类型是 Task 的子类。

如果提供了 name 参数且不为 None,则使用 Task.set_name() 将其设置为任务的名称。

可选的仅关键字参数 context 允许为 coro 指定一个自定义的 contextvars.Context 来运行。当没有提供 context 时,将创建当前上下文的副本。

3.8 版本更改: 添加了 name 参数。

3.11 版本更改: 添加了 context 参数。

loop.set_task_factory(factory)

设置一个任务工厂,该工厂将由 loop.create_task() 使用。

如果 factoryNone,将设置默认的任务工厂。否则,factory 必须是一个可调用对象,其签名与 (loop, coro, context=None) 匹配,其中 loop 是对活动事件循环的引用,而 coro 是一个协程对象。可调用对象必须返回一个与 asyncio.Future 兼容的对象。

loop.get_task_factory()

如果正在使用默认任务工厂,则返回任务工厂或 None

打开网络连接

coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)

打开一个流式传输连接到由 hostport 指定的给定地址。

套接字族可以是 AF_INETAF_INET6,具体取决于 host (或提供的 family 参数)。

套接字类型将为 SOCK_STREAM

protocol_factory 必须是一个可调用对象,返回一个 asyncio 协议 实现。

此方法将尝试在后台建立连接。成功后,它将返回一个 (transport, protocol) 对。

底层操作的时间顺序概要如下:

  1. 连接已建立,并为其创建了一个 传输

  2. protocol_factory 在不带参数的情况下被调用,并期望返回一个 协议 实例。

  3. 通过调用协议实例的 connection_made() 方法,将其与传输耦合。

  4. 成功后,返回一个 (transport, protocol) 元组。

创建的传输是一个与实现相关的双向流。

其他参数:

  • ssl:如果给定且不为 false,则创建 SSL/TLS 传输(默认情况下创建普通的 TCP 传输)。如果 ssl 是一个 ssl.SSLContext 对象,则此上下文用于创建传输;如果 sslTrue,则使用从 ssl.create_default_context() 返回的默认上下文。

  • server_hostname 设置或覆盖将用于匹配目标服务器证书的主机名。只有当 ssl 不是 None 时才应传递。默认情况下,使用 host 参数的值。如果 host 为空,则没有默认值,您必须为 server_hostname 传递一个值。如果 server_hostname 是一个空字符串,则禁用主机名匹配(这是一个严重的安全风险,可能导致中间人攻击)。

  • familyprotoflags 是可选的地址族、协议和标志,将传递给 getaddrinfo() 以进行 host 解析。如果给定,这些都应是来自相应 socket 模块常量的整数。

  • happy_eyeballs_delay,如果给定,则为此连接启用 Happy Eyeballs。它应该是一个浮点数,表示在并行启动下一个尝试之前,等待连接尝试完成的时间(以秒为单位)。这是 RFC 8305 中定义的“连接尝试延迟”。RFC 建议的合理默认值为 0.25(250 毫秒)。

  • interleave 控制当主机名解析为多个 IP 地址时地址的重新排序。如果 0 或未指定,则不进行重新排序,并且地址将按照 getaddrinfo() 返回的顺序进行尝试。如果指定一个正整数,则地址将按地址族交错,并且给定的整数将被解释为 RFC 8305 中定义的“第一个地址族计数”。如果未指定 happy_eyeballs_delay,则默认为 0;如果指定了 happy_eyeballs_delay,则默认为 1

  • 如果给定 sock,则它应该是现有、已连接的 socket.socket 对象,供传输使用。如果给定了 sock,则不应指定 hostportfamilyprotoflagshappy_eyeballs_delayinterleavelocal_addr 中的任何一个。

    注意

    sock 参数将套接字的所有权转移给创建的传输。要关闭套接字,请调用传输的 close() 方法。

  • local_addr,如果给定,则是一个 (local_host, local_port) 元组,用于在本地绑定套接字。local_hostlocal_port 的查找方式与 hostport 类似,使用 getaddrinfo()

  • ssl_handshake_timeout 是(对于 TLS 连接)等待 TLS 握手完成的时间(以秒为单位),超过该时间将中止连接。如果为 None(默认值),则为 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),超过该时间将中止连接。如果为 None(默认值),则为 30.0 秒。

  • all_errors 确定在无法创建连接时引发哪些异常。默认情况下,只会引发一个 Exception:如果只有一个异常或所有错误都具有相同的消息,则会引发第一个异常,或者引发一个带有组合错误消息的 OSError。当 all_errorsTrue 时,将引发一个包含所有异常的 ExceptionGroup(即使只有一个异常)。

在 3.5 版本中更改: ProactorEventLoop 添加了对 SSL/TLS 的支持。

在 3.6 版本中更改: 默认情况下,为所有 TCP 连接设置套接字选项 socket.TCP_NODELAY

在 3.7 版本中更改: 添加了 ssl_handshake_timeout 参数。

在 3.8 版本中更改: 添加了 happy_eyeballs_delayinterleave 参数。

Happy Eyeballs 算法:双栈主机成功。当服务器的 IPv4 路径和协议工作正常,但服务器的 IPv6 路径和协议不工作时,与仅使用 IPv4 的客户端相比,双栈客户端应用程序会遇到明显的连接延迟。这是不希望的,因为它会导致双栈客户端的用户体验更差。本文档指定了用于减少这种用户可见延迟的算法的要求,并提供了一种算法。

更多信息:https://datatracker.ietf.org/doc/html/rfc6555

在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 参数。

在 3.12 版本中更改: 添加了 all_errors

另请参阅

open_connection() 函数是一种高级替代 API。它返回一对 (StreamReader, StreamWriter),可以直接在 async/await 代码中使用。

协程 loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)

创建数据报连接。

套接字族可以是 AF_INET, AF_INET6, 或 AF_UNIX,具体取决于 host(或提供的 family 参数)。

套接字类型将为 SOCK_DGRAM

protocol_factory 必须是一个可调用对象,返回一个 协议 实现。

成功后,返回一个 (transport, protocol) 元组。

其他参数:

  • local_addr,如果给定,则是一个 (local_host, local_port) 元组,用于在本地绑定套接字。local_hostlocal_port 的查找方式与 hostport 类似,使用 getaddrinfo()

  • 如果给定 remote_addr,则它是一个 (remote_host, remote_port) 元组,用于将套接字连接到远程地址。remote_hostremote_port 将使用 getaddrinfo() 进行查找。

  • familyprotoflags 是可选的地址族、协议和标志,它们将被传递给 getaddrinfo() 用于 host 解析。如果给定,这些都应该是来自相应 socket 模块常量的整数。

  • reuse_port 告诉内核允许此端点绑定到与其他现有端点绑定的同一端口,只要它们在创建时都设置了此标志。此选项在 Windows 和一些 Unix 系统上不受支持。如果未定义 socket.SO_REUSEPORT 常量,则不支持此功能。

  • allow_broadcast 告诉内核允许此端点向广播地址发送消息。

  • 可以可选地指定 sock,以便使用一个预先存在的、已连接的 socket.socket 对象供传输使用。如果指定了,则应该省略 local_addrremote_addr(必须为 None)。

    注意

    sock 参数将套接字的所有权转移给创建的传输。要关闭套接字,请调用传输的 close() 方法。

请参阅 UDP 回声客户端协议UDP 回声服务器协议 示例。

在 3.4.4 版本中更改: 添加了 familyprotoflagsreuse_addressreuse_portallow_broadcastsock 参数。

在 3.8 版本中更改: 添加了对 Windows 的支持。

在 3.8.1 版本中更改: 不再支持 reuse_address 参数,因为使用 socket.SO_REUSEADDR 会对 UDP 造成严重的安全问题。显式传递 reuse_address=True 将引发异常。

当多个具有不同 UID 的进程使用 SO_REUSEADDR 将套接字分配给相同的 UDP 套接字地址时,传入的数据包可以在套接字之间随机分配。

对于支持的平台,可以使用 reuse_port 作为类似功能的替代品。使用 reuse_port 时,将使用 socket.SO_REUSEPORT,它可以专门阻止具有不同 UID 的进程将套接字分配给相同的套接字地址。

在 3.11 版本中更改: 自 Python 3.8.1、3.7.6 和 3.6.10 起禁用的 reuse_address 参数已完全删除。

coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

创建一个 Unix 连接。

套接字族将是 AF_UNIX;套接字类型将是 SOCK_STREAM

成功后,返回一个 (transport, protocol) 元组。

path 是 Unix 域套接字的名称,并且是必需的,除非指定了 sock 参数。支持抽象 Unix 套接字、strbytesPath 路径。

有关此方法参数的信息,请参阅 loop.create_connection() 方法的文档。

可用性:Unix。

在 3.7 版本中更改: 添加了 ssl_handshake_timeout 参数。 path 参数现在可以是 路径类对象

在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 参数。

创建网络服务器

coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

创建一个 TCP 服务器(套接字类型为 SOCK_STREAM),该服务器侦听 host 地址的 port

返回一个 Server 对象。

参数

  • protocol_factory 必须是一个可调用对象,返回一个 协议 实现。

  • host 参数可以设置为几种类型,这些类型确定服务器将侦听的位置

    • 如果 host 是一个字符串,则 TCP 服务器将绑定到由 host 指定的单个网络接口。

    • 如果 host 是一个字符串序列,则 TCP 服务器将绑定到该序列指定的所有网络接口。

    • 如果 host 是一个空字符串或 None,则假定所有接口,并且将返回多个套接字的列表(最有可能一个用于 IPv4,另一个用于 IPv6)。

  • port 参数可以设置为指定服务器应侦听的端口。如果为 0None(默认值),则将选择一个未使用的随机端口(请注意,如果 host 解析为多个网络接口,则将为每个接口选择一个不同的随机端口)。

  • 可以将 family 设置为 socket.AF_INETAF_INET6,以强制套接字使用 IPv4 或 IPv6。如果未设置,则将根据主机名确定 family(默认为 AF_UNSPEC)。

  • flagsgetaddrinfo() 的位掩码。

  • 可以可选地指定 sock 以使用预先存在的套接字对象。如果指定了,则不得指定 hostport

    注意

    sock 参数将套接字的所有权转移到创建的服务器。要关闭套接字,请调用服务器的 close() 方法。

  • backlog 是传递给 listen() 的最大排队连接数(默认为 100)。

  • 可以将 ssl 设置为 SSLContext 实例,以在接受的连接上启用 TLS。

  • reuse_address 告诉内核重用 TIME_WAIT 状态的本地套接字,而无需等待其自然超时到期。如果未指定,则在 Unix 上将自动设置为 True

  • reuse_port 告诉内核允许此端点绑定到与其他现有端点绑定的同一端口,只要它们在创建时都设置了此标志。此选项在 Windows 上不受支持。

  • keep_alive 设置为 True 通过启用消息的定期传输来保持连接活动。

在 3.13 版本中更改: 添加了 keep_alive 参数。

  • ssl_handshake_timeout(对于 TLS 服务器)是等待 TLS 握手完成然后中止连接的时间(以秒为单位)。 如果 None(默认值),则为 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),超过该时间将中止连接。如果为 None(默认值),则为 30.0 秒。

  • start_serving 设置为 True (默认值) 会使创建的服务器立即开始接受连接。当设置为 False 时,用户应该等待 Server.start_serving()Server.serve_forever(),以使服务器开始接受连接。

在 3.5 版本中更改: ProactorEventLoop 添加了对 SSL/TLS 的支持。

3.5.1 版本更改: host 参数可以是一个字符串序列。

3.6 版本更改: 添加了 ssl_handshake_timeoutstart_serving 参数。默认情况下,所有 TCP 连接都会设置套接字选项 socket.TCP_NODELAY

在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 参数。

另请参阅

start_server() 函数是一个更高层的替代 API,它返回一对 StreamReaderStreamWriter,可以在 async/await 代码中使用。

协程 loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)

类似于 loop.create_server(),但适用于 AF_UNIX 套接字族。

path 是 Unix 域套接字的名称,是必需的,除非提供了 sock 参数。支持抽象 Unix 套接字,str, bytesPath 路径。

如果 cleanup_socket 为 true,则当服务器关闭时,Unix 套接字将自动从文件系统中删除,除非在创建服务器后套接字已被替换。

有关此方法的参数的信息,请参阅 loop.create_server() 方法的文档。

可用性:Unix。

3.7 版本更改: 添加了 ssl_handshake_timeoutstart_serving 参数。path 参数现在可以是 Path 对象。

在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 参数。

3.13 版本更改: 添加了 cleanup_socket 参数。

协程 loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

将已接受的连接包装到传输/协议对中。

此方法可供在 asyncio 之外接受连接但使用 asyncio 处理这些连接的服务器使用。

参数

  • protocol_factory 必须是一个可调用对象,返回一个 协议 实现。

  • sock 是从 socket.accept 返回的预先存在的套接字对象。

    注意

    sock 参数将套接字的所有权转移给创建的传输。要关闭套接字,请调用传输的 close() 方法。

  • 可以将 ssl 设置为 SSLContext 以便在接受的连接上启用 SSL。

  • ssl_handshake_timeout (对于 SSL 连接) 是在中止连接之前等待 SSL 握手完成的时间(以秒为单位)。如果为 None (默认值),则为 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),超过该时间将中止连接。如果为 None(默认值),则为 30.0 秒。

返回一个 (transport, protocol) 对。

在 3.5.3 版本中添加。

在 3.7 版本中更改: 添加了 ssl_handshake_timeout 参数。

在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 参数。

传输文件

协程 loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)

通过 transport 发送 file。返回发送的总字节数。

如果可用,该方法使用高性能的 os.sendfile()

file 必须是以二进制模式打开的常规文件对象。

offset 指定从何处开始读取文件。如果指定了 count,则它是要传输的总字节数,而不是将文件发送到 EOF。即使此方法引发错误,文件位置也会始终更新,并且可以使用 file.tell() 来获取实际发送的字节数。

当平台不支持 sendfile 系统调用时 (例如 Windows 或 Unix 上的 SSL 套接字),将 fallback 设置为 True 使 asyncio 手动读取和发送文件。

如果系统不支持 sendfile 系统调用并且 fallbackFalse,则引发 SendfileNotAvailableError

3.7 版本新增。

TLS 升级

协程 loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

将现有的基于传输的连接升级到 TLS。

创建 TLS 编码器/解码器实例,并将其插入 transportprotocol 之间。该编码器/解码器实现面向 transport 的协议和面向 protocol 的传输。

返回创建的双接口实例。在 await 之后,protocol 必须停止使用原始 transport,并且只能与返回的对象通信,因为编码器会缓存 protocol 端的数据,并零星地与 transport 交换额外的 TLS 会话数据包。

在某些情况下 (例如,当传递的传输已经关闭时),这可能会返回 None

参数

  • transportprotocol 是诸如 create_server()create_connection() 之类的方法返回的实例。

  • sslcontextSSLContext 的配置实例。

  • 当正在升级服务器端连接时(例如由 create_server() 创建的连接),传递 True

  • server_hostname:设置或覆盖目标服务器的证书将与之匹配的主机名。

  • ssl_handshake_timeout 是(对于 TLS 连接)等待 TLS 握手完成的时间(以秒为单位),超过该时间将中止连接。如果为 None(默认值),则为 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),超过该时间将中止连接。如果为 None(默认值),则为 30.0 秒。

3.7 版本新增。

在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 参数。

监视文件描述符

loop.add_reader(fd, callback, *args)

开始监视 fd 文件描述符的读取可用性,并在 fd 可用于读取后,使用指定的参数调用 callback

loop.remove_reader(fd)

停止监视 fd 文件描述符的读取可用性。如果 fd 之前正在被监视读取,则返回 True

loop.add_writer(fd, callback, *args)

开始监视 fd 文件描述符的写入可用性,并在 fd 可用于写入时调用带有指定参数的 callback

使用 functools.partial() 将关键字参数传递给 callback

loop.remove_writer(fd)

停止监视 fd 文件描述符的写入可用性。如果 fd 之前正在被监视写入,则返回 True

有关这些方法的一些限制,请参阅 平台支持 部分。

直接使用套接字对象

通常,使用基于传输的 API(例如 loop.create_connection()loop.create_server())的协议实现比直接使用套接字的实现更快。但是,在某些性能不是关键的情况下,直接使用 socket 对象会更方便。

协程 loop.sock_recv(sock, nbytes)

sock 接收最多 nbytes 个字节。 socket.recv() 的异步版本。

以字节对象形式返回接收到的数据。

sock 必须是一个非阻塞套接字。

在 3.7 版本中更改: 尽管此方法始终被记录为协程方法,但在 Python 3.7 之前的版本中,它返回一个 Future。 从 Python 3.7 开始,它是一个 async def 方法。

协程 loop.sock_recv_into(sock, buf)

sock 接收数据到 buf 缓冲区。 模仿阻塞式 socket.recv_into() 方法。

返回写入缓冲区的字节数。

sock 必须是一个非阻塞套接字。

3.7 版本新增。

协程 loop.sock_recvfrom(sock, bufsize)

sock 接收最大 bufsize 的数据报。socket.recvfrom() 的异步版本。

返回 (接收到的数据, 远程地址) 的元组。

sock 必须是一个非阻塞套接字。

在 3.11 版本中添加。

协程 loop.sock_recvfrom_into(sock, buf, nbytes=0)

sock 接收最大 nbytes 的数据报到 buf 中。socket.recvfrom_into() 的异步版本。

返回 (接收到的字节数,远程地址) 的元组。

sock 必须是一个非阻塞套接字。

在 3.11 版本中添加。

协程 loop.sock_sendall(sock, data)

data 发送到 sock 套接字。 socket.sendall() 的异步版本。

此方法会继续发送到套接字,直到 data 中的所有数据都已发送或发生错误为止。成功时返回 None。 发生错误时,将引发异常。此外,无法确定连接的接收端成功处理了多少数据(如果有)。

sock 必须是一个非阻塞套接字。

在 3.7 版本中更改: 尽管此方法始终被记录为协程方法,但在 Python 3.7 之前它返回一个 Future。 从 Python 3.7 开始,它是一个 async def 方法。

协程 loop.sock_sendto(sock, data, address)

将数据报从 sock 发送到 addresssocket.sendto() 的异步版本。

返回发送的字节数。

sock 必须是一个非阻塞套接字。

在 3.11 版本中添加。

协程 loop.sock_connect(sock, address)

sock 连接到 address 的远程套接字。

socket.connect() 的异步版本。

sock 必须是一个非阻塞套接字。

在 3.5.2 版本中更改: address 不再需要解析。 sock_connect 将尝试通过调用 socket.inet_pton() 来检查 address 是否已被解析。 如果没有,将使用 loop.getaddrinfo() 来解析 address

协程 loop.sock_accept(sock)

接受一个连接。模仿阻塞式 socket.accept() 方法。

该套接字必须绑定到一个地址并侦听连接。返回值是一对 (conn, address),其中 conn 是一个新的套接字对象,可用于在该连接上发送和接收数据,而 address 是绑定到连接另一端套接字的地址。

sock 必须是一个非阻塞套接字。

在 3.7 版本中更改: 尽管此方法始终被记录为协程方法,但在 Python 3.7 之前它返回一个 Future。 从 Python 3.7 开始,它是一个 async def 方法。

另请参阅

loop.create_server()start_server()

协程 loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)

如果可能,使用高性能 os.sendfile 发送文件。 返回发送的总字节数。

socket.sendfile() 的异步版本。

sock 必须是非阻塞 socket.SOCK_STREAM socket

file 必须是以二进制模式打开的常规文件对象。

offset 指定从何处开始读取文件。如果指定了 count,则它是要传输的总字节数,而不是将文件发送到 EOF。即使此方法引发错误,文件位置也会始终更新,并且可以使用 file.tell() 来获取实际发送的字节数。

当设置为 True 时,如果平台不支持 sendfile 系统调用(例如 Windows 或 Unix 上的 SSL 套接字),则 fallback 会使 asyncio 手动读取和发送文件。

如果系统不支持 sendfile 系统调用且 fallbackFalse,则引发 SendfileNotAvailableError

sock 必须是一个非阻塞套接字。

3.7 版本新增。

DNS

协程 loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)

socket.getaddrinfo() 的异步版本。

协程 loop.getnameinfo(sockaddr, flags=0)

socket.getnameinfo() 的异步版本。

注意

getaddrinfogetnameinfo 内部都通过循环的默认线程池执行器利用它们的同步版本。当此执行器饱和时,这些方法可能会遇到延迟,更高级别的网络库可能会将其报告为超时增加。为了缓解这种情况,请考虑为其他用户任务使用自定义执行器,或设置具有更多工作线程的默认执行器。

在 3.7 版本中变更: getaddrinfogetnameinfo 方法始终被记录为返回一个协程,但在 Python 3.7 之前,它们实际上返回的是 asyncio.Future 对象。从 Python 3.7 开始,这两个方法都是协程。

使用管道

协程 loop.connect_read_pipe(protocol_factory, pipe)

在事件循环中注册 pipe 的读取端。

protocol_factory 必须是一个可调用对象,返回一个 asyncio 协议 实现。

pipe 是一个 类文件对象

返回对 (transport, protocol),其中 transport 支持 ReadTransport 接口,protocol 是由 protocol_factory 实例化的对象。

使用 SelectorEventLoop 事件循环,pipe 被设置为非阻塞模式。

协程 loop.connect_write_pipe(protocol_factory, pipe)

在事件循环中注册 pipe 的写入端。

protocol_factory 必须是一个可调用对象,返回一个 asyncio 协议 实现。

pipe类文件对象

返回对 (transport, protocol),其中 transport 支持 WriteTransport 接口,protocol 是由 protocol_factory 实例化的对象。

使用 SelectorEventLoop 事件循环,pipe 被设置为非阻塞模式。

注意

SelectorEventLoop 不支持 Windows 上的上述方法。对于 Windows,请改用 ProactorEventLoop

另请参阅

loop.subprocess_exec()loop.subprocess_shell() 方法。

Unix 信号

loop.add_signal_handler(signum, callback, *args)

callback 设置为 signum 信号的处理程序。

回调将由 loop 调用,以及该事件循环的其他排队回调和可运行协程。与使用 signal.signal() 注册的信号处理程序不同,使用此函数注册的回调允许与事件循环交互。

如果信号编号无效或不可捕获,则引发 ValueError。如果设置处理程序时出现问题,则引发 RuntimeError

使用 functools.partial() 将关键字参数传递给 callback

signal.signal() 一样,此函数必须在主线程中调用。

loop.remove_signal_handler(sig)

移除 sig 信号的处理程序。

如果信号处理程序被移除,则返回 True,如果未为给定信号设置处理程序,则返回 False

可用性:Unix。

另请参阅

signal 模块。

在线程或进程池中执行代码

可等待对象 loop.run_in_executor(executor, func, *args)

安排在指定的执行器中调用 func

executor 参数应为 concurrent.futures.Executor 实例。如果 executorNone,则使用默认执行器。可以使用 loop.set_default_executor() 设置默认执行器,否则,如果需要,run_in_executor() 将惰性初始化并使用 concurrent.futures.ThreadPoolExecutor

示例

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())

请注意,由于 multiprocessing 的特殊性(由 ProcessPoolExecutor 使用),选项 3 需要入口点保护 (if __name__ == '__main__')。请参阅 安全导入主模块

此方法返回一个 asyncio.Future 对象。

使用 functools.partial() 将关键字参数传递给 func

在 3.5.3 版本中更改: loop.run_in_executor() 不再配置它创建的线程池执行器的 max_workers,而是将其留给线程池执行器(ThreadPoolExecutor)来设置默认值。

loop.set_default_executor(executor)

executor 设置为 run_in_executor() 使用的默认执行器。executor 必须是 ThreadPoolExecutor 的实例。

在 3.11 版本中更改: executor 必须是 ThreadPoolExecutor 的实例。

错误处理 API

允许自定义事件循环中如何处理异常。

loop.set_exception_handler(handler)

handler 设置为新的事件循环异常处理程序。

如果 handlerNone,则将设置默认异常处理程序。 否则,handler 必须是与 (loop, context) 签名匹配的可调用对象,其中 loop 是对活动事件循环的引用,而 context 是一个 dict 对象,其中包含异常的详细信息(有关 context 的详细信息,请参阅 call_exception_handler() 文档)。

如果代表 TaskHandle 调用处理程序,则它在该任务或回调句柄的 contextvars.Context 中运行。

在 3.12 版本中更改: 可以在异常发生的任务或句柄的 Context 中调用处理程序。

loop.get_exception_handler()

返回当前的异常处理程序,如果没有设置自定义异常处理程序,则返回 None

3.5.2 版本新增。

loop.default_exception_handler(context)

默认异常处理程序。

当发生异常且未设置异常处理程序时,将调用此函数。 可以由想要推迟到默认处理程序行为的自定义异常处理程序调用此函数。

context 参数的含义与 call_exception_handler() 中的含义相同。

loop.call_exception_handler(context)

调用当前的事件循环异常处理程序。

context 是一个 dict 对象,其中包含以下键(在未来的 Python 版本中可能会引入新键)

  • “message”:错误消息;

  • “exception”(可选):异常对象;

  • “future”(可选):asyncio.Future 实例;

  • “task”(可选):asyncio.Task 实例;

  • “handle”(可选):asyncio.Handle 实例;

  • “protocol”(可选):协议 实例;

  • “transport”(可选):传输 实例;

  • “socket”(可选):socket.socket 实例;

  • “asyncgen”(可选):导致异常的异步生成器。

    异常。

注意

不应在子类化的事件循环中重载此方法。 对于自定义异常处理,请使用 set_exception_handler() 方法。

启用调试模式

loop.get_debug()

获取事件循环的调试模式(bool)。

如果环境变量 PYTHONASYNCIODEBUG 设置为非空字符串,则默认值为 True,否则为 False

loop.set_debug(enabled: bool)

设置事件循环的调试模式。

在 3.7 版本中更改: 现在还可以使用新的 Python 开发模式 来启用调试模式。

loop.slow_callback_duration

此属性可用于设置被认为是“慢”的最短执行时间(以秒为单位)。 启用调试模式后,将记录“慢”的回调。

默认值为 100 毫秒。

另请参阅

asyncio 的调试模式

运行子进程

此小节中描述的方法是低级的。 在常规的 async/await 代码中,请考虑改用高级的 asyncio.create_subprocess_shell()asyncio.create_subprocess_exec() 便利函数。

注意

在 Windows 上,默认事件循环 ProactorEventLoop 支持子进程,而 SelectorEventLoop 则不支持。 有关详细信息,请参阅 Windows 上的子进程支持

coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

args 指定的一个或多个字符串参数创建子进程。

args 必须是由以下表示的字符串列表

第一个字符串指定程序可执行文件,其余字符串指定参数。 字符串参数共同构成程序的 argv

这类似于标准库 subprocess.Popen 类,使用 shell=False 调用,并将字符串列表作为第一个参数传递;但是,当 Popen 接收一个作为字符串列表的参数时,subprocess_exec 接收多个字符串参数。

protocol_factory 必须是一个可调用对象,返回 asyncio.SubprocessProtocol 类的子类。

其他参数

  • stdin 可以是以下任何一项

    • 类似文件的对象

    • 现有文件描述符(正整数),例如使用 os.pipe() 创建的文件描述符

    • 使用 subprocess.PIPE 常量(默认值),它将创建一个新的管道并连接它,

    • 使用值 None,这将使子进程从当前进程继承文件描述符

    • 使用 subprocess.DEVNULL 常量,它表示将使用特殊的 os.devnull 文件

  • stdout 可以是以下任何一个

    • 类似文件的对象

    • 使用 subprocess.PIPE 常量(默认值),它将创建一个新的管道并连接它,

    • 使用值 None,这将使子进程从当前进程继承文件描述符

    • 使用 subprocess.DEVNULL 常量,它表示将使用特殊的 os.devnull 文件

  • stderr 可以是以下任何一个

    • 类似文件的对象

    • 使用 subprocess.PIPE 常量(默认值),它将创建一个新的管道并连接它,

    • 使用值 None,这将使子进程从当前进程继承文件描述符

    • 使用 subprocess.DEVNULL 常量,它表示将使用特殊的 os.devnull 文件

    • 使用 subprocess.STDOUT 常量,它将把标准错误流连接到进程的标准输出流

  • 所有其他关键字参数都将传递给 subprocess.Popen,不做任何解释,除了 *bufsize*、*universal_newlines*、*shell*、*text*、*encoding* 和 *errors*,这些参数都不应指定。

    asyncio 子进程 API 不支持将流解码为文本。可以使用 bytes.decode() 将流返回的字节转换为文本。

如果作为 *stdin*、*stdout* 或 *stderr* 传递的类文件对象表示一个管道,则该管道的另一端应使用 connect_write_pipe()connect_read_pipe() 进行注册,以便与事件循环一起使用。

有关其他参数的文档,请参阅 subprocess.Popen 类的构造函数。

返回一个 (transport, protocol) 对,其中 *transport* 符合 asyncio.SubprocessTransport 基类,而 *protocol* 是由 *protocol_factory* 实例化的对象。

协程 loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

从 *cmd* 创建一个子进程,*cmd* 可以是 str 或一个编码为 文件系统编码bytes 字符串,使用平台的“shell”语法。

这类似于标准库 subprocess.Popen 类在调用时使用 shell=True

*protocol_factory* 必须是可调用的,返回 SubprocessProtocol 类的子类。

有关其余参数的详细信息,请参阅 subprocess_exec()

返回一个 (transport, protocol) 对,其中 *transport* 符合 asyncio.SubprocessTransport 基类,而 *protocol* 是由 *protocol_factory* 实例化的对象。

注意

应用程序有责任确保所有空格和特殊字符都被适当地引用,以避免 shell 注入 漏洞。可以使用 shlex.quote() 函数来正确转义将用于构造 shell 命令的字符串中的空格和特殊字符。

回调句柄

class asyncio.Handle

loop.call_soon()loop.call_soon_threadsafe() 返回的回调包装器对象。

get_context()

返回与句柄关联的 contextvars.Context 对象。

在 3.12 版本中添加。

cancel()

取消回调。如果回调已被取消或执行,则此方法无效。

cancelled()

如果回调被取消,则返回 True

3.7 版本新增。

class asyncio.TimerHandle

loop.call_later()loop.call_at() 返回的回调包装器对象。

此类是 Handle 的子类。

when()

float 秒的形式返回计划的回调时间。

该时间是一个绝对时间戳,使用与 loop.time() 相同的时间参考。

3.7 版本新增。

服务器对象

服务器对象由 loop.create_server()loop.create_unix_server()start_server()start_unix_server() 函数创建。

请勿直接实例化 Server 类。

class asyncio.Server

Server 对象是异步上下文管理器。当在 async with 语句中使用时,可以保证当 async with 语句完成时,Server 对象已关闭且不接受新连接。

srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

在 3.7 版本中更改: 自 Python 3.7 起,Server 对象是一个异步上下文管理器。

在 3.11 版本中更改: 此类在 Python 3.9.11、3.10.3 和 3.11 中作为 asyncio.Server 公开。

close()

停止服务:关闭侦听套接字并将 sockets 属性设置为 None

表示现有传入客户端连接的套接字保持打开状态。

服务器是异步关闭的;使用 wait_closed() 协程等待直到服务器关闭(并且没有更多连接处于活动状态)。

close_clients()

关闭所有现有的传入客户端连接。

在所有关联的传输上调用 close()

为了避免与新客户端连接发生竞争,在关闭服务器时,应该在调用 close_clients() 之前调用 close()

3.13 版本中新增。

abort_clients()

立即关闭所有现有的传入客户端连接,无需等待挂起的操作完成。

在所有关联的传输上调用 abort()

为了避免与新客户端连接发生竞争,在关闭服务器时,应该在调用 abort_clients() 之前调用 close()

3.13 版本中新增。

get_loop()

返回与服务器对象关联的事件循环。

3.7 版本新增。

协程 start_serving()

开始接受连接。

此方法是幂等的,因此可以在服务器已经提供服务时调用。

loop.create_server()asyncio.start_server() 的仅限关键字参数 start_serving 允许创建一个最初不接受连接的 Server 对象。在这种情况下,可以使用 Server.start_serving()Server.serve_forever() 使服务器开始接受连接。

3.7 版本新增。

协程 serve_forever()

开始接受连接,直到协程被取消。取消 serve_forever 任务会导致服务器关闭。

如果服务器已经在接受连接,则可以调用此方法。每个 Server 对象只能存在一个 serve_forever 任务。

示例

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

3.7 版本新增。

is_serving()

如果服务器正在接受新连接,则返回 True

3.7 版本新增。

协程 wait_closed()

等待 close() 方法完成且所有活动连接都已结束。

sockets

服务器正在监听的类 socket 对象列表,asyncio.trsock.TransportSocket

在 3.7 版本中更改: 在 Python 3.7 之前,Server.sockets 曾经直接返回服务器套接字的内部列表。在 3.7 中,返回该列表的副本。

事件循环实现

asyncio 提供了两种不同的事件循环实现:SelectorEventLoopProactorEventLoop

默认情况下,asyncio 配置为使用 EventLoop

class asyncio.SelectorEventLoop

基于 selectors 模块的 AbstractEventLoop 的子类。

使用适用于给定平台的最有效的 selector。也可以手动配置要使用的确切选择器实现

import asyncio
import selectors

class MyPolicy(asyncio.DefaultEventLoopPolicy):
   def new_event_loop(self):
      selector = selectors.SelectSelector()
      return asyncio.SelectorEventLoop(selector)

asyncio.set_event_loop_policy(MyPolicy())

可用性: Unix,Windows。

class asyncio.ProactorEventLoop

用于 Windows 的 AbstractEventLoop 的子类,使用“I/O 完成端口”(IOCP)。

可用性: Windows。

class asyncio.EventLoop

给定平台上可用的 AbstractEventLoop 的最高效子类的别名。

它是 Unix 上的 SelectorEventLoop 和 Windows 上的 ProactorEventLoop 的别名。

3.13 版本中新增。

class asyncio.AbstractEventLoop

符合 asyncio 的事件循环的抽象基类。

事件循环方法 部分列出了 AbstractEventLoop 的替代实现应定义的所有方法。

示例

请注意,本节中的所有示例都**有意**展示了如何使用低级别的事件循环 API,例如 loop.run_forever()loop.call_soon()。现代的 asyncio 应用程序很少需要以这种方式编写;请考虑使用高级函数,如 asyncio.run()

使用 call_soon() 的 Hello World

一个使用 loop.call_soon() 方法来调度回调的示例。回调显示 "Hello World",然后停止事件循环

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.new_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

另请参阅

使用协程和 run() 函数创建的类似的 Hello World 示例。

使用 call_later() 显示当前日期

一个每秒显示当前日期的回调示例。回调使用 loop.call_later() 方法在 5 秒后重新调度自身,然后停止事件循环

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.new_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

另请参阅

使用协程和 run() 函数创建的类似的 当前日期 示例。

监视文件描述符的读取事件

等待文件描述符使用 loop.add_reader() 方法接收到一些数据,然后关闭事件循环

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.new_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

另请参阅

为 SIGINT 和 SIGTERM 设置信号处理器

(此 signals 示例仅在 Unix 上有效。)

使用 loop.add_signal_handler() 方法,为信号 SIGINTSIGTERM 注册处理器

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())