事件循环¶
源代码: Lib/asyncio/events.py, Lib/asyncio/base_events.py
前言
事件循环是每个 asyncio 应用程序的核心。事件循环运行异步任务和回调,执行网络 IO 操作,并运行子进程。
应用程序开发人员通常应该使用高级 asyncio 函数,例如 asyncio.run(),很少需要引用循环对象或调用其方法。本节主要面向需要更精细控制事件循环行为的低级代码、库和框架的作者。
获取事件循环
以下低级函数可用于获取、设置或创建事件循环
- asyncio.get_running_loop()¶
返回当前 OS 线程中正在运行的事件循环。
如果没有正在运行的事件循环,则引发
RuntimeError。此函数只能从协程或回调中调用。
在 3.7 版本加入。
- asyncio.get_event_loop()¶
获取当前事件循环。
当从协程或回调(例如使用 call_soon 或类似 API 调度)调用时,此函数将始终返回正在运行的事件循环。
如果没有设置正在运行的事件循环,该函数将返回
get_event_loop_policy().get_event_loop()调用的结果。由于此函数行为复杂(尤其是在使用自定义事件循环策略时),在协程和回调中,首选使用
get_running_loop()函数而非get_event_loop()。如上所述,考虑使用高级
asyncio.run()函数,而不是使用这些低级函数手动创建和关闭事件循环。3.14 版本变更: 如果没有当前事件循环,则引发
RuntimeError。备注
asyncio策略系统已弃用,并将在 Python 3.16 中移除;从那时起,如果存在当前正在运行的事件循环,此函数将返回该循环,否则将返回由set_event_loop()设置的循环。
- asyncio.set_event_loop(loop)¶
将 loop 设置为当前 OS 线程的当前事件循环。
- asyncio.new_event_loop()¶
创建并返回一个新的事件循环对象。
请注意,get_event_loop()、set_event_loop() 和 new_event_loop() 函数的行为可以通过设置自定义事件循环策略来改变。
目录
本页文档包含以下部分
事件循环方法 部分是事件循环 API 的参考文档;
回调句柄 部分记录了
Handle和TimerHandle实例,它们由loop.call_soon()和loop.call_later()等调度方法返回;服务器对象 部分记录了从
loop.create_server()等事件循环方法返回的类型;事件循环实现 部分记录了
SelectorEventLoop和ProactorEventLoop类;示例 部分展示了如何使用一些事件循环 API。
事件循环方法¶
事件循环具有以下低级 API
运行和停止循环¶
- loop.run_until_complete(future)¶
运行直到 future (
Future的一个实例)完成。如果参数是协程对象,它会被隐式调度为
asyncio.Task运行。返回 Future 的结果或引发其异常。
- loop.run_forever()¶
运行事件循环,直到调用
stop()。如果
stop()在run_forever()之前被调用,循环将以零超时轮询 I/O 选择器一次,运行所有响应 I/O 事件调度的回调(以及那些已经调度的回调),然后退出。如果在
run_forever()正在运行时调用stop(),循环将运行当前批次的回调然后退出。请注意,在这种情况下,由回调调度的新回调不会运行;相反,它们将在下次调用run_forever()或run_until_complete()时运行。
- loop.stop()¶
停止事件循环。
- loop.is_running()¶
如果事件循环当前正在运行,则返回
True。
- loop.is_closed()¶
如果事件循环已关闭,则返回
True。
- loop.close()¶
关闭事件循环。
调用此函数时,循环不得处于运行状态。所有待处理的回调都将被丢弃。
此方法会清空所有队列并关闭执行器,但不会等待执行器完成。
此方法是幂等且不可逆的。事件循环关闭后,不应调用其他方法。
- async loop.shutdown_asyncgens()¶
调度所有当前打开的异步生成器对象通过
aclose()调用关闭。调用此方法后,如果迭代新的异步生成器,事件循环将发出警告。这应该用于可靠地完成所有已调度的异步生成器。请注意,使用
asyncio.run()时无需调用此函数。示例
try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
在 3.6 版本加入。
- async loop.shutdown_default_executor(timeout=None)¶
调度默认执行器的关闭,并等待它加入
ThreadPoolExecutor中的所有线程。一旦调用此方法,使用loop.run_in_executor()使用默认执行器将引发RuntimeError。timeout 参数指定执行器完成加入所允许的时间(以
float秒为单位)。如果为默认值None,则执行器被允许无限时间。如果达到 timeout,则会发出
RuntimeWarning,并且默认执行器会终止,而不等待其线程完成加入。备注
使用
asyncio.run()时不要调用此方法,因为后者会自动处理默认执行器关闭。在 3.9 版本中新增。
3.12 版本变更: 添加了 timeout 参数。
调度回调¶
- loop.call_soon(callback, *args, context=None)¶
调度 callback 回调 在事件循环的下一次迭代中以 args 参数被调用。
返回
asyncio.Handle的实例,该实例以后可用于取消回调。回调按照注册顺序调用。每个回调都将只被调用一次。
可选的仅限关键字 context 参数指定 callback 运行的自定义
contextvars.Context。如果未提供 context,回调将使用当前上下文。与
call_soon_threadsafe()不同,此方法不是线程安全的。
- loop.call_soon_threadsafe(callback, *args, context=None)¶
线程安全的
call_soon()变体。当从另一个线程调度回调时,必须 使用此函数,因为call_soon()不是线程安全的。此函数可以安全地从可重入上下文或信号处理程序中调用,但是,在此类上下文中使用返回的句柄是不安全或无益的。
如果在已关闭的循环上调用,则引发
RuntimeError。这可能发生在主应用程序关闭时的一个辅助线程上。请参阅文档的并发和多线程部分。
3.7 版本变更: 添加了 context 仅限关键字参数。有关详细信息,请参阅 PEP 567。
备注
大多数 asyncio 调度函数不允许传递关键字参数。要做到这一点,请使用 functools.partial()
# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
使用部分对象通常比使用 lambda 更方便,因为 asyncio 可以在调试和错误消息中更好地呈现部分对象。
调度延迟回调¶
事件循环提供了调度回调函数在未来某个时间点被调用的机制。事件循环使用单调时钟来跟踪时间。
- loop.call_later(delay, callback, *args, context=None)¶
调度 callback 在给定的 delay 秒数(可以是整数或浮点数)后被调用。
返回
asyncio.TimerHandle的实例,可用于取消回调。callback 将只被调用一次。如果两个回调被调度在完全相同的时间,则它们的调用顺序是未定义的。
可选的位置参数 args 将在回调被调用时传递给它。如果你想让回调使用关键字参数调用,请使用
functools.partial()。可选的仅限关键字 context 参数允许为 callback 运行指定自定义
contextvars.Context。如果未提供 context,则使用当前上下文。备注
为了性能,使用
loop.call_later()调度的回调可能会提前最多一个时钟分辨率运行(请参阅time.get_clock_info('monotonic').resolution)。3.7 版本变更: 添加了 context 仅限关键字参数。有关详细信息,请参阅 PEP 567。
3.8 版本变更: 在 Python 3.7 及更早版本中使用默认事件循环实现时,delay 不能超过一天。这已在 Python 3.8 中修复。
- loop.call_at(when, callback, *args, context=None)¶
调度 callback 在给定绝对时间戳 when (整数或浮点数)时调用,使用与
loop.time()相同的时间参考。此方法的行为与
call_later()相同。返回
asyncio.TimerHandle的实例,可用于取消回调。备注
为了性能,使用
loop.call_at()调度的回调可能会提前最多一个时钟分辨率运行(请参阅time.get_clock_info('monotonic').resolution)。3.7 版本变更: 添加了 context 仅限关键字参数。有关详细信息,请参阅 PEP 567。
3.8 版本变更: 在 Python 3.7 及更早版本中使用默认事件循环实现时,when 和当前时间之间的差异不能超过一天。这已在 Python 3.8 中修复。
备注
3.8 版本变更: 在 Python 3.7 及更早版本中,超时(相对 delay 或绝对 when)不应超过一天。这已在 Python 3.8 中修复。
参见
asyncio.sleep() 函数。
创建 Futures 和 Tasks¶
- loop.create_future()¶
创建附加到事件循环的
asyncio.Future对象。这是在 asyncio 中创建 Futures 的首选方式。这允许第三方事件循环提供 Future 对象的替代实现(具有更好的性能或检测)。
3.5.2 版本新增。
- loop.create_task(coro, *, name=None, context=None, eager_start=None, **kwargs)¶
-
第三方事件循环可以使用自己的
Task子类进行互操作。在这种情况下,结果类型是Task的子类。完整的函数签名与
Task构造函数(或工厂)的签名大致相同——此函数的所有关键字参数都通过该接口传递。如果提供了 name 参数且不为
None,则使用Task.set_name()将其设置为任务的名称。可选的仅限关键字 context 参数允许为 coro 运行指定自定义
contextvars.Context。如果未提供 context,则创建当前上下文的副本。可选的仅限关键字 eager_start 参数允许指定任务是在调用 create_task 时急切执行,还是稍后调度。如果未传递 eager_start,则将使用
loop.set_task_factory()设置的模式。3.8 版本变更: 添加了 name 参数。
3.11 版本变更: 添加了 context 参数。
3.13.3 版本变更: 添加了
kwargs,它传递任意额外的参数,包括name和context。3.13.4 版本变更: 回滚了传递 name 和 context(如果为 None)的更改,同时仍然传递其他任意关键字参数(以避免破坏与 3.13.3 的向后兼容性)。
3.14 版本变更: 所有 kwargs 现在都已传递。eager_start 参数适用于急切任务工厂。
- loop.set_task_factory(factory)¶
设置一个任务工厂,该工厂将由
loop.create_task()使用。如果 factory 是
None,则将设置默认任务工厂。否则,factory 必须是可调用对象,其签名匹配(loop, coro, **kwargs),其中 loop 是活动事件循环的引用,而 coro 是协程对象。可调用对象必须传递所有 kwargs,并返回一个asyncio.Task兼容对象。3.13.3 版本变更: 要求将所有 kwargs 传递给
asyncio.Task。3.13.4 版本变更: name 不再传递给任务工厂。如果 context 为
None,则不再传递给任务工厂。3.14 版本变更: name 和 context 现在再次无条件地传递给任务工厂。
- loop.get_task_factory()¶
返回任务工厂,如果正在使用默认工厂,则返回
None。
打开网络连接¶
- async loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)¶
打开到由 host 和 port 指定的给定地址的流传输连接。
套接字家族可以是
AF_INET或AF_INET6,具体取决于 host(或 family 参数,如果提供)。套接字类型将是
SOCK_STREAM。protocol_factory 必须是一个可调用对象,返回一个 asyncio 协议 实现。
此方法将在后台尝试建立连接。成功时,它返回一个
(transport, protocol)对。底层操作的时间顺序概述如下
连接建立并为其创建传输。
protocol_factory 在没有参数的情况下被调用,并期望返回一个 协议 实例。
通过调用其
connection_made()方法,协议实例与传输耦合。成功时返回一个
(transport, protocol)元组。
创建的传输是依赖于实现的双向流。
其他参数
ssl:如果给定且不为 false,则创建 SSL/TLS 传输(默认创建纯 TCP 传输)。如果 ssl 是
ssl.SSLContext对象,则此上下文用于创建传输;如果 ssl 是True,则使用从ssl.create_default_context()返回的默认上下文。server_hostname 设置或覆盖目标服务器证书将与之匹配的主机名。仅当 ssl 不为
None时才应传递。默认情况下使用 host 参数的值。如果 host 为空,则没有默认值,并且您必须为 server_hostname 传递一个值。如果 server_hostname 是一个空字符串,则禁用主机名匹配(这是一个严重的安全风险,可能导致中间人攻击)。family、proto、flags 是可选的地址家族、协议和标志,用于通过 getaddrinfo() 进行 host 解析。如果给定,这些都应该来自相应的
socket模块常量的整数。happy_eyeballs_delay,如果给定,则为此连接启用 Happy Eyeballs。它应该是一个浮点数,表示等待连接尝试完成的时间(以秒为单位),然后并行启动下一次尝试。这是 RFC 8305 中定义的“连接尝试延迟”。RFC 推荐的合理默认值是
0.25(250 毫秒)。interleave 控制当主机名解析为多个 IP 地址时地址的重新排序。如果为
0或未指定,则不进行重新排序,并按getaddrinfo()返回的顺序尝试地址。如果指定正整数,则地址按地址族交错,并且给定整数被解释为 RFC 8305 中定义的“第一地址族计数”。如果未指定 happy_eyeballs_delay,则默认值为0;如果指定,则默认值为1。sock,如果给定,应该是一个现有的、已连接的
socket.socket对象,用于传输。如果给定 sock,则不应指定 host、port、family、proto、flags、happy_eyeballs_delay、interleave 和 local_addr。备注
sock 参数将套接字的所有权转移给创建的传输。要关闭套接字,请调用传输的
close()方法。local_addr,如果给定,是一个
(local_host, local_port)元组,用于在本地绑定套接字。local_host 和 local_port 使用getaddrinfo()查找,类似于 host 和 port。ssl_handshake_timeout 是(对于 TLS 连接)等待 TLS 握手完成的时间(以秒为单位),在此之前将中止连接。
60.0秒(如果为None(默认))。ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),在此之前将中止连接。
30.0秒(如果为None(默认))。all_errors 决定在无法创建连接时引发哪些异常。默认情况下,只引发一个
Exception:如果只有一个错误或所有错误消息相同,则为第一个异常;或者,如果错误消息合并,则为单个OSError。当all_errors为True时,将引发一个包含所有异常(即使只有一个)的ExceptionGroup。
3.5 版本变更: 在
ProactorEventLoop中添加了对 SSL/TLS 的支持。3.6 版本变更: 默认情况下,所有 TCP 连接都设置了套接字选项 socket.TCP_NODELAY。
3.7 版本变更: 添加了 ssl_handshake_timeout 参数。
3.8 版本变更: 添加了 happy_eyeballs_delay 和 interleave 参数。
Happy Eyeballs 算法:双栈主机的成功。当服务器的 IPv4 路径和协议工作,但服务器的 IPv6 路径和协议不工作时,双栈客户端应用程序会经历显著的连接延迟,与仅 IPv4 客户端相比。这是不可取的,因为它会导致双栈客户端的用户体验更差。本文档指定了减少这种用户可见延迟的算法要求,并提供了一种算法。
3.11 版本变更: 添加了 ssl_shutdown_timeout 参数。
3.12 版本变更: 添加了 all_errors。
参见
open_connection()函数是一个高级替代 API。它返回一对 (StreamReader,StreamWriter),可以直接在 async/await 代码中使用。
- async loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)¶
创建一个数据报连接。
套接字家族可以是
AF_INET、AF_INET6或AF_UNIX,具体取决于 host(或 family 参数,如果提供)。套接字类型将是
SOCK_DGRAM。protocol_factory 必须是一个可调用对象,返回一个 协议 实现。
成功时返回一个
(transport, protocol)元组。其他参数
local_addr,如果给定,是一个
(local_host, local_port)元组,用于在本地绑定套接字。local_host 和 local_port 使用getaddrinfo()查找。备注
在 Windows 上,当使用带有
local_addr=None的 proactor 事件循环时,运行它将引发带有errno.WSAEINVAL的OSError。remote_addr,如果给定,是一个
(remote_host, remote_port)元组,用于将套接字连接到远程地址。remote_host 和 remote_port 使用getaddrinfo()查找。family、proto、flags 是可选的地址家族、协议和标志,用于通过
getaddrinfo()进行 host 解析。如果给定,这些都应该来自相应的socket模块常量的整数。reuse_port 告诉内核允许此端点绑定到与其他现有端点相同的端口,只要它们在创建时都设置了此标志。此选项在 Windows 和某些 Unixes 上不受支持。如果未定义 socket.SO_REUSEPORT 常量,则不支持此功能。
allow_broadcast 告诉内核允许此端点向广播地址发送消息。
可以可选地指定 sock 以使用一个预先存在的、已连接的
socket.socket对象供传输使用。如果指定,则应省略 local_addr 和 remote_addr(必须为None)。备注
sock 参数将套接字的所有权转移给创建的传输。要关闭套接字,请调用传输的
close()方法。
请参阅UDP echo 客户端协议和UDP echo 服务器协议示例。
3.4.4 版本变更: 添加了 family、proto、flags、reuse_address、reuse_port、allow_broadcast 和 sock 参数。
3.8 版本变更: 添加了对 Windows 的支持。
3.8.1 版本变更: reuse_address 参数不再受支持,因为使用 socket.SO_REUSEADDR 对 UDP 构成严重的安全隐患。显式传递
reuse_address=True将引发异常。当多个具有不同 UID 的进程使用
SO_REUSEADDR将套接字分配给相同的 UDP 套接字地址时,传入的数据包可能会随机分布在套接字之间。对于受支持的平台,reuse_port 可用作类似功能的替代。使用 reuse_port,将使用 socket.SO_REUSEPORT,它专门防止具有不同 UID 的进程将套接字分配给相同的套接字地址。
3.11 版本变更: reuse_address 参数,自 Python 3.8.1、3.7.6 和 3.6.10 以来已禁用,现已完全移除。
- async loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
创建 Unix 连接。
套接字家族将是
AF_UNIX;套接字类型将是SOCK_STREAM。成功时返回一个
(transport, protocol)元组。path 是 Unix 域套接字的名称,除非指定了 sock 参数,否则为必需。支持抽象 Unix 套接字、
str、bytes和Path路径。有关此方法参数的信息,请参阅
loop.create_connection()方法的文档。可用性: Unix。
3.7 版本变更: 添加了 ssl_handshake_timeout 参数。path 参数现在可以是类路径对象。
3.11 版本变更: 添加了 ssl_shutdown_timeout 参数。
创建网络服务器¶
- async loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
创建一个 TCP 服务器(套接字类型
SOCK_STREAM),监听 host 地址的 port 端口。返回
Server对象。参数
protocol_factory 必须是一个可调用对象,返回一个 协议 实现。
host 参数可以设置为多种类型,这些类型决定了服务器将在何处监听
如果 host 是字符串,TCP 服务器将绑定到由 host 指定的单个网络接口。
如果 host 是字符串序列,TCP 服务器将绑定到序列中指定的所有网络接口。
如果 host 是空字符串或
None,则假定所有接口,并将返回多个套接字的列表(很可能一个用于 IPv4,另一个用于 IPv6)。
port 参数可以设置为指定服务器应监听的端口。如果为
0或None(默认值),将选择一个随机未使用的端口(请注意,如果 host 解析为多个网络接口,则每个接口将选择不同的随机端口)。family 可以设置为
socket.AF_INET或AF_INET6以强制套接字使用 IPv4 或 IPv6。如果未设置,family 将从主机名确定(默认为AF_UNSPEC)。flags 是
getaddrinfo()的位掩码。可以可选地指定 sock 以使用预先存在的套接字对象。如果指定,则不得指定 host 和 port。
备注
sock 参数将套接字的所有权转移给创建的服务器。要关闭套接字,请调用服务器的
close()方法。backlog 是传递给
listen()的最大排队连接数(默认为 100)。ssl 可以设置为
SSLContext实例,以在接受的连接上启用 TLS。reuse_address 告诉内核重用处于
TIME_WAIT状态的本地套接字,而无需等待其自然超时过期。如果未指定,则在 Unix 上会自动设置为True。reuse_port 告诉内核允许此端点绑定到与其他现有端点相同的端口,只要它们在创建时都设置了此标志。此选项在 Windows 上不受支持。
将 keep_alive 设置为
True可通过启用消息的定期传输来保持连接处于活动状态。
3.13 版本变更: 添加了 keep_alive 参数。
ssl_handshake_timeout 是(对于 TLS 服务器)等待 TLS 握手完成的时间(以秒为单位),在此之前将中止连接。
60.0秒(如果为None(默认))。ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),在此之前将中止连接。
30.0秒(如果为None(默认))。start_serving 设置为
True(默认值)会导致创建的服务器立即开始接受连接。当设置为False时,用户应等待Server.start_serving()或Server.serve_forever(),以使服务器开始接受连接。
3.5 版本变更: 在
ProactorEventLoop中添加了对 SSL/TLS 的支持。3.5.1 版本变更: host 参数可以是一个字符串序列。
3.6 版本变更: 添加了 ssl_handshake_timeout 和 start_serving 参数。默认情况下,所有 TCP 连接都设置了套接字选项 socket.TCP_NODELAY。
3.11 版本变更: 添加了 ssl_shutdown_timeout 参数。
参见
start_server()函数是一个更高级的替代 API,它返回一对StreamReader和StreamWriter,可以在 async/await 代码中使用。
- async loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)¶
类似于
loop.create_server(),但适用于AF_UNIX套接字家族。path 是 Unix 域套接字的名称,除非提供了 sock 参数,否则为必需。支持抽象 Unix 套接字、
str、bytes和Path路径。如果 cleanup_socket 为真,则当服务器关闭时,Unix 套接字将自动从文件系统中移除,除非在服务器创建后套接字已被替换。
有关此方法参数的信息,请参阅
loop.create_server()方法的文档。可用性: Unix。
3.7 版本变更: 添加了 ssl_handshake_timeout 和 start_serving 参数。path 参数现在可以是
Path对象。3.11 版本变更: 添加了 ssl_shutdown_timeout 参数。
3.13 版本变更: 添加了 cleanup_socket 参数。
- async loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
将已接受的连接包装成传输/协议对。
此方法可用于在 asyncio 外部接受连接但使用 asyncio 处理它们的服务器。
参数
protocol_factory 必须是一个可调用对象,返回一个 协议 实现。
sock 是从
socket.accept返回的现有套接字对象。备注
sock 参数将套接字的所有权转移给创建的传输。要关闭套接字,请调用传输的
close()方法。ssl 可以设置为
SSLContext以在接受的连接上启用 SSL。ssl_handshake_timeout 是(对于 SSL 连接)等待 SSL 握手完成的时间(以秒为单位),在此之前将中止连接。
60.0秒(如果为None(默认))。ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),在此之前将中止连接。
30.0秒(如果为None(默认))。
返回一个
(transport, protocol)对。3.5.3 版本中添加。
3.7 版本变更: 添加了 ssl_handshake_timeout 参数。
3.11 版本变更: 添加了 ssl_shutdown_timeout 参数。
传输文件¶
- async loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)¶
通过 transport 发送 file。返回发送的总字节数。
如果可用,此方法使用高性能
os.sendfile()。file 必须是已二进制模式打开的常规文件对象。
offset 指明从文件何处开始读取。如果指定了 count,则表示要传输的总字节数,而不是发送文件直到 EOF。文件位置始终会更新,即使此方法引发错误,并且可以使用
file.tell()获取实际发送的字节数。将 fallback 设置为
True会使 asyncio 在平台不支持 sendfile 系统调用时(例如 Windows 或 Unix 上的 SSL 套接字)手动读取和发送文件。如果系统不支持 sendfile 系统调用且 fallback 为
False,则引发SendfileNotAvailableError。在 3.7 版本加入。
TLS 升级¶
- async loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
将现有的基于传输的连接升级为 TLS。
创建 TLS 编码器/解码器实例,并将其插入到 transport 和 protocol 之间。编码器/解码器实现面向 transport 的协议和面向 protocol 的传输。
返回创建的双接口实例。在 await 之后,protocol 必须停止使用原始 transport,并且只与返回的对象通信,因为编码器会缓存 protocol 侧的数据,并偶尔与 transport 交换额外的 TLS 会话包。
在某些情况下(例如,当传入的传输已关闭时),这可能会返回
None。参数
通过
create_server()和create_connection()等方法返回的 transport 和 protocol 实例。sslcontext:一个已配置的
SSLContext实例。server_side:当服务器端连接正在升级时(例如由
create_server()创建的连接),传递True。server_hostname:设置或覆盖用于匹配目标服务器证书的主机名。
ssl_handshake_timeout 是(对于 TLS 连接)等待 TLS 握手完成的时间(以秒为单位),在此之前将中止连接。
60.0秒(如果为None(默认))。ssl_shutdown_timeout 是等待 SSL 关闭完成的时间(以秒为单位),在此之前将中止连接。
30.0秒(如果为None(默认))。
在 3.7 版本加入。
3.11 版本变更: 添加了 ssl_shutdown_timeout 参数。
监控文件描述符¶
- loop.add_reader(fd, callback, *args)¶
开始监控 fd 文件描述符的读可用性,一旦 fd 可用于读取,就调用带有指定参数的 callback。
为 fd 注册的任何现有回调都将被取消并替换为 callback。
- loop.remove_reader(fd)¶
停止监控 fd 文件描述符的读可用性。如果 fd 之前正在被监控以进行读取,则返回
True。
- loop.add_writer(fd, callback, *args)¶
开始监控 fd 文件描述符的写可用性,一旦 fd 可用于写入,就调用带有指定参数的 callback。
为 fd 注册的任何现有回调都将被取消并替换为 callback。
使用
functools.partial()传递关键字参数 给 callback。
- loop.remove_writer(fd)¶
停止监控 fd 文件描述符的写可用性。如果 fd 之前正在被监控以进行写入,则返回
True。
有关这些方法的一些限制,请参阅 平台支持 部分。
直接使用套接字对象¶
通常,使用基于传输的 API(例如 loop.create_connection() 和 loop.create_server())的协议实现比直接使用套接字实现的协议更快。然而,在某些用例中,性能不是关键,直接使用 socket 对象更方便。
- async loop.sock_recv(sock, nbytes)¶
从 sock 接收最多 nbytes。
socket.recv()的异步版本。将接收到的数据作为字节对象返回。
sock 必须是非阻塞套接字。
在 3.7 版本中更改: 尽管此方法始终被记录为协程方法,但在 Python 3.7 之前的版本返回一个
Future。从 Python 3.7 开始,它是一个async def方法。
- async loop.sock_recv_into(sock, buf)¶
从 sock 接收数据到 buf 缓冲区。模仿阻塞的
socket.recv_into()方法。返回写入缓冲区的字节数。
sock 必须是非阻塞套接字。
在 3.7 版本加入。
- async loop.sock_recvfrom(sock, bufsize)¶
从 sock 接收最大 bufsize 的数据报。
socket.recvfrom()的异步版本。返回一个元组 (接收到的数据, 远程地址)。
sock 必须是非阻塞套接字。
在 3.11 版本中新增。
- async loop.sock_recvfrom_into(sock, buf, nbytes=0)¶
从 sock 接收最多 nbytes 的数据报到 buf 中。
socket.recvfrom_into()的异步版本。返回一个元组 (接收到的字节数, 远程地址)。
sock 必须是非阻塞套接字。
在 3.11 版本中新增。
- async loop.sock_sendall(sock, data)¶
将 data 发送至 sock 套接字。
socket.sendall()的异步版本。此方法会持续向套接字发送数据,直到 data 中的所有数据都已发送或发生错误。
None在成功时返回。发生错误时,会引发异常。此外,无法确定连接的接收端成功处理了多少数据(如果有的话)。sock 必须是非阻塞套接字。
在 3.7 版本中更改: 尽管该方法一直被记录为协程方法,但在 Python 3.7 之前它返回一个
Future。从 Python 3.7 开始,这是一个async def方法。
- async loop.sock_sendto(sock, data, address)¶
将数据报从 sock 发送至 address。
socket.sendto()的异步版本。返回发送的字节数。
sock 必须是非阻塞套接字。
在 3.11 版本中新增。
- async loop.sock_connect(sock, address)¶
将 sock 连接到 address 处的远程套接字。
socket.connect()的异步版本。sock 必须是非阻塞套接字。
在 3.5.2 版本中更改:
address不再需要解析。sock_connect将尝试通过调用socket.inet_pton()来检查 address 是否已解析。如果未解析,将使用loop.getaddrinfo()来解析 address。
- async loop.sock_accept(sock)¶
接受连接。模仿阻塞的
socket.accept()方法。套接字必须绑定到一个地址并监听连接。返回值是一个
(conn, address)对,其中 conn 是一个可用于在该连接上发送和接收数据的 新 套接字对象,而 address 是连接另一端绑定到套接字的地址。sock 必须是非阻塞套接字。
在 3.7 版本中更改: 尽管该方法一直被记录为协程方法,但在 Python 3.7 之前它返回一个
Future。从 Python 3.7 开始,这是一个async def方法。
- async loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)¶
如果可能,使用高性能
os.sendfile发送文件。返回发送的总字节数。socket.sendfile()的异步版本。sock 必须是一个非阻塞的
socket.SOCK_STREAMsocket。file 必须是以二进制模式打开的常规文件对象。
offset 指明从文件何处开始读取。如果指定了 count,则表示要传输的总字节数,而不是发送文件直到 EOF。文件位置始终会更新,即使此方法引发错误,并且可以使用
file.tell()获取实际发送的字节数。当 fallback 设置为
True时,如果平台不支持 sendfile 系统调用(例如 Windows 或 Unix 上的 SSL 套接字),asyncio 会手动读取和发送文件。如果系统不支持 sendfile 系统调用且 fallback 为
False,则引发SendfileNotAvailableError。sock 必须是非阻塞套接字。
在 3.7 版本加入。
DNS¶
- async loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)¶
socket.getaddrinfo()的异步版本。
- async loop.getnameinfo(sockaddr, flags=0)¶
socket.getnameinfo()的异步版本。
备注
getaddrinfo 和 getnameinfo 都通过循环的默认线程池执行器在内部使用其同步版本。当此执行器饱和时,这些方法可能会遇到延迟,这可能会被更高级的网络库报告为超时增加。为缓解此问题,请考虑为其他用户任务使用自定义执行器,或设置具有更多 worker 的默认执行器。
在 3.7 版本中更改: getaddrinfo 和 getnameinfo 方法一直被记录为返回一个协程,但在 Python 3.7 之前,它们实际上返回 asyncio.Future 对象。从 Python 3.7 开始,这两个方法都是协程。
使用管道¶
- async loop.connect_read_pipe(protocol_factory, pipe)¶
在事件循环中注册 pipe 的读端。
protocol_factory 必须是一个可调用对象,返回一个 asyncio 协议 实现。
pipe 是一个 文件类对象。
返回对
(transport, protocol),其中 transport 支持ReadTransport接口,protocol 是由 protocol_factory 实例化的对象。对于
SelectorEventLoop事件循环,pipe 被设置为非阻塞模式。
- async loop.connect_write_pipe(protocol_factory, pipe)¶
在事件循环中注册 pipe 的写端。
protocol_factory 必须是一个可调用对象,返回一个 asyncio 协议 实现。
pipe 是 文件类对象。
返回对
(transport, protocol),其中 transport 支持WriteTransport接口,protocol 是由 protocol_factory 实例化的对象。对于
SelectorEventLoop事件循环,pipe 被设置为非阻塞模式。
备注
SelectorEventLoop 不支持 Windows 上的上述方法。在 Windows 上请改用 ProactorEventLoop。
Unix 信号¶
- loop.add_signal_handler(signum, callback, *args)¶
设置 callback 作为 signum 信号的处理程序。
回调将由 loop 调用,连同该事件循环的其他排队回调和可运行协程。与使用
signal.signal()注册的信号处理程序不同,使用此函数注册的回调允许与事件循环交互。如果信号编号无效或无法捕获,则引发
ValueError。如果设置处理程序时出现问题,则引发RuntimeError。使用
functools.partial()传递关键字参数 给 callback。与
signal.signal()一样,此函数必须在主线程中调用。
- loop.remove_signal_handler(sig)¶
移除 sig 信号的处理程序。
如果信号处理程序被移除,则返回
True;如果未为给定信号设置处理程序,则返回False。可用性: Unix。
参见
signal 模块。
在线程或进程池中执行代码¶
- awaitable loop.run_in_executor(executor, func, *args)¶
安排在指定的执行器中调用 func。
executor 参数应为一个
concurrent.futures.Executor实例。如果 executor 为None,则使用默认执行器。默认执行器可以通过loop.set_default_executor()设置,否则,如果需要,run_in_executor()会延迟初始化并使用concurrent.futures.ThreadPoolExecutor。示例
import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop's executor: result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool', result) # 4. Run in a custom interpreter pool: with concurrent.futures.InterpreterPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom interpreter pool', result) if __name__ == '__main__': asyncio.run(main())
请注意,由于
multiprocessing的特殊性(由ProcessPoolExecutor使用),选项 3 需要入口点保护 (if __name__ == '__main__')。请参阅 安全导入主模块。此方法返回一个
asyncio.Future对象。使用
functools.partial()传递关键字参数 给 func。在 3.5.3 版本中更改:
loop.run_in_executor()不再配置其创建的线程池执行器的max_workers,而是将其留给线程池执行器 (ThreadPoolExecutor) 来设置默认值。
- loop.set_default_executor(executor)¶
将 executor 设置为
run_in_executor()使用的默认执行器。 executor 必须是ThreadPoolExecutor的实例,其中包括InterpreterPoolExecutor。在 3.11 版本中更改: executor 必须是
ThreadPoolExecutor的实例。
错误处理 API¶
允许自定义事件循环中异常的处理方式。
- loop.set_exception_handler(handler)¶
将 handler 设置为新的事件循环异常处理程序。
如果 handler 为
None,则将设置默认异常处理程序。否则,handler 必须是一个可调用对象,其签名匹配(loop, context),其中loop是对活动事件循环的引用,context是一个包含异常详细信息的dict对象(有关上下文的详细信息,请参阅call_exception_handler()文档)。如果处理程序是代表
Task或Handle调用的,它将在该任务或回调句柄的contextvars.Context中运行。在 3.12 版本中更改: 处理程序可以在异常源自的任务或句柄的
Context中调用。
- loop.get_exception_handler()¶
返回当前的异常处理程序,如果没有设置自定义异常处理程序,则返回
None。3.5.2 版本新增。
- loop.default_exception_handler(context)¶
默认异常处理程序。
当发生异常且未设置异常处理程序时,会调用此方法。自定义异常处理程序可以调用此方法,以推迟到默认处理程序行为。
context 参数与
call_exception_handler()中的含义相同。
- loop.call_exception_handler(context)¶
调用当前事件循环异常处理程序。
context 是一个
dict对象,包含以下键(未来 Python 版本可能会引入新键)'message': 错误消息;
'exception' (可选): 异常对象;
'future' (可选):
asyncio.Future实例;'task' (可选):
asyncio.Task实例;'handle' (可选):
asyncio.Handle实例;'protocol' (可选): 协议 实例;
'transport' (可选): 传输 实例;
'socket' (可选):
socket.socket实例;'source_traceback' (可选): 源代码的追溯信息;
'handle_traceback' (可选): 句柄的追溯信息;
- 'asyncgen' (可选): 导致异常的异步生成器。
异常。
备注
此方法不应在子类化的事件循环中重载。对于自定义异常处理,请使用
set_exception_handler()方法。
启用调试模式¶
- loop.get_debug()¶
获取事件循环的调试模式 (
bool)。如果环境变量
PYTHONASYNCIODEBUG设置为非空字符串,则默认值为True,否则为False。
- loop.set_debug(enabled: bool)¶
设置事件循环的调试模式。
在 3.7 版本中更改: 新的 Python 开发模式 现在也可以用于启用调试模式。
- loop.slow_callback_duration¶
此属性可用于设置被视为“慢”的最小执行持续时间(以秒为单位)。当启用调试模式时,将记录“慢”回调。
默认值为 100 毫秒。
参见
运行子进程¶
本小节中描述的方法是低级的。在常规的 async/await 代码中,请考虑改用高级的 asyncio.create_subprocess_shell() 和 asyncio.create_subprocess_exec() 便利函数。
备注
在 Windows 上,默认事件循环 ProactorEventLoop 支持子进程,而 SelectorEventLoop 不支持。详情请参阅 Windows 上的子进程支持。
- async loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)¶
从 args 指定的一个或多个字符串参数创建子进程。
args 必须是一个由
第一个字符串指定程序可执行文件,其余字符串指定参数。字符串参数共同构成程序的
argv。这类似于标准库
subprocess.Popen类在调用时将shell=False且字符串列表作为第一个参数传入;然而,Popen接受一个字符串列表作为单个参数,而 subprocess_exec 接受多个字符串参数。protocol_factory 必须是一个可调用对象,返回
asyncio.SubprocessProtocol类的子类。其他参数
stdin 可以是以下任意一种
文件类对象
现有文件描述符(正整数),例如使用
os.pipe()创建的那些subprocess.PIPE常量(默认值),它将创建一个新管道并连接它,值
None,它将使子进程继承此进程的文件描述符subprocess.DEVNULL常量,表示将使用特殊的os.devnull文件
stdout 可以是以下任意一种
文件类对象
subprocess.PIPE常量(默认值),它将创建一个新管道并连接它,值
None,它将使子进程继承此进程的文件描述符subprocess.DEVNULL常量,表示将使用特殊的os.devnull文件
stderr 可以是以下任意一种
文件类对象
subprocess.PIPE常量(默认值),它将创建一个新管道并连接它,值
None,它将使子进程继承此进程的文件描述符subprocess.DEVNULL常量,表示将使用特殊的os.devnull文件subprocess.STDOUT常量,它将标准错误流连接到进程的标准输出流
所有其他关键字参数都未经解释地传递给
subprocess.Popen,除了 bufsize、universal_newlines、shell、text、encoding 和 errors,这些参数根本不应指定。asyncio子进程 API 不支持将流解码为文本。可以使用bytes.decode()将从流返回的字节转换为文本。
如果作为 stdin、stdout 或 stderr 传递的文件类对象表示管道,则此管道的另一端应使用
connect_write_pipe()或connect_read_pipe()注册以用于事件循环。有关其他参数的文档,请参阅
subprocess.Popen类的构造函数。返回一对
(transport, protocol),其中 transport 符合asyncio.SubprocessTransport基类,protocol 是由 protocol_factory 实例化的对象。
- async loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)¶
从 cmd 创建一个子进程,cmd 可以是
str或bytes字符串,编码为 文件系统编码,使用平台的“shell”语法。这类似于标准库
subprocess.Popen类在调用时将shell=True。protocol_factory 必须是一个可调用对象,返回
SubprocessProtocol类的子类。有关其余参数的更多详细信息,请参阅
subprocess_exec()。返回一对
(transport, protocol),其中 transport 符合SubprocessTransport基类,protocol 是由 protocol_factory 实例化的对象。
备注
应用程序有责任确保所有空格和特殊字符都经过适当引用,以避免 shell 注入漏洞。可以使用 shlex.quote() 函数正确转义用于构造 shell 命令的字符串中的空格和特殊字符。
回调句柄¶
- class asyncio.Handle¶
由
loop.call_soon()、loop.call_soon_threadsafe()返回的回调包装器对象。- get_context()¶
返回与句柄关联的
contextvars.Context对象。3.12 新版功能.
- cancel()¶
取消回调。如果回调已取消或已执行,此方法无效。
- cancelled()¶
如果回调已取消,则返回
True。在 3.7 版本加入。
- class asyncio.TimerHandle¶
由
loop.call_later()和loop.call_at()返回的回调包装器对象。此类是
Handle的子类。- when()¶
返回以
float秒表示的计划回调时间。该时间是一个绝对时间戳,使用与
loop.time()相同的时间参考。在 3.7 版本加入。
服务器对象¶
服务器对象由 loop.create_server()、loop.create_unix_server()、start_server() 和 start_unix_server() 函数创建。
请勿直接实例化 Server 类。
- class asyncio.Server¶
Server 对象是异步上下文管理器。当在
async with语句中使用时,可以保证在async with语句完成后,Server 对象已关闭并且不接受新连接。srv = await loop.create_server(...) async with srv: # some code # At this point, srv is closed and no longer accepts new connections.
在 3.7 版本中更改: 从 Python 3.7 开始,Server 对象是异步上下文管理器。
在 3.11 版本中更改: 此类别在 Python 3.9.11、3.10.3 和 3.11 中作为
asyncio.Server公开。- close()¶
停止服务:关闭监听套接字并将
sockets属性设置为None。代表现有传入客户端连接的套接字保持打开状态。
服务器异步关闭;使用
wait_closed()协程等待服务器关闭(且没有更多活动连接)。
- close_clients()¶
关闭所有现有的传入客户端连接。
对所有关联的传输调用
close()。在关闭服务器时,应在
close_clients()之前调用close(),以避免与新客户端连接发生竞态。在 3.13 版本加入。
- abort_clients()¶
立即关闭所有现有传入客户端连接,而不等待未决操作完成。
对所有关联的传输调用
abort()。在关闭服务器时,应在
abort_clients()之前调用close(),以避免与新客户端连接发生竞态。在 3.13 版本加入。
- get_loop()¶
返回与服务器对象关联的事件循环。
在 3.7 版本加入。
- async start_serving()¶
开始接受连接。
此方法是幂等的,因此可以在服务器已运行时调用。
loop.create_server()和asyncio.start_server()的 start_serving 仅关键字参数允许创建最初不接受连接的 Server 对象。在这种情况下,可以使用Server.start_serving()或Server.serve_forever()使 Server 开始接受连接。在 3.7 版本加入。
- async serve_forever()¶
开始接受连接,直到协程被取消。
serve_forever任务的取消会导致服务器关闭。如果服务器已接受连接,则可以调用此方法。每个 Server 对象只能存在一个
serve_forever任务。示例
async def client_connected(reader, writer): # Communicate with the client with # reader/writer streams. For example: await reader.readline() async def main(host, port): srv = await asyncio.start_server( client_connected, host, port) await srv.serve_forever() asyncio.run(main('127.0.0.1', 0))
在 3.7 版本加入。
- is_serving()¶
如果服务器正在接受新连接,则返回
True。在 3.7 版本加入。
- sockets¶
服务器正在监听的类套接字对象列表,
asyncio.trsock.TransportSocket。在 3.7 版本中更改: 在 Python 3.7 之前,
Server.sockets直接返回服务器套接字的内部列表。在 3.7 中,返回该列表的副本。
事件循环实现¶
asyncio 提供了两种不同的事件循环实现:SelectorEventLoop 和 ProactorEventLoop。
默认情况下,asyncio 配置为使用 EventLoop。
- class asyncio.SelectorEventLoop¶
一个基于
selectors模块的AbstractEventLoop子类。使用给定平台可用的最有效的 选择器。也可以手动配置要使用的确切选择器实现。
import asyncio import selectors async def main(): ... loop_factory = lambda: asyncio.SelectorEventLoop(selectors.SelectSelector()) asyncio.run(main(), loop_factory=loop_factory)
可用性: Unix, Windows。
- class asyncio.ProactorEventLoop¶
一个针对 Windows 的
AbstractEventLoop子类,它使用“I/O 完成端口”(IOCP)。可用性: Windows。
- class asyncio.EventLoop¶
给定平台可用的最有效的
AbstractEventLoop子类的别名。在 Unix 上,它是
SelectorEventLoop的别名;在 Windows 上,它是ProactorEventLoop的别名。在 3.13 版本加入。
示例¶
请注意,本节中的所有示例都有意展示了如何使用低级事件循环 API,例如 loop.run_forever() 和 loop.call_soon()。现代 asyncio 应用程序很少需要以这种方式编写;考虑使用高级函数,例如 asyncio.run()。
使用 call_soon() 的 Hello World¶
一个使用 loop.call_soon() 方法安排回调的示例。回调显示 "Hello World",然后停止事件循环。
import asyncio
def hello_world(loop):
"""A callback to print 'Hello World' and stop the event loop"""
print('Hello World')
loop.stop()
loop = asyncio.new_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
参见
一个类似的 Hello World 示例,通过协程和 run() 函数创建。
使用 call_later() 显示当前日期¶
一个每秒显示当前日期的回调示例。该回调使用 loop.call_later() 方法在 5 秒后重新安排自身,然后停止事件循环。
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
loop.call_later(1, display_date, end_time, loop)
else:
loop.stop()
loop = asyncio.new_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
监视文件描述符的读取事件¶
使用 loop.add_reader() 方法等待文件描述符接收到数据,然后关闭事件循环。
import asyncio
from socket import socketpair
# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.new_event_loop()
def reader():
data = rsock.recv(100)
print("Received:", data.decode())
# We are done: unregister the file descriptor
loop.remove_reader(rsock)
# Stop the event loop
loop.stop()
# Register the file descriptor for read event
loop.add_reader(rsock, reader)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
rsock.close()
wsock.close()
loop.close()
参见
一个类似的 示例,使用传输、协议和
loop.create_connection()方法。另一个类似的 示例,使用高级
asyncio.open_connection()函数和流。
为 SIGINT 和 SIGTERM 设置信号处理程序¶
(此 signals 示例仅在 Unix 上有效。)
使用 loop.add_signal_handler() 方法注册信号 SIGINT 和 SIGTERM 的处理程序。
import asyncio
import functools
import os
import signal
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
functools.partial(ask_exit, signame, loop))
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
asyncio.run(main())