传输和协议

前言

传输和协议由底层事件循环 API 使用,例如 loop.create_connection()。它们使用基于回调的编程风格,并能够实现高性能的网络或 IPC 协议(例如 HTTP)。

本质上,传输和协议应该只在库和框架中使用,而永远不应在高级 asyncio 应用程序中使用。

此文档页面涵盖了传输协议

简介

在最高级别,传输关注于字节如何传输,而协议确定传输哪些字节(以及在某种程度上何时传输)。

换句话说:传输是套接字(或类似的 I/O 端点)的抽象,而协议是从传输的角度来看应用程序的抽象。

另一种观点是,传输和协议接口共同定义了使用网络 I/O 和进程间 I/O 的抽象接口。

传输对象和协议对象之间始终存在 1:1 的关系:协议调用传输方法来发送数据,而传输调用协议方法来传递已接收的数据。

大多数面向连接的事件循环方法(例如 loop.create_connection())通常接受一个 *protocol_factory* 参数,用于为接受的连接创建一个 *Protocol* 对象,该连接由 *Transport* 对象表示。此类方法通常返回一个 (transport, protocol) 元组。

内容

此文档页面包含以下部分

传输

源代码: Lib/asyncio/transports.py


传输是 asyncio 提供的类,用于抽象各种类型的通信通道。

传输对象始终由 asyncio 事件循环 实例化。

asyncio 实现了 TCP、UDP、SSL 和子进程管道的传输。传输上可用的方法取决于传输的类型。

传输类不是线程安全的

传输层次结构

class asyncio.BaseTransport

所有传输的基类。包含所有 asyncio 传输共享的方法。

class asyncio.WriteTransport(BaseTransport)

用于只写连接的基本传输。

WriteTransport 类的实例是从 loop.connect_write_pipe() 事件循环方法返回的,并且也被子进程相关的方法(如 loop.subprocess_exec())使用。

class asyncio.ReadTransport(BaseTransport)

用于只读连接的基本传输。

ReadTransport 类的实例是从 loop.connect_read_pipe() 事件循环方法返回的,并且也被子进程相关的方法(如 loop.subprocess_exec())使用。

class asyncio.Transport(WriteTransport, ReadTransport)

表示双向传输的接口,例如 TCP 连接。

用户不直接实例化传输;他们调用一个实用程序函数,向其传递一个协议工厂和其他创建传输和协议所需的信息。

Transport 类的实例是从或被事件循环方法(如 loop.create_connection()loop.create_unix_connection()loop.create_server()loop.sendfile() 等)返回或使用。

class asyncio.DatagramTransport(BaseTransport)

用于数据报 (UDP) 连接的传输。

DatagramTransport 类的实例是从 loop.create_datagram_endpoint() 事件循环方法返回的。

class asyncio.SubprocessTransport(BaseTransport)

表示父进程与其子操作系统进程之间连接的抽象。

SubprocessTransport 类的实例通过事件循环方法 loop.subprocess_shell()loop.subprocess_exec() 返回。

基础传输

BaseTransport.close()

关闭传输。

如果传输具有用于传出数据的缓冲区,则缓冲数据将异步刷新。不会再接收到任何数据。在刷新所有缓冲数据后,将调用协议的 protocol.connection_lost() 方法,并使用 None 作为其参数。传输一旦关闭就不应再使用。

BaseTransport.is_closing()

如果传输正在关闭或已关闭,则返回 True

BaseTransport.get_extra_info(name, default=None)

返回有关传输或其使用的底层资源的信息。

name 是一个字符串,表示要获取的特定于传输的信息。

如果信息不可用,或者如果传输不支持使用给定的第三方事件循环实现或在当前平台上查询信息,则 default 是要返回的值。

例如,以下代码尝试获取传输的底层套接字对象

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

可以在某些传输上查询的信息类别

BaseTransport.set_protocol(protocol)

设置一个新协议。

仅当两个协议都明确支持切换时,才应切换协议。

BaseTransport.get_protocol()

返回当前协议。

只读传输

ReadTransport.is_reading()

如果传输正在接收新数据,则返回 True

3.7 版本新增。

ReadTransport.pause_reading()

暂停传输的接收端。在调用 resume_reading() 之前,不会将任何数据传递给协议的 protocol.data_received() 方法。

