18.5.4. 传输和协议(基于回调的API)

18.5.4.1. 运输¶ T0>

Transports是由asyncio提供的类,以便抽象各种类型的通信通道。你一般不会自己实例化Transports;相反,您将调用AbstractEventLoop方法,该方法将创建传输并尝试启动基础通信通道,当它成功时调用您。

一旦建立了通信信道,传输总是与protocol实例配对。然后protocol可以出于各种目的调用transport的方法。

asyncio当前实现TCP,UDP,SSL和子进程管道的传输。transport上可用的方法取决于transport的种类。

Transports类不是线程安全的

18.5.4.1.1. BaseTransport

class asyncio.BaseTransport

BaseTransport基类。

close(self)

关闭Transport。如果Transport对象具有用于传出数据的缓冲区,则缓冲的数据将被异步刷新。也不再接收数据。在所有缓冲数据被刷新之后,Transport对象的connection_lost()方法将None作为其参数被调用。

is_closing(self)

如果Transport正在关闭或已经关闭,则返回True

版本3.5.1中的新功能。

get_extra_info(name, default=None)

返回可选的Transport对象信息。name是表示要获取的传输特定信息的字符串,如果信息不存在则返回default的值。

这种方法允许transport更容易地暴露通道特定的信息。

在3.5.1版更改: 'ssl_object'信息已添加到SSL套接字。

18.5.4.1.2. ReadTransport ¶ T0>

class asyncio.ReadTransport

只读transport接口。

pause_reading()

transport暂停接收。在调用resume_reading()之前,不会将任何数据传递到协议的data_received()

resume_reading()

恢复数据接收。如果一些数据可用于读取,则将再次调用协议的data_received()方法。

18.5.4.1.3. WriteTransport ¶ T0>

class asyncio.WriteTransport

只写transport接口。

abort()

立即关闭Transport对象,无需等待未知的操作完成。缓冲数据将丢失。数据也不会再被接收。Transport对象的connection_lost()方法最终将以None作为参数进行调用。

can_write_eof()

如果Transport对象支持write_eof()方法,则返回True,否则返回False

get_write_buffer_size()

返回Transport对象所使用的输出缓冲区的当前大小。

get_write_buffer_limits()

得到写缓冲的位和位的值。返回元组(低, 高)其中是确定的的字节数。

使用set_write_buffer_limits()函数设置写缓冲的高低位限制。

版本3.4.2中的新功能。

set_write_buffer_limits(high=None, low=None)

设置写入流控制的水限制。

这两个值控制调用协议的pause_writing()resume_writing()方法的时间。如果指定,低水限制必须小于或等于高水限制。都可以为负。

默认值是特定于实现的。如果仅给出了高水位限制,则低水位限制默认为小于或等于高水位限制的实施特定值。设置为零强制为零,并导致每当缓冲区变为非空时调用pause_writing()设置为零会导致resume_writing()仅在缓冲区为空时调用。对任一限制使用零通常是次优的,因为它减少了同时进行I / O和计算的机会。

使用get_write_buffer_limits()获取限制。

write(data)

将一些数据字节写入Transport对象。

本方法不阻塞;数据将被异步发送。

writelines(list_of_data)

将列表(或任何写迭代对象)写入到Transport对象中。这在功能上等同于对由迭代器产生的每个元素调用write(),但是可以更有效地实现。

write_eof()

在刷新缓冲的数据后关闭transport。但仍可接收数据。

如果transport不支持的话此方法可能引发NotImplementedError异常(例如,SSL)不支持半关闭。

18.5.4.1.4. DatagramTransport

DatagramTransport.sendto(data, addr=None)

数据字节发送到由addr(传输相关的目标地址)给出的远程对等体。如果addrNone,则将数据发送到在传输创建时给定的目标地址。

这个方法不阻塞;它缓冲数据并安排它异步发送。

DatagramTransport.abort()

立即关闭运输,无需等待待完成的操作。缓冲数据将丢失。不会收到更多的数据。协议的connection_lost()方法最终将以None作为参数进行调用。

