事件循环¶
源代码: Lib/asyncio/events.py, Lib/asyncio/base_events.py
前言
事件循环是每个 asyncio 应用程序的核心。事件循环运行异步任务和回调,执行网络 IO 操作,并运行子进程。
应用程序开发人员通常应该使用高级 asyncio 函数,例如 asyncio.run()
,并且很少需要引用循环对象或调用其方法。本节主要针对低级代码、库和框架的作者,他们需要对事件循环行为进行更精细的控制。
获取事件循环
以下低级函数可用于获取、设置或创建事件循环
- asyncio.get_running_loop()¶
返回当前 OS 线程中正在运行的事件循环。
如果当前没有正在运行的事件循环,则会引发
RuntimeError
。此函数只能从协程或回调中调用。
在版本 3.7 中添加。
- asyncio.get_event_loop()¶
获取当前事件循环。
当从协程或回调(例如使用 call_soon 或类似 API 调度)调用时,此函数将始终返回正在运行的事件循环。
如果没有设置正在运行的事件循环,该函数将返回
get_event_loop_policy().get_event_loop()
调用的结果。由于此函数的行为相当复杂(尤其是在使用自定义事件循环策略时),因此在协程和回调中使用
get_running_loop()
函数优于get_event_loop()
。如上所述,请考虑使用更高级别的
asyncio.run()
函数,而不是使用这些低级函数手动创建和关闭事件循环。自版本 3.12 起弃用: 如果当前没有事件循环,则会发出弃用警告。在未来的某个 Python 版本中,这将成为错误。
- asyncio.set_event_loop(loop)¶
将 loop 设置为当前 OS 线程的当前事件循环。
- asyncio.new_event_loop()¶
创建并返回一个新的事件循环对象。
请注意,get_event_loop()
、set_event_loop()
和 new_event_loop()
函数的行为可以通过 设置自定义事件循环策略 来改变。
内容
此文档页面包含以下部分
The 事件循环方法 部分是事件循环 API 的参考文档;
The 回调句柄 部分记录了
Handle
和TimerHandle
实例,这些实例是从调度方法(例如loop.call_soon()
和loop.call_later()
)返回的;The 服务器对象 部分记录了从事件循环方法(如
loop.create_server()
)返回的类型;The 事件循环实现 部分记录了
SelectorEventLoop
和ProactorEventLoop
类;The 示例 部分展示了如何使用一些事件循环 API。
事件循环方法¶
事件循环具有以下低级 API
运行和停止循环¶
- loop.run_until_complete(future)¶
运行,直到 future(
Future
的实例)完成。如果参数是 协程对象,它将隐式调度为
asyncio.Task
。返回 Future 的结果或引发其异常。
- loop.run_forever()¶
运行事件循环,直到调用
stop()
。如果在调用
run_forever()
之前调用stop()
,循环将使用零超时轮询 I/O 选择器一次,运行所有响应 I/O 事件(以及已计划的那些)而计划的回调,然后退出。如果在
run_forever()
运行时调用stop()
,循环将运行当前批次的回调,然后退出。请注意,在这种情况下,回调调度的新的回调将不会运行;相反,它们将在下次调用run_forever()
或run_until_complete()
时运行。
- loop.stop()¶
停止事件循环。
- loop.is_running()¶
如果事件循环当前正在运行,则返回
True
。
- loop.is_closed()¶
如果事件循环已关闭,则返回
True
。
- loop.close()¶
关闭事件循环。
调用此函数时,循环不得运行。任何挂起的回调都将被丢弃。
此方法清除所有队列并关闭执行器,但不等待执行器完成。
此方法是幂等的且不可逆的。在事件循环关闭后,不应调用任何其他方法。
- 协程 loop.shutdown_asyncgens()¶
计划所有当前打开的 异步生成器 对象使用
aclose()
调用关闭。调用此方法后,如果迭代新的异步生成器,事件循环将发出警告。这应该用于可靠地完成所有计划的异步生成器。请注意,当使用
asyncio.run()
时,无需调用此函数。示例
try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
在版本 3.6 中添加。
- 协程 loop.shutdown_default_executor(timeout=None)¶
计划关闭默认执行器并等待它加入
ThreadPoolExecutor
中的所有线程。一旦调用了此方法,使用默认执行器与loop.run_in_executor()
将引发RuntimeError
。timeout 参数指定执行器完成加入所需的时间量(以
float
秒为单位)。使用默认值None
,执行器允许无限的时间。如果达到 timeout,将发出
RuntimeWarning
,并且默认执行器将终止,而不会等待其线程完成加入。注意
当使用
asyncio.run()
时,不要调用此方法,因为后者会自动处理默认执行器的关闭。在版本 3.9 中添加。
在版本 3.12 中更改: 添加了 timeout 参数。
调度回调¶
- loop.call_soon(callback, *args, context=None)¶
计划在事件循环的下次迭代中使用 args 参数调用 callback 回调。
返回
asyncio.Handle
的实例,该实例可用于稍后取消回调。回调按注册顺序调用。每个回调将被精确调用一次。
可选的关键字参数 context 指定 callback 运行的自定义
contextvars.Context
。当未提供 context 时,回调使用当前上下文。与
call_soon_threadsafe()
不同,此方法不是线程安全的。
- loop.call_soon_threadsafe(callback, *args, context=None)¶
call_soon()
的线程安全变体。当从另一个线程调度回调时,必须使用此函数,因为call_soon()
不是线程安全的。如果在已关闭的循环上调用,则引发
RuntimeError
。当主应用程序关闭时,这可能会发生在辅助线程上。请参阅文档的 并发和多线程 部分。
在版本 3.7 中更改: 添加了 context 关键字参数。有关更多详细信息,请参阅 PEP 567。
注意
大多数 asyncio
调度函数不允许传递关键字参数。为此,请使用 functools.partial()
# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
使用部分对象通常比使用 lambda 更方便,因为 asyncio 可以在调试和错误消息中更好地呈现部分对象。
调度延迟回调¶
事件循环提供机制来调度回调函数,以便在将来的某个时间点调用。事件循环使用单调时钟来跟踪时间。
- loop.call_later(delay, callback, *args, context=None)¶
计划在给定的 delay 秒数(可以是整数或浮点数)之后调用 callback。
返回
asyncio.TimerHandle
的实例,该实例可用于取消回调。callback 将被精确调用一次。如果两个回调计划在完全相同的时间调用,则它们被调用的顺序是不确定的。
可选的位置参数 args 将在调用回调时传递给回调。如果您希望回调使用关键字参数调用,请使用
functools.partial()
。可选的关键字参数 context 允许指定 callback 运行的自定义
contextvars.Context
。当未提供 context 时,使用当前上下文。在版本 3.7 中更改: 添加了 context 关键字参数。有关更多详细信息,请参阅 PEP 567。
在版本 3.8 中更改: 在 Python 3.7 及更早版本中,使用默认事件循环实现,delay 不能超过一天。这在 Python 3.8 中已修复。
- loop.call_at(when, callback, *args, context=None)¶
使用与
loop.time()
相同的时间参考,在给定的绝对时间戳 when(一个整数或浮点数)处安排调用 callback。此方法的行为与
call_later()
相同。返回
asyncio.TimerHandle
的实例,该实例可用于取消回调。在版本 3.7 中更改: 添加了 context 关键字参数。有关更多详细信息,请参见 PEP 567。
在版本 3.8 中更改: 在 Python 3.7 及更早版本中,使用默认事件循环实现,when 与当前时间之间的差值不能超过一天。此问题已在 Python 3.8 中修复。
注意
在版本 3.8 中更改: 在 Python 3.7 及更早版本中,超时(相对 delay 或绝对 when)不应超过一天。此问题已在 Python 3.8 中修复。
另请参见
asyncio.sleep()
函数。
创建 Futures 和 Tasks¶
- loop.create_future()¶
创建一个与事件循环关联的
asyncio.Future
对象。这是在 asyncio 中创建 Futures 的首选方法。这允许第三方事件循环提供 Future 对象的替代实现(具有更好的性能或检测功能)。
在版本 3.5.2 中添加。
- loop.create_task(coro, *, name=None, context=None)¶
-
第三方事件循环可以使用它们自己的
Task
子类来实现互操作性。在这种情况下,结果类型是Task
的子类。如果提供了 name 参数且不为
None
,则使用Task.set_name()
将其设置为任务的名称。可选的关键字参数 context 允许为 coro 指定一个自定义的
contextvars.Context
来运行。如果没有提供 context,则会创建当前上下文的副本。在版本 3.8 中更改: 添加了 name 参数。
在版本 3.11 中更改: 添加了 context 参数。
- loop.set_task_factory(factory)¶
设置一个任务工厂,该工厂将被
loop.create_task()
使用。如果 factory 为
None
,则将设置默认任务工厂。否则,factory 必须是一个可调用对象,其签名与(loop, coro, context=None)
匹配,其中 loop 是对活动事件循环的引用,而 coro 是一个协程对象。该可调用对象必须返回一个与asyncio.Future
兼容的对象。
- loop.get_task_factory()¶
返回一个任务工厂,如果使用的是默认任务工厂,则返回
None
。
打开网络连接¶
- coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)¶
打开到由 host 和 port 指定的地址的流式传输连接。
套接字族可以是
AF_INET
或AF_INET6
,具体取决于 host(或 family 参数,如果提供)。套接字类型将为
SOCK_STREAM
。protocol_factory 必须是一个可调用对象,它返回一个 asyncio 协议 实现。
此方法将尝试在后台建立连接。成功后,它将返回一个
(transport, protocol)
对。底层操作的时间顺序概要如下
建立连接并为其创建一个 传输。
protocol_factory 在没有参数的情况下被调用,并期望返回一个 协议 实例。
通过调用协议实例的
connection_made()
方法,将协议实例与传输耦合。成功后将返回一个
(transport, protocol)
元组。
创建的传输是与实现相关的双向流。
其他参数
ssl: 如果给出且不为 false,则创建一个 SSL/TLS 传输(默认情况下创建一个普通 TCP 传输)。如果 ssl 是一个
ssl.SSLContext
对象,则使用此上下文来创建传输;如果 ssl 为True
,则使用从ssl.create_default_context()
返回的默认上下文。另请参见
server_hostname 设置或覆盖目标服务器证书将与之匹配的主机名。仅当 ssl 不为
None
时才应传递。默认情况下使用 host 参数的值。如果 host 为空,则没有默认值,您必须为 server_hostname 传递一个值。如果 server_hostname 是一个空字符串,则主机名匹配将被禁用(这是一个严重的安全性风险,可能导致中间人攻击)。family、proto 和 flags 是可选的地址族、协议和标志,用于传递给 getaddrinfo() 以进行 host 解析。如果给出,这些都应该是来自相应
socket
模块常量的整数。happy_eyeballs_delay,如果给出,则为该连接启用 Happy Eyeballs。它应该是一个浮点数,表示在开始下一个并行尝试之前等待连接尝试完成的时间(以秒为单位)。这是 RFC 8305 中定义的“连接尝试延迟”。RFC 推荐的合理默认值为
0.25
(250 毫秒)。interleave 控制当主机名解析为多个 IP 地址时地址的重新排序。如果为
0
或未指定,则不进行重新排序,并且地址按getaddrinfo()
返回的顺序尝试。如果指定了正整数,则地址按地址族交错,并且给定的整数被解释为 RFC 8305 中定义的“第一个地址族计数”。如果未指定 happy_eyeballs_delay,则默认值为0
,如果指定了 happy_eyeballs_delay,则默认值为1
。sock,如果给出,应该是一个现有的、已连接的
socket.socket
对象,供传输使用。如果给出 sock,则不应指定 host、port、family、proto、flags、happy_eyeballs_delay、interleave 和 local_addr。注意
sock 参数将套接字的所有权转移到创建的传输。要关闭套接字,请调用传输的
close()
方法。local_addr,如果给出,是一个
(local_host, local_port)
元组,用于在本地绑定套接字。local_host 和 local_port 使用getaddrinfo()
查找,类似于 host 和 port。ssl_handshake_timeout 是(对于 TLS 连接)在放弃连接之前等待 TLS 握手完成的时间(以秒为单位)。如果为
None
(默认),则为60.0
秒。ssl_shutdown_timeout 是在放弃连接之前等待 SSL 关闭完成的时间(以秒为单位)。如果为
None
(默认),则为30.0
秒。all_errors 决定在无法创建连接时引发哪些异常。默认情况下,只引发一个
Exception
:如果只有一个异常,则引发第一个异常,如果所有错误都具有相同的错误消息,则引发一个OSError
,其中包含组合的错误消息。当all_errors
为True
时,将引发一个ExceptionGroup
,其中包含所有异常(即使只有一个异常)。
在版本 3.5 中更改: 在
ProactorEventLoop
中添加了对 SSL/TLS 的支持。在版本 3.6 中更改: 套接字选项 socket.TCP_NODELAY 默认情况下为所有 TCP 连接设置。
在版本 3.7 中更改: 添加了 ssl_handshake_timeout 参数。
在版本 3.8 中更改: 添加了 happy_eyeballs_delay 和 interleave 参数。
Happy Eyeballs 算法:双栈主机成功。当服务器的 IPv4 路径和协议工作,但服务器的 IPv6 路径和协议不工作时,双栈客户端应用程序与 IPv4 客户端相比会遇到明显的连接延迟。这是不可取的,因为它会导致双栈客户端的用户体验更差。本文档指定了减少这种用户可见延迟的算法的要求,并提供了一种算法。
在版本 3.11 中更改: 添加了 ssl_shutdown_timeout 参数。
在版本 3.12 中更改: 添加了 all_errors。
另请参见
open_connection()
函数是一个高级的替代 API。它返回一个 (StreamReader
,StreamWriter
) 对,可以直接在 async/await 代码中使用。
- coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)¶
创建数据报连接。
套接字族可以是
AF_INET
、AF_INET6
或AF_UNIX
,具体取决于 host(或如果提供,则取决于 family 参数)。套接字类型将为
SOCK_DGRAM
。protocol_factory 必须是一个可调用对象,返回一个 协议 实现。
成功后将返回一个
(transport, protocol)
元组。其他参数
local_addr,如果给出,是一个
(local_host, local_port)
元组,用于在本地绑定套接字。local_host 和 local_port 使用getaddrinfo()
查找。remote_addr,如果给出,是一个
(remote_host, remote_port)
元组,用于将套接字连接到远程地址。remote_host 和 remote_port 使用getaddrinfo()
查找。family、proto 和 flags 是可选的地址族、协议和标志,用于传递给
getaddrinfo()
以进行 host 解析。如果给出,这些都应该是来自相应socket
模块常量的整数。reuse_port 告诉内核允许此端点绑定到与其他现有端点绑定的相同端口,只要它们在创建时都设置了此标志。此选项在 Windows 和某些 Unix 上不受支持。如果 socket.SO_REUSEPORT 常量未定义,则此功能不受支持。
allow_broadcast 告诉内核允许此端点向广播地址发送消息。
sock 可以选择性地指定,以便使用预先存在的、已连接的
socket.socket
对象供传输使用。如果指定,local_addr 和 remote_addr 应该被省略(必须为None
)。注意
sock 参数将套接字的所有权转移到创建的传输。要关闭套接字,请调用传输的
close()
方法。
请参阅 UDP 回显客户端协议 和 UDP 回显服务器协议 示例。
在版本 3.4.4 中更改: 添加了 family、proto、flags、reuse_address、reuse_port、allow_broadcast 和 sock 参数。
在版本 3.8 中更改: 添加了对 Windows 的支持。
在版本 3.8.1 中更改: reuse_address 参数不再受支持,因为使用 socket.SO_REUSEADDR 会对 UDP 构成重大的安全隐患。显式传递
reuse_address=True
将引发异常。当多个具有不同 UID 的进程使用
SO_REUSEADDR
将套接字分配给相同的 UDP 套接字地址时,传入的数据包可能会随机分布在这些套接字之间。对于支持的平台,reuse_port 可以用作类似功能的替代方案。使用 reuse_port,socket.SO_REUSEPORT 将被使用,它专门防止具有不同 UID 的进程将套接字分配给相同的套接字地址。
在版本 3.11 中更改: reuse_address 参数自 Python 3.8.1、3.7.6 和 3.6.10 起已禁用,现已完全移除。
- coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
创建 Unix 连接。
套接字族将为
AF_UNIX
;套接字类型将为SOCK_STREAM
。成功后将返回一个
(transport, protocol)
元组。path 是 Unix 域套接字的名称,并且是必需的,除非指定了 sock 参数。抽象 Unix 套接字、
str
、bytes
和Path
路径都受支持。有关此方法参数的信息,请参阅
loop.create_connection()
方法的文档。可用性: Unix。
在版本 3.7 中更改: 添加了 ssl_handshake_timeout 参数。path 参数现在可以是 路径类对象。
在版本 3.11 中更改: 添加了 ssl_shutdown_timeout 参数。
创建网络服务器¶
- coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
创建 TCP 服务器(套接字类型
SOCK_STREAM
)监听 host 地址的 port 端口。返回一个
Server
对象。参数
protocol_factory 必须是一个可调用对象,返回一个 协议 实现。
host 参数可以设置为几种类型,这些类型决定了服务器将监听的位置
如果 host 是一个字符串,则 TCP 服务器将绑定到 host 指定的单个网络接口。
如果 host 是一个字符串序列,则 TCP 服务器将绑定到序列指定的全部网络接口。
如果 host 是一个空字符串或
None
,则假定所有接口,并将返回多个套接字的列表(很可能一个用于 IPv4,另一个用于 IPv6)。
port 参数可以设置为指定服务器应该监听的端口。如果为
0
或None
(默认值),则将选择一个随机的未使用的端口(请注意,如果 host 解析为多个网络接口,则将为每个接口选择一个不同的随机端口)。family 可以设置为
socket.AF_INET
或AF_INET6
,以强制套接字使用 IPv4 或 IPv6。如果未设置,family 将从主机名确定(默认为AF_UNSPEC
)。flags 是
getaddrinfo()
的位掩码。sock 可以选择性地指定,以便使用预先存在的套接字对象。如果指定,则不得指定 host 和 port。
注意
sock 参数将套接字的所有权转移到创建的服务器。要关闭套接字,请调用服务器的
close()
方法。backlog 是传递给
listen()
的最大排队连接数(默认为 100)。ssl 可以设置为
SSLContext
实例,以在接受的连接上启用 TLS。reuse_address 告诉内核在
TIME_WAIT
状态下重用本地套接字,而无需等待其自然超时到期。如果未指定,将在 Unix 上自动设置为True
。reuse_port 告诉内核允许此端点绑定到与其他现有端点绑定的相同端口,只要它们在创建时都设置了此标志。此选项在 Windows 上不受支持。
ssl_handshake_timeout 是(对于 TLS 服务器)在中止连接之前等待 TLS 握手完成的时间(以秒为单位)。如果为
None
(默认值),则为60.0
秒。ssl_shutdown_timeout 是在放弃连接之前等待 SSL 关闭完成的时间(以秒为单位)。如果为
None
(默认),则为30.0
秒。start_serving 设置为
True
(默认值)会导致创建的服务器立即开始接受连接。当设置为False
时,用户应该等待Server.start_serving()
或Server.serve_forever()
,以使服务器开始接受连接。
在版本 3.5 中更改: 在
ProactorEventLoop
中添加了对 SSL/TLS 的支持。在版本 3.5.1 中更改: host 参数可以是字符串序列。
在版本 3.6 中更改: 添加了 ssl_handshake_timeout 和 start_serving 参数。套接字选项 socket.TCP_NODELAY 默认情况下为所有 TCP 连接设置。
在版本 3.11 中更改: 添加了 ssl_shutdown_timeout 参数。
另请参见
函数
start_server()
是一个更高层次的替代 API,它返回一对StreamReader
和StreamWriter
,可用于 async/await 代码。
- coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
类似于
loop.create_server()
,但适用于AF_UNIX
套接字族。path 是 Unix 域套接字的名称,是必需的,除非提供了 sock 参数。支持抽象 Unix 套接字、
str
、bytes
和Path
路径。有关此方法参数的信息,请参阅
loop.create_server()
方法的文档。可用性: Unix。
在 3.7 版本中更改: 添加了 ssl_handshake_timeout 和 start_serving 参数。path 参数现在可以是
Path
对象。在版本 3.11 中更改: 添加了 ssl_shutdown_timeout 参数。
- coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
将已接受的连接包装到传输/协议对中。
此方法可用于在 asyncio 之外接受连接但使用 asyncio 处理连接的服务器。
参数
protocol_factory 必须是一个可调用对象,返回一个 协议 实现。
sock 是从
socket.accept
返回的预先存在的套接字对象。注意
sock 参数将套接字的所有权转移到创建的传输。要关闭套接字,请调用传输的
close()
方法。ssl 可以设置为
SSLContext
以在已接受的连接上启用 SSL。ssl_handshake_timeout 是(对于 SSL 连接)在放弃连接之前等待 SSL 握手完成的时间(以秒为单位)。如果为
None
(默认值),则为60.0
秒。ssl_shutdown_timeout 是在放弃连接之前等待 SSL 关闭完成的时间(以秒为单位)。如果为
None
(默认),则为30.0
秒。
返回一个
(transport, protocol)
对。在 3.5.3 版本中添加。
在版本 3.7 中更改: 添加了 ssl_handshake_timeout 参数。
在版本 3.11 中更改: 添加了 ssl_shutdown_timeout 参数。
传输文件¶
- coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)¶
通过 transport 发送 file。返回已发送的字节总数。
该方法使用高性能
os.sendfile()
(如果可用)。file 必须是使用二进制模式打开的常规文件对象。
offset 指示从哪里开始读取文件。如果指定,count 是要传输的字节总数,而不是发送文件直到到达 EOF。即使此方法引发错误,文件位置也会始终更新,并且可以使用
file.tell()
获取已发送的实际字节数。fallback 设置为
True
使 asyncio 在平台不支持 sendfile 系统调用(例如 Windows 或 Unix 上的 SSL 套接字)时手动读取和发送文件。如果系统不支持 sendfile 系统调用并且 fallback 为
False
,则引发SendfileNotAvailableError
。在版本 3.7 中添加。
TLS 升级¶
- coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
将现有的基于传输的连接升级到 TLS。
创建一个 TLS 编码器/解码器实例,并将其插入 transport 和 protocol 之间。编码器/解码器同时实现面向 transport 的协议和面向 protocol 的传输。
返回创建的双接口实例。在 await 之后,protocol 必须停止使用原始 transport,并且只能与返回的对象通信,因为编码器缓存 protocol 端的数据,并偶尔与 transport 交换额外的 TLS 会话数据包。
在某些情况下(例如,当传递的传输已关闭时),这可能会返回
None
。参数
transport 和 protocol 实例,例如
create_server()
和create_connection()
返回。sslcontext:一个已配置的
SSLContext
实例。server_side 在升级服务器端连接(例如由
create_server()
创建的连接)时传递True
。server_hostname:设置或覆盖目标服务器证书将与之匹配的主机名。
ssl_handshake_timeout 是(对于 TLS 连接)在放弃连接之前等待 TLS 握手完成的时间(以秒为单位)。如果为
None
(默认),则为60.0
秒。ssl_shutdown_timeout 是在放弃连接之前等待 SSL 关闭完成的时间(以秒为单位)。如果为
None
(默认),则为30.0
秒。
在版本 3.7 中添加。
在版本 3.11 中更改: 添加了 ssl_shutdown_timeout 参数。
监视文件描述符¶
- loop.add_reader(fd, callback, *args)¶
开始监控fd 文件描述符的读取可用性,并在fd 可读时使用指定参数调用callback。
- loop.remove_reader(fd)¶
停止监控fd 文件描述符的读取可用性。如果fd 之前正在被监控读取,则返回
True
。
- loop.add_writer(fd, callback, *args)¶
开始监控fd 文件描述符的写入可用性,并在fd 可写时使用指定参数调用callback。
使用
functools.partial()
传递关键字参数 到callback。
- loop.remove_writer(fd)¶
停止监控fd 文件描述符的写入可用性。如果fd 之前正在被监控写入,则返回
True
。
另请参阅 平台支持 部分了解这些方法的一些限制。
直接使用套接字对象¶
通常,使用基于传输的 API(如 loop.create_connection()
和 loop.create_server()
)的协议实现比直接使用套接字的实现更快。但是,在某些情况下,性能不是关键,直接使用 socket
对象更方便。
- coroutine loop.sock_recv(sock, nbytes)¶
从sock 接收最多nbytes 的数据。
socket.recv()
的异步版本。将接收到的数据作为字节对象返回。
sock 必须是非阻塞套接字。
在版本 3.7 中更改: 尽管此方法始终被记录为协程方法,但在 Python 3.7 之前的版本中返回了一个
Future
。从 Python 3.7 开始,这是一个async def
方法。
- coroutine loop.sock_recv_into(sock, buf)¶
从sock 接收数据到buf 缓冲区。模拟阻塞
socket.recv_into()
方法。返回写入缓冲区的字节数。
sock 必须是非阻塞套接字。
在版本 3.7 中添加。
- coroutine loop.sock_recvfrom(sock, bufsize)¶
从sock 接收最多bufsize 的数据报。
socket.recvfrom()
的异步版本。返回一个包含 (接收到的数据, 远程地址) 的元组。
sock 必须是非阻塞套接字。
在版本 3.11 中添加。
- coroutine loop.sock_recvfrom_into(sock, buf, nbytes=0)¶
从sock 接收最多nbytes 的数据报到buf。
socket.recvfrom_into()
的异步版本。返回一个包含 (接收到的字节数, 远程地址) 的元组。
sock 必须是非阻塞套接字。
在版本 3.11 中添加。
- coroutine loop.sock_sendall(sock, data)¶
将data 发送到sock 套接字。
socket.sendall()
的异步版本。此方法将继续发送到套接字,直到data 中的所有数据都被发送或发生错误。成功时返回
None
。发生错误时,会引发异常。此外,无法确定连接的接收端成功处理了多少数据(如果有)。sock 必须是非阻塞套接字。
在版本 3.7 中更改: 尽管该方法始终被记录为协程方法,但在 Python 3.7 之前,它返回了一个
Future
。从 Python 3.7 开始,这是一个async def
方法。
- coroutine loop.sock_sendto(sock, data, address)¶
从sock 发送数据报到address。
socket.sendto()
的异步版本。返回发送的字节数。
sock 必须是非阻塞套接字。
在版本 3.11 中添加。
- 协程 loop.sock_connect(sock, address)¶
将sock连接到address处的远程套接字。
socket.connect()
的异步版本。sock 必须是非阻塞套接字。
在版本 3.5.2 中更改:
address
不再需要解析。sock_connect
将尝试通过调用socket.inet_pton()
来检查address是否已解析。 如果没有,将使用loop.getaddrinfo()
来解析address。
- 协程 loop.sock_accept(sock)¶
接受连接。 模仿阻塞的
socket.accept()
方法。套接字必须绑定到地址并监听连接。 返回值是一个元组
(conn, address)
,其中conn是一个新的套接字对象,可用于在连接上发送和接收数据,而address是绑定到连接另一端套接字的地址。sock 必须是非阻塞套接字。
在版本 3.7 中更改: 尽管该方法始终被记录为协程方法,但在 Python 3.7 之前,它返回了一个
Future
。从 Python 3.7 开始,这是一个async def
方法。
- 协程 loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)¶
如果可能,使用高性能
os.sendfile
发送文件。 返回发送的总字节数。socket.sendfile()
的异步版本。sock必须是非阻塞的
socket.SOCK_STREAM
socket
。file必须是使用二进制模式打开的常规文件对象。
offset 指示从哪里开始读取文件。如果指定,count 是要传输的字节总数,而不是发送文件直到到达 EOF。即使此方法引发错误,文件位置也会始终更新,并且可以使用
file.tell()
获取已发送的实际字节数。fallback设置为
True
时,如果平台不支持sendfile系统调用(例如Windows或Unix上的SSL套接字),asyncio将手动读取并发送文件。如果系统不支持sendfile系统调用并且fallback为
False
,则引发SendfileNotAvailableError
。sock 必须是非阻塞套接字。
在版本 3.7 中添加。
DNS¶
- 协程 loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)¶
socket.getaddrinfo()
的异步版本。
- 协程 loop.getnameinfo(sockaddr, flags=0)¶
socket.getnameinfo()
的异步版本。
注意
getaddrinfo和getnameinfo都通过循环的默认线程池执行器在内部利用其同步版本。 当此执行器饱和时,这些方法可能会遇到延迟,高级网络库可能会将其报告为超时时间增加。 为了缓解这种情况,请考虑为其他用户任务使用自定义执行器,或使用具有更多工作线程的默认执行器。
在版本 3.7 中更改: getaddrinfo和getnameinfo方法始终记录为返回协程,但在 Python 3.7 之前,它们实际上返回的是asyncio.Future
对象。 从 Python 3.7 开始,这两种方法都是协程。
使用管道¶
- 协程 loop.connect_read_pipe(protocol_factory, pipe)¶
在事件循环中注册pipe的读端。
protocol_factory 必须是一个可调用对象,它返回一个 asyncio 协议 实现。
pipe是一个类文件对象。
返回元组
(transport, protocol)
,其中transport支持ReadTransport
接口,而protocol是protocol_factory实例化的对象。使用
SelectorEventLoop
事件循环时,pipe将设置为非阻塞模式。
- 协程 loop.connect_write_pipe(protocol_factory, pipe)¶
在事件循环中注册pipe的写端。
protocol_factory 必须是一个可调用对象,它返回一个 asyncio 协议 实现。
pipe是类文件对象。
返回元组
(transport, protocol)
,其中transport支持WriteTransport
接口,而protocol是protocol_factory实例化的对象。使用
SelectorEventLoop
事件循环时,pipe将设置为非阻塞模式。
注意
SelectorEventLoop
在Windows上不支持以上方法。 对Windows使用ProactorEventLoop
。
Unix信号¶
- loop.add_signal_handler(signum, callback, *args)¶
将callback设置为signum信号的处理程序。
回调将由loop调用,以及该事件循环的其他排队回调和可运行协程。 与使用
signal.signal()
注册的信号处理程序不同,使用此函数注册的回调可以与事件循环交互。如果信号编号无效或不可捕获,则引发
ValueError
。如果设置处理程序时出现问题,则引发RuntimeError
。使用
functools.partial()
传递关键字参数 到callback。与
signal.signal()
一样,此函数必须在主线程中调用。
- loop.remove_signal_handler(sig)¶
删除sig信号的处理程序。
如果信号处理程序被删除,则返回
True
,如果给定信号没有设置处理程序,则返回False
。可用性: Unix。
另请参见
signal
模块。
在线程或进程池中执行代码¶
- awaitable loop.run_in_executor(executor, func, *args)¶
安排在指定的执行器中调用func。
executor参数应该是一个
concurrent.futures.Executor
实例。如果executor为None
,则使用默认执行器。示例
import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop's executor: result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool', result) if __name__ == '__main__': asyncio.run(main())
请注意,由于
multiprocessing
(由ProcessPoolExecutor
使用)的特殊性,选项 3 需要入口点保护(if __name__ == '__main__'
)。请参阅主模块的安全导入。此方法返回一个
asyncio.Future
对象。使用
functools.partial()
传递关键字参数到func。在版本 3.5.3 中更改:
loop.run_in_executor()
不再配置它创建的线程池执行器的max_workers
,而是让线程池执行器(ThreadPoolExecutor
)来设置默认值。
- loop.set_default_executor(executor)¶
将executor设置为
run_in_executor()
使用的默认执行器。executor必须是ThreadPoolExecutor
的实例。在版本 3.11 中更改: executor必须是
ThreadPoolExecutor
的实例。
错误处理 API¶
允许自定义在事件循环中如何处理异常。
- loop.set_exception_handler(handler)¶
将handler设置为新的事件循环异常处理程序。
如果handler为
None
,则将设置默认异常处理程序。否则,handler必须是一个可调用对象,其签名与(loop, context)
匹配,其中loop
是对活动事件循环的引用,而context
是一个dict
对象,其中包含异常的详细信息(有关上下文的信息,请参阅call_exception_handler()
文档)。如果处理程序代表一个
Task
或Handle
被调用,它将在该任务或回调句柄的contextvars.Context
中运行。在版本 3.12 中更改: 处理程序可以在发生异常的任务或句柄的
Context
中被调用。
- loop.get_exception_handler()¶
返回当前异常处理程序,如果未设置自定义异常处理程序,则返回
None
。在版本 3.5.2 中添加。
- loop.default_exception_handler(context)¶
默认异常处理程序。
当发生异常且未设置异常处理程序时,会调用此方法。自定义异常处理程序可以调用此方法,以委托给默认处理程序的行为。
context参数与
call_exception_handler()
中的含义相同。
- loop.call_exception_handler(context)¶
调用当前事件循环异常处理程序。
context是一个
dict
对象,包含以下键(将来版本的 Python 中可能会引入新的键)‘message’: 错误消息;
‘exception’(可选):异常对象;
‘future’(可选):
asyncio.Future
实例;‘task’(可选):
asyncio.Task
实例;‘handle’(可选):
asyncio.Handle
实例;‘protocol’(可选):协议实例;
‘transport’(可选):传输实例;
‘socket’(可选):
socket.socket
实例;- ‘asyncgen’(可选):导致
异常的异步生成器。
注意
此方法不应在子类化的事件循环中重载。对于自定义异常处理,请使用
set_exception_handler()
方法。
启用调试模式¶
- loop.get_debug()¶
获取事件循环的调试模式(
bool
)。如果环境变量
PYTHONASYNCIODEBUG
设置为非空字符串,则默认值为True
,否则为False
。
- loop.set_debug(enabled: bool)¶
设置事件循环的调试模式。
在版本 3.7 中更改: 新的Python 开发模式现在也可以用来启用调试模式。
- loop.slow_callback_duration¶
此属性可用于设置被认为“缓慢”的最小执行持续时间(以秒为单位)。当启用调试模式时,会记录“缓慢”的回调。
默认值为 100 毫秒。
另请参见
运行子进程¶
本节中描述的方法是低级的。在常规的异步/等待代码中,请考虑使用高级的 asyncio.create_subprocess_shell()
和 asyncio.create_subprocess_exec()
方便函数。
注意
在 Windows 上,默认事件循环 ProactorEventLoop
支持子进程,而 SelectorEventLoop
不支持。有关详细信息,请参见 Windows 上的子进程支持。
- 协程 loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)¶
从 args 指定的一个或多个字符串参数创建子进程。
args 必须是字符串列表,表示为
第一个字符串指定程序可执行文件,其余字符串指定参数。字符串参数共同构成程序的
argv
。这类似于标准库
subprocess.Popen
类,使用shell=False
调用,并将字符串列表作为第一个参数传递;但是,Popen
接受一个作为字符串列表的单个参数,而 subprocess_exec 接受多个字符串参数。protocol_factory 必须是一个可调用对象,返回
asyncio.SubprocessProtocol
类的子类。其他参数
stdin 可以是以下任何一种
类文件对象
现有文件描述符(正整数),例如使用
os.pipe()
创建的文件描述符subprocess.PIPE
常量(默认值),它将创建一个新管道并连接它,值
None
将使子进程从当前进程继承文件描述符subprocess.DEVNULL
常量,表示将使用特殊的os.devnull
文件
stdout 可以是以下任何一种
类文件对象
subprocess.PIPE
常量(默认值),它将创建一个新管道并连接它,值
None
将使子进程从当前进程继承文件描述符subprocess.DEVNULL
常量,表示将使用特殊的os.devnull
文件
stderr 可以是以下任何一种
类文件对象
subprocess.PIPE
常量(默认值),它将创建一个新管道并连接它,值
None
将使子进程从当前进程继承文件描述符subprocess.DEVNULL
常量,表示将使用特殊的os.devnull
文件subprocess.STDOUT
常量,它将标准错误流连接到进程的标准输出流
所有其他关键字参数都将传递给
subprocess.Popen
而不进行解释,除了 bufsize、universal_newlines、shell、text、encoding 和 errors,这些参数不应该指定。asyncio
子进程 API 不支持将流解码为文本。可以使用bytes.decode()
将从流返回的字节转换为文本。
如果作为 stdin、stdout 或 stderr 传递的类文件对象表示管道,则应使用
connect_write_pipe()
或connect_read_pipe()
将该管道的另一端注册到事件循环中。有关其他参数的文档,请参见
subprocess.Popen
类的构造函数。返回一个
(transport, protocol)
对,其中 transport 符合asyncio.SubprocessTransport
基类,protocol 是由 protocol_factory 实例化的对象。
- 协程 loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)¶
从 cmd 创建子进程,cmd 可以是
str
或bytes
字符串,编码为 文件系统编码,使用平台的“shell”语法。这类似于标准库
subprocess.Popen
类,使用shell=True
调用。protocol_factory 必须是一个可调用对象,返回
SubprocessProtocol
类的子类。有关其余参数的更多详细信息,请参见
subprocess_exec()
。返回一个
(transport, protocol)
对,其中 transport 符合SubprocessTransport
基类,protocol 是由 protocol_factory 实例化的对象。
注意
应用程序有责任确保所有空格和特殊字符都以适当的方式引用,以避免 shell 注入 漏洞。可以使用 shlex.quote()
函数正确地转义将用于构建 shell 命令的字符串中的空格和特殊字符。
回调句柄¶
- 类 asyncio.Handle¶
由
loop.call_soon()
、loop.call_soon_threadsafe()
返回的回调包装器对象。- get_context()¶
返回与句柄关联的
contextvars.Context
对象。在版本 3.12 中添加。
- cancel()¶
取消回调。如果回调已经取消或执行,则此方法没有效果。
- cancelled()¶
如果回调被取消,则返回
True
。在版本 3.7 中添加。
- class asyncio.TimerHandle¶
由
loop.call_later()
和loop.call_at()
返回的回调包装器对象。此类是
Handle
的子类。- when()¶
以
float
秒的形式返回计划的回调时间。该时间是绝对时间戳,使用与
loop.time()
相同的时间参考。在版本 3.7 中添加。
服务器对象¶
服务器对象由 loop.create_server()
、loop.create_unix_server()
、start_server()
和 start_unix_server()
函数创建。
不要直接实例化 Server
类。
- class asyncio.Server¶
Server 对象是异步上下文管理器。当在
async with
语句中使用时,可以保证当async with
语句完成时,Server 对象已关闭且不再接受新连接。srv = await loop.create_server(...) async with srv: # some code # At this point, srv is closed and no longer accepts new connections.
Changed in version 3.7: 自 Python 3.7 起,Server 对象成为异步上下文管理器。
Changed in version 3.11: 此类在 Python 3.9.11、3.10.3 和 3.11 中公开为
asyncio.Server
。- close()¶
停止服务:关闭监听套接字并将
sockets
属性设置为None
。表示现有传入客户端连接的套接字保持打开状态。
服务器异步关闭;使用
wait_closed()
协程等待服务器关闭(并且没有更多连接处于活动状态)。
- get_loop()¶
返回与服务器对象关联的事件循环。
在版本 3.7 中添加。
- coroutine start_serving()¶
开始接受连接。
此方法是幂等的,因此可以在服务器已在提供服务时调用。
start_serving 关键字参数用于
loop.create_server()
和asyncio.start_server()
,允许创建最初不接受连接的 Server 对象。在这种情况下,可以使用Server.start_serving()
或Server.serve_forever()
使 Server 开始接受连接。在版本 3.7 中添加。
- coroutine serve_forever()¶
开始接受连接,直到协程被取消。取消
serve_forever
任务会导致服务器关闭。如果服务器已在接受连接,则可以调用此方法。每个 Server 对象只能存在一个
serve_forever
任务。示例
async def client_connected(reader, writer): # Communicate with the client with # reader/writer streams. For example: await reader.readline() async def main(host, port): srv = await asyncio.start_server( client_connected, host, port) await srv.serve_forever() asyncio.run(main('127.0.0.1', 0))
在版本 3.7 中添加。
- is_serving()¶
如果服务器正在接受新连接,则返回
True
。在版本 3.7 中添加。
- sockets¶
套接字类对象的列表,
asyncio.trsock.TransportSocket
,服务器正在监听这些对象。Changed in version 3.7: 在 Python 3.7 之前,
Server.sockets
用于直接返回服务器套接字的内部列表。在 3.7 中,返回该列表的副本。
事件循环实现¶
asyncio 附带了两种不同的事件循环实现:SelectorEventLoop
和 ProactorEventLoop
。
默认情况下,asyncio 配置为在 Unix 上使用 SelectorEventLoop
,在 Windows 上使用 ProactorEventLoop
。
- class asyncio.SelectorEventLoop¶
基于
selectors
模块的事件循环。使用为给定平台提供的最有效的 selector。也可以手动配置要使用的确切 selector 实现。
import asyncio import selectors class MyPolicy(asyncio.DefaultEventLoopPolicy): def new_event_loop(self): selector = selectors.SelectSelector() return asyncio.SelectorEventLoop(selector) asyncio.set_event_loop_policy(MyPolicy())
Availability: Unix, Windows.
- class asyncio.ProactorEventLoop¶
用于 Windows 的事件循环,使用“I/O 完成端口”(IOCP)。
Availability: Windows.
另请参见
- class asyncio.AbstractEventLoop¶
用于符合 asyncio 规范的事件循环的抽象基类。
Event Loop Methods 部分列出了替代
AbstractEventLoop
实现应该定义的所有方法。
示例¶
请注意,本节中的所有示例都故意展示了如何使用低级事件循环 API,例如 loop.run_forever()
和 loop.call_soon()
。现代 asyncio 应用程序很少需要以这种方式编写;考虑使用高级函数,例如 asyncio.run()
。
使用 call_soon() 的 Hello World¶
使用 loop.call_soon()
方法来安排回调的示例。回调显示 "Hello World"
,然后停止事件循环。
import asyncio
def hello_world(loop):
"""A callback to print 'Hello World' and stop the event loop"""
print('Hello World')
loop.stop()
loop = asyncio.new_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
另请参见
使用协程和 run()
函数创建的类似 Hello World 示例。
使用 call_later() 显示当前日期¶
一个回调示例,每秒显示一次当前日期。回调使用 loop.call_later()
方法在 5 秒后重新安排自身,然后停止事件循环。
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
loop.call_later(1, display_date, end_time, loop)
else:
loop.stop()
loop = asyncio.new_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
监视文件描述符以获取读取事件¶
使用 loop.add_reader()
方法等待文件描述符接收一些数据,然后关闭事件循环。
import asyncio
from socket import socketpair
# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.new_event_loop()
def reader():
data = rsock.recv(100)
print("Received:", data.decode())
# We are done: unregister the file descriptor
loop.remove_reader(rsock)
# Stop the event loop
loop.stop()
# Register the file descriptor for read event
loop.add_reader(rsock, reader)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
rsock.close()
wsock.close()
loop.close()
另请参见
一个类似的 示例 使用传输、协议和
loop.create_connection()
方法。另一个类似的 示例 使用高级
asyncio.open_connection()
函数和流。
为 SIGINT 和 SIGTERM 设置信号处理程序¶
(此 signals
示例仅在 Unix 上有效。)
使用 loop.add_signal_handler()
方法为信号 SIGINT
和 SIGTERM
注册处理程序。
import asyncio
import functools
import os
import signal
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
functools.partial(ask_exit, signame, loop))
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
asyncio.run(main())