在 3.7 版本中更改: 此方法是幂等的,即当传输已暂停或关闭时也可以调用。

ReadTransport.resume_reading()

恢复接收端。如果有一些数据可供读取,则将再次调用协议的 protocol.data_received() 方法。

在 3.7 版本中更改: 此方法是幂等的,即当传输已经在读取时也可以调用。

只写传输

WriteTransport.abort()

立即关闭传输,而无需等待挂起的操作完成。缓冲数据将丢失。不会再接收到任何数据。协议的 protocol.connection_lost() 方法最终将使用 None 作为其参数调用。

WriteTransport.can_write_eof()

如果传输支持 write_eof(),则返回 True,否则返回 False

WriteTransport.get_write_buffer_size()

返回传输使用的输出缓冲区的当前大小。

WriteTransport.get_write_buffer_limits()

获取用于写入流量控制的 *高水位线* 和 *低水位线*。返回一个元组 (low, high),其中 *low* 和 *high* 是正字节数。

使用 set_write_buffer_limits() 设置限制。

3.4.2 版本新增。

WriteTransport.set_write_buffer_limits(high=None, low=None)

设置用于写入流量控制的 *高水位线* 和 *低水位线*。

这两个值(以字节数为单位)控制何时调用协议的 protocol.pause_writing()protocol.resume_writing() 方法。如果指定了,则低水位线必须小于或等于高水位线。highlow 都不能为负数。

当缓冲区大小大于或等于 high 值时,会调用 pause_writing()。如果写入已暂停,则当缓冲区大小小于或等于 low 值时,会调用 resume_writing()

默认值是特定于实现的。如果只给出高水位线,则低水位线默认为小于或等于高水位线的特定于实现的值。将 high 设置为零也会强制 low 为零,并导致只要缓冲区变为非空就调用 pause_writing()。将 low 设置为零会导致只有当缓冲区为空时才调用 resume_writing()。将任一限制设置为零通常不是最佳的,因为它减少了同时进行 I/O 和计算的机会。

使用 get_write_buffer_limits() 获取限制。

WriteTransport.write(data)

将一些 data 字节写入传输。

此方法不会阻塞;它会缓冲数据并安排异步发送。

WriteTransport.writelines(list_of_data)

将数据字节列表(或任何可迭代对象)写入传输。这在功能上等同于对可迭代对象产生的每个元素调用 write(),但可能会更有效地实现。

WriteTransport.write_eof()

在刷新所有缓冲数据后,关闭传输的写入端。仍然可以接收数据。

如果传输(例如 SSL)不支持半关闭连接,则此方法可能会引发 NotImplementedError

数据报传输

DatagramTransport.sendto(data, addr=None)

data 字节发送到由 addr 给出的远程对等方(传输相关的目标地址)。如果 addrNone,则数据将发送到在传输创建时给出的目标地址。

此方法不会阻塞;它会缓冲数据并安排异步发送。

3.13 版本更改: 可以调用此方法并传递一个空的字节对象来发送零长度的数据报。用于流控制的缓冲区大小计算也已更新,以考虑数据报头。

DatagramTransport.abort()

立即关闭传输,而无需等待挂起的操作完成。缓冲数据将丢失。不会再接收到任何数据。协议的 protocol.connection_lost() 方法最终将使用 None 作为其参数调用。

子进程传输

SubprocessTransport.get_pid()

以整数形式返回子进程的进程 ID。

SubprocessTransport.get_pipe_transport(fd)

返回与整数文件描述符 fd 对应的通信管道的传输。

  • 0:标准输入 (stdin) 的可读流式传输,如果子进程不是使用 stdin=PIPE 创建的,则为 None

  • 1:标准输出 (stdout) 的可写流式传输,如果子进程不是使用 stdout=PIPE 创建的,则为 None

  • 2:标准错误 (stderr) 的可写流式传输,如果子进程不是使用 stderr=PIPE 创建的,则为 None

  • 其他 fdNone

SubprocessTransport.get_returncode()

以整数形式返回子进程的返回码,如果它尚未返回,则返回 None,这类似于 subprocess.Popen.returncode 属性。

SubprocessTransport.kill()

杀死子进程。

在 POSIX 系统上,该函数会向子进程发送 SIGKILL 信号。在 Windows 上,此方法是 terminate() 的别名。