18.5.4.1.5. BaseSubprocessTransport

class asyncio.BaseSubprocessTransport
get_pid()

返回子进程的进程ID。

get_pipe_transport(fd)

返回与整数文件对应的通信管道的传输描述器fd

  • 0:标准输入(stdin)的可读流式传输,或者None,如果子流程未使用stdin=PIPE
  • 1:标准输出(stdout)的可写流式传输,或者None,如果子流程未使用stdout=PIPE
  • 2:标准错误(stderr)的可写流式传输,或者None,如果子流程未使用stderr=PIPE
  • 其他fdNone
get_returncode()

subprocess.Popen.returncode属性类似,返回子进程returncode作为整数或None(如果未返回)。

kill(self)

杀死子进程,类似subprocess.Popen.kill()

在POSIX系统上,函数将发送SIGKILL信号到子进程。在Windows上,此方法是terminate()的别名。

send_signal(signal)

信号号发送到子过程,如subprocess.Popen.send_signal()中所示。

terminate()

请求子进程停止,如subprocess.Popen.terminate()中。此方法是close()方法的别名。

在POSIX系统上,此方法将SIGTERM发送到子过程。在Windows上,调用Windows API函数TerminateProcess()以停止子过程。

close()

如果子进程尚未返回,则通过调用terminate()方法来请求子进程停止,并关闭所有管道的传输(stdinstdout < t4>和stderr)。

18.5.4.2. 协议¶ T0>

asyncio提供了可以子类化来实现网络协议的基类。这些类与transports(见下文)结合使用:协议解析输入数据,并要求输出数据的写入,而传输负责实际的I / O和缓冲。

当子类化协议类时,建议您覆盖某些方法。这些方法是可被回调的:它们将由传输器在某些事件产生时(例如当接收到一些数据时)被调用;你不应该自己调用他们,除非你正在重新实现一个Transport类。

注意

所有回调都有默认实现,它们是空的。因此,您只需要实现您关注的事件的回调方法。

18.5.4.2.1. 协议类

class asyncio.Protocol

用于实现流协议的基类(用于TCP和SSL Transport)。

class asyncio.DatagramProtocol

用于实现数据报协议的基类(用于UDP Transport).

class asyncio.SubprocessProtocol

用于实现与子进程(通过一组单向管道)通信的协议的基类。

18.5.4.2.2. 连接回调

以下回调可以在ProtocolDatagramProtocolSubprocessProtocol实例上调用:

BaseProtocol.connection_made(transport)

连接完成时被调用。

transport参数是表示当前产生连接的Transport对象。您可以将您需要的一些内容存储起来(例如存到一些对象的属性中),如果你需要话。

BaseProtocol.connection_lost(exc)

在连接丢失或关闭时调用。

参数是异常对象或None后者意味着接收到正常的EOF,或者连接被连接的这一侧中止或关闭。

connection_made()connection_lost()每次成功连接只调用一次。所有其他回调将在这两种方法之间被调用,这允许在协议实现中更容易的资源管理。

以下回调只能在SubprocessProtocol实例上调用:

SubprocessProtocol.pipe_data_received(fd, data)

当子进程将数据写入其stdout或stderr管道时调用。fd是管道的整数文件描述器。data是包含数据的非空字节对象。

SubprocessProtocol.pipe_connection_lost(fd, exc)

当与子进程通信的其中一个管道关闭时调用。fd是关闭的整数文件描述器。

SubprocessProtocol.process_exited()

当子进程退出时调用。

18.5.4.2.3. 流协议

Protocol实例上调用以下回调:

Protocol.data_received(data)

当接收到一些数据时调用。data是包含传入数据的非空字节对象。

注意

数据是否被缓冲,分块或重组取决于传输。一般来说,你不应该依赖于特定的语义,而是让你的解析通用和足够灵活。但是,始终以正确的顺序接收数据。

Protocol.eof_received()

当另一端发出信号时,它将不再发送任何数据(例如通过调用write_eof(),如果另一端也使用asyncio)。