另请参阅 subprocess.Popen.kill()

SubprocessTransport.send_signal(signal)

subprocess.Popen.send_signal() 中所示,将 signal 编号发送到子进程。

SubprocessTransport.terminate()

停止子进程。

在 POSIX 系统上,此方法会向子进程发送 SIGTERM 信号。在 Windows 上,会调用 Windows API 函数 TerminateProcess() 来停止子进程。

另请参阅 subprocess.Popen.terminate()

SubprocessTransport.close()

通过调用 kill() 方法杀死子进程。

如果子进程尚未返回,则关闭 stdinstdoutstderr 管道的传输。

协议

源代码: Lib/asyncio/protocols.py


asyncio 提供了一组抽象基类,应该使用这些基类来实现网络协议。这些类旨在与 传输 一起使用。

抽象基协议类的子类可以实现某些或所有方法。所有这些方法都是回调:它们由传输在某些事件上调用,例如当收到一些数据时。基础协议方法应由相应的传输调用。

基础协议

class asyncio.BaseProtocol

具有所有协议共享的方法的基础协议。

class asyncio.Protocol(BaseProtocol)

用于实现流协议(TCP、Unix 套接字等)的基类。

class asyncio.BufferedProtocol(BaseProtocol)

用于实现具有手动控制接收缓冲区的流协议的基类。

class asyncio.DatagramProtocol(BaseProtocol)

用于实现数据报 (UDP) 协议的基类。

class asyncio.SubprocessProtocol(BaseProtocol)

用于实现与子进程(单向管道)通信的协议的基类。

基础协议

所有 asyncio 协议都可以实现基础协议回调。

连接回调

连接回调会在所有协议上调用,每次成功连接只调用一次。所有其他协议回调只能在这两个方法之间调用。

BaseProtocol.connection_made(transport)

当建立连接时调用。

transport 参数是表示连接的传输对象。协议负责存储对其传输的引用。

BaseProtocol.connection_lost(exc)

当连接丢失或关闭时调用。

参数是一个异常对象或 None。后者表示收到常规的 EOF,或者连接被此连接方中止或关闭。

流量控制回调

传输层可以调用流量控制回调来暂停或恢复协议执行的写入操作。

有关更多详细信息,请参阅 set_write_buffer_limits() 方法的文档。

BaseProtocol.pause_writing()

当传输的缓冲区超过高水位线时调用。

BaseProtocol.resume_writing()

当传输的缓冲区降到低水位线以下时调用。

如果缓冲区大小等于高水位线,则不会调用 pause_writing():缓冲区大小必须严格超过高水位线。

相反,当缓冲区大小等于或低于低水位线时,会调用 resume_writing()。这些结束条件对于确保当任何一个标记为零时按预期运行非常重要。

流式协议

事件方法,例如 loop.create_server()loop.create_unix_server()loop.create_connection()loop.create_unix_connection()loop.connect_accepted_socket()loop.connect_read_pipe()loop.connect_write_pipe() 接受返回流式协议的工厂方法。

Protocol.data_received(data)

当收到一些数据时调用。data 是一个非空的 bytes 对象,其中包含传入的数据。

数据是缓冲的、分块的还是重新组装的取决于传输。一般来说,你不应该依赖于特定的语义,而是使你的解析是通用的和灵活的。但是,始终按正确的顺序接收数据。

当连接打开时,该方法可以被调用任意次数。

但是,protocol.eof_received() 最多被调用一次。一旦调用 eof_received(),则不会再调用 data_received()

Protocol.eof_received()

当另一端发出信号表明它不会发送更多数据时调用(例如,如果另一端也使用 asyncio,则通过调用 transport.write_eof())。

此方法可能会返回假值(包括 None),在这种情况下,传输将自行关闭。相反,如果此方法返回真值,则使用的协议将确定是否关闭传输。由于默认实现返回 None,因此它隐式地关闭连接。

一些传输协议(包括 SSL)不支持半关闭连接,在这种情况下,从此方法返回真值将导致连接关闭。

状态机

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

缓冲流式协议

3.7 版本新增。

缓冲协议可以与任何支持 流式协议 的事件循环方法一起使用。