此方法可能返回false值(包括None),在这种情况下,传输将关闭自身。相反,如果此方法返回一个真值,关闭传输由协议决定。由于默认实现返回None,它隐式关闭连接。

注意

某些传输(如SSL)不支持半关闭连接,在这种情况下,从此方法返回true将不会阻止关闭连接。

data_received()可在连接期间调用任意次数。但是,eof_received()最多被调用一次,如果调用,data_received()将不会被调用。

状态机:

18.5.4.2.4. 数据报协议

DatagramProtocol实例上调用以下回调。

DatagramProtocol.datagram_received(data, addr)

在接收到数据报时调用。data是包含传入数据的字节对象。addr是发送数据的对方的地址;确切的格式取决于Transport对象。

DatagramProtocol.error_received(exc)

在先前的发送或接收操作引发OSError时调用。excOSError实例。

该方法极少调用,当传输(例如,UDP)检测到数据报不能传递到接收者。大多数时候,不可传递的数据报默认将被丢弃。

18.5.4.2.5. 流量控制回调

这些回调可以在ProtocolDatagramProtocolSubprocessProtocol实例上调用:

BaseProtocol.pause_writing()

当传输缓冲区超过高水位标记时调用。

BaseProtocol.resume_writing()

当Transport缓冲低于低位标记时调用。

pause_writing() and resume_writing() calls are paired – pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increases the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

注意

如果缓冲区大小等于高水位线,pause_writing()不会被调用 - 它必须严格超过。相反,当缓冲区大小等于或低于低水位标记时,调用resume_writing()这些结束条件对于确保当任一标记为零时都如预期的那样重要。

注意

在BSD系统(OS X,FreeBSD等)DatagramProtocol不支持流控制,因为写入太多数据包导致的发送失败不能被轻易检测到。套接字总是出现“就绪”,多余的包被丢弃;可能会或可能不会引发具有errno设置为errno.ENOBUFSOSError如果它被提升,它将被报告到DatagramProtocol.error_received()但是否则被忽略。

18.5.4.2.6. 协程和协议

协程可以使用ensure_future()在协议方法中调度,但不能保证执行顺序。协议不知道在协议方法中创建协程,因此不会等待它们。

要具有可靠的执行顺序,请在协议中使用stream objects产生 例如,StreamWriter.drain()协程可以用于等待直到写入缓冲区被刷新。

18.5.4.3. Protocol示例

18.5.4.3.1. TCP回显客户端协议

TCP echo客户端使用AbstractEventLoop.create_connection()方法,发送数据并等待连接关闭:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

事件循环运行两次。在这个简短的例子中,优先使用run_until_complete()方法来引发异常,如果服务器没有监听,而不必写一个短的协程来处理异常并停止运行循环。run_until_complete()退出时,循环不再运行,因此在发生错误时不需要停止循环。

也可以看看

TCP echo client using streams示例使用asyncio.open_connection()函数。

18.5.4.3.2. TCP回显服务器协议

TCP回显服务器使用AbstractEventLoop.create_server()方法,发送回接收到的数据并关闭连接:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close()可以在WriteTransport.write()之后立即调用,即使数据尚未在套接字上发送:两种方法都是异步的。不需要yield ,因为这些传输方法不是协程。

也可以看看

TCP echo server using streams示例使用asyncio.start_server()函数。

18.5.4.3.3. UDP echo客户端协议

UDP echo客户端使用AbstractEventLoop.create_datagram_endpoint()方法,当我们收到答案时发送数据并关闭传输:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4. UDP echo服务器协议

UDP echo服务器使用AbstractEventLoop.create_datagram_endpoint()方法,发送回接收到的数据:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5. 使用协议注册一个打开的套接字以等待数据

等待套接字使用协议使用AbstractEventLoop.create_connection()方法接收数据,然后关闭事件循环

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

None

watch a file descriptor for read events示例使用低级AbstractEventLoop.add_reader()方法注册套接字的文件描述器。

register an open socket to wait for data using streams示例使用协程中的open_connection()函数创建的高级流。