BufferedProtocol 的实现允许显式手动分配和控制接收缓冲区。然后,事件循环可以使用协议提供的缓冲区来避免不必要的数据复制。这可以为接收大量数据的协议带来显著的性能提升。复杂的协议实现可以显著减少缓冲区分配的数量。

以下回调在 BufferedProtocol 实例上调用

BufferedProtocol.get_buffer(sizehint)

调用以分配新的接收缓冲区。

sizehint 是返回缓冲区建议的最小大小。返回比 sizehint 建议的更小或更大的缓冲区是可以接受的。当设置为 -1 时,缓冲区大小可以是任意的。返回大小为零的缓冲区是错误的。

get_buffer() 必须返回一个实现 缓冲区协议 的对象。

BufferedProtocol.buffer_updated(nbytes)

当缓冲区用接收到的数据更新时调用。

nbytes 是写入缓冲区的总字节数。

BufferedProtocol.eof_received()

请参阅 protocol.eof_received() 方法的文档。

get_buffer() 可以在连接期间被调用任意次数。但是,protocol.eof_received() 最多被调用一次,如果调用,get_buffer()buffer_updated() 在其之后将不会再被调用。

状态机

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

数据报协议

数据报协议实例应该由传递给 loop.create_datagram_endpoint() 方法的协议工厂构造。

DatagramProtocol.datagram_received(data, addr)

当收到数据报时调用。data 是一个包含传入数据的 bytes 对象。addr 是发送数据的对等方的地址;具体格式取决于传输方式。

DatagramProtocol.error_received(exc)

当先前的发送或接收操作引发 OSError 时调用。excOSError 实例。

当传输(例如 UDP)检测到数据报无法传递给其接收者时,会在极少数情况下调用此方法。但在许多情况下,无法传递的数据报将被静默丢弃。

注意

在 BSD 系统(macOS、FreeBSD 等)上,数据报协议不支持流量控制,因为没有可靠的方法来检测因写入过多数据包而导致的发送失败。

套接字始终显示为“就绪”,多余的数据包将被丢弃。 可能会或可能不会引发将 errno 设置为 errno.ENOBUFSOSError;如果引发,它将报告给 DatagramProtocol.error_received(),否则将被忽略。

子进程协议

子进程协议实例应该由传递给 loop.subprocess_exec()loop.subprocess_shell() 方法的协议工厂构造。

SubprocessProtocol.pipe_data_received(fd, data)

当子进程将数据写入其 stdout 或 stderr 管道时调用。

fd 是管道的整数文件描述符。

data 是一个包含接收数据的非空 bytes 对象。

SubprocessProtocol.pipe_connection_lost(fd, exc)

当与子进程通信的管道之一关闭时调用。

fd 是已关闭的整数文件描述符。

SubprocessProtocol.process_exited()

当子进程退出时调用。

它可以在 pipe_data_received()pipe_connection_lost() 方法之前调用。

示例

TCP 回声服务器

使用 loop.create_server() 方法创建一个 TCP 回声服务器,将接收到的数据发送回去,并关闭连接

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        EchoServerProtocol,
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

另请参阅

使用流的 TCP 回声服务器 示例使用了高层级的 asyncio.start_server() 函数。

TCP 回声客户端

使用 loop.create_connection() 方法创建一个 TCP 回声客户端,发送数据,并等待直到连接关闭

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

另请参阅

使用流的 TCP 回声客户端 示例使用了高层级的 asyncio.open_connection() 函数。

UDP 回声服务器

使用 loop.create_datagram_endpoint() 方法创建一个 UDP 回声服务器,将接收到的数据发送回去

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        EchoServerProtocol,
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP 回声客户端

使用 loop.create_datagram_endpoint() 方法创建一个 UDP 回声客户端,发送数据并在收到答案后关闭传输

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

连接现有套接字

使用带协议的 loop.create_connection() 方法,等待套接字接收数据

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

另请参阅

监视文件描述符的读取事件 示例使用了低层级的 loop.add_reader() 方法来注册 FD。

使用流的 注册一个打开的套接字以等待使用流的数据 示例使用了由协程中的 open_connection() 函数创建的高层级流。

loop.subprocess_exec() 和 SubprocessProtocol

一个子进程协议的示例,用于获取子进程的输出并等待子进程退出。

子进程由 loop.subprocess_exec() 方法创建

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

另请参阅使用高层级 API 编写的 相同示例