18.5.5. Streams(基于协程的API)

18.5.5.1. Stream的函数

None

本模块中的顶层函数仅用作方便包装;有真正没有什么特别的,如果他们不做你想要的,随时复制他们的代码。

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)

create_connection()的包装器返回(rader,writer)对。

reader返回的是一个StreamReader实例;writer是一个StreamWriter实例。

这些参数是AbstractEventLoop.create_connection()的所有常见参数,除了protocol_factory;最常见的是位置主机和端口,各种可选的关键字参数如下。

其他可选的关键字参数是loop(设置事件循环实例使用)和limit(设置传递给StreamReader的缓冲区限制) 。

此函数是coroutine

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)

启动一个socket服务器,为每个连接的客户端产生一个回调。返回值与create_server()相同。

使用两个参数调用client_connected_cb参数:client_readerclient_writerclient_readerStreamReader对象,而client_writerStreamWriter对象。client_connected_cb参数可以是普通回调函数或协程函数;如果它是协程功能,它将被自动转换为任务

其余的参数是create_server()的所有常见参数,除了protocol_factory;最常见的是位置主机端口,各种可选的关键字参数如下。

其他可选的关键字参数是loop(设置事件循环实例使用)和limit(设置传递给StreamReader的缓冲区限制) 。

此函数是协程

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, **kwds)

create_unix_connection()的包装器返回(读取器,写入器)对。

有关返回值和其他详细信息的信息,请参见open_connection()

此函数是coroutine

None

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)

启动UNIX Domain Socket服务器,为每个连接的客户端回调一个。

有关返回值和其他详细信息的信息,请参见start_server()

此函数是coroutine

可用性:UNIX。

18.5.5.2. StreamReader

class asyncio.StreamReader(limit=None, loop=None)

此类为不是线程安全的

exception()

None

feed_eof()

确认EOF。

feed_data(data)

在内部缓冲区中输入数据字节。将恢复等待数据的任何操作。

set_exception(exc)

None

set_transport(transport)

设置传输。

coroutine read(n=-1)

读取最多n个字节。如果未提供n或设置为-1,请读取直到EOF并返回所有读取的字节。

如果接收到EOF并且内部缓冲区为空,则返回一个空的bytes对象。

此方法是coroutine

coroutine readline()

读取一行,其中“line”是以\n结尾的字节序列。

如果接收到EOF,并且未找到\n,则该方法将返回部分读取字节。

如果接收到EOF并且内部缓冲区为空,则返回一个空的bytes对象。

此方法是coroutine

coroutine readexactly(n)

读取n字节。如果读取n之前到达流的末尾,则引发IncompleteReadError,异常的IncompleteReadError.partial属性包含部分读字节。

此方法是协程

coroutine readuntil(separator=b'n')

从流中读取数据,直到找到separator

成功时,数据和分隔符将从内部缓冲区(消耗)中删除。返回的数据将包括末尾的分隔符。

配置流限制用于检查结果。Limit设置可以返回的数据的最大长度,不计算分隔符。

如果发生EOF并且仍未找到完整的分隔符,则会引发IncompleteReadError异常,并且内部缓冲区将被重置。IncompleteReadError.partial属性可能包含部分分隔符。

如果由于超限而无法读取数据,则会引发LimitOverrunError异常,并且数据将保留在内部缓冲区中,因此可以再次读取。

版本3.5.2中的新功能。

at_eof()

如果缓冲区为空且调用feed_eof(),则返回True

18.5.5.3. StreamWriter的¶ T0>

class asyncio.StreamWriter(transport, protocol, reader, loop)

包装运输。

这会使write()writelines()can_write_eof()write_eof()get_extra_info()close()它添加drain(),它返回可选的Future,您可以等待流控制。它还添加了直接引用Transport的传输属性。

此类为not thread safe

transport

运输。

can_write_eof()

如果传输支持write_eof()False,则返回True请参见WriteTransport.can_write_eof()

close()

关闭传输:请参阅BaseTransport.close()

coroutine drain()

让底层传输的写缓冲区有机会被刷新。

目的用途是写:

w.write(data)
yield from w.drain()

当传输缓冲区的大小达到高水限值(协议暂停)时,阻塞直到缓冲区的大小下降到低水限值,并且协议恢复。当没有什么要等待,收益率继续立即。

drain()得到的结果为循环提供了调度写入操作和刷新缓冲区的机会。尤其应当在可能大量的数据写入传输时使用,而协程不会从write()的调用之间产生。

此方法是coroutine

get_extra_info(name, default=None)

返回可选的传输信息:请参阅BaseTransport.get_extra_info()

write(data)

将一些数据字节写入传输:参见WriteTransport.write()

writelines(data)

将数据字节的列表(或任何可迭代的)写入传输:参见WriteTransport.writelines()

write_eof()

刷新缓冲数据后关闭传输的写入结束:请参阅WriteTransport.write_eof()

18.5.5.4. StreamReaderProtocol ¶ T0>

class asyncio.StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)

Trivial助手类适用于ProtocolStreamReader之间。Protocol的子类。

stream_reader is a StreamReader instance, client_connected_cb is an optional function called with (stream_reader, stream_writer) when a connection is made, loop is the event loop instance to use.

(这是一个帮助类,而不是使StreamReader本身为Protocol子类,因为StreamReader有其他潜在用途, StreamReader意外调用了协议的不当方法。)

18.5.5.5. IncompleteReadError ¶ T0>

exception asyncio.IncompleteReadError
Incomplete read error, subclass of EOFError.
expected

预期字节总数(int)。

partial

在达到流结束之前读取字节字符串(bytes)。

18.5.5.6. LimitOverrunError ¶ T0>

exception asyncio.LimitOverrunError

在查找分隔符时达到缓冲区限制。

consumed

要消耗的总字节数。

18.5.5.7. 流示例

18.5.5.7.1. 使用流的TCP回显客户端

TCP回显客户端使用asyncio.open_connection()函数:

import asyncio

@asyncio.coroutine
def tcp_echo_client(message, loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
                                                        loop=loop)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = yield from reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()

message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()

18.5.5.7.2. 使用流的TCP回显服务器

TCP回显服务器使用asyncio.start_server()函数:

import asyncio

@asyncio.coroutine
def handle_echo(reader, writer):
    data = yield from reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    yield from writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

18.5.5.7.3. 获取HTTP头

在命令行中获取URL的HTTP头的简单示例:

import asyncio
import urllib.parse
import sys

@asyncio.coroutine
def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        connect = asyncio.open_connection(url.hostname, 443, ssl=True)
    else:
        connect = asyncio.open_connection(url.hostname, 80)
    reader, writer = yield from connect
    query = ('HEAD {path} HTTP/1.0\r\n'
             'Host: {hostname}\r\n'
             '\r\n').format(path=url.path or '/', hostname=url.hostname)
    writer.write(query.encode('latin-1'))
    while True:
        line = yield from reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print('HTTP header> %s' % line)

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()

用法:

python example.py http://example.com/path/page.html

或使用HTTPS:

python example.py https://example.com/path/page.html

18.5.5.7.4. 注册一个开放的套接字以使用流等待数据

协程等待,直到套接字使用open_connection()函数接收数据:

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

@asyncio.coroutine
def wait_for_data(loop):
    # Create a pair of connected sockets
    rsock, wsock = socketpair()

    # Register the open socket to wait for data
    reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = yield from reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()

也可以看看

register an open socket to wait for data using a protocol示例使用由AbstractEventLoop.create_connection()方法创建的低级协议注册开放套接字以等待数据。

watch a file descriptor for read events示例使用低级AbstractEventLoop.add_reader()方法注册套接字的文件描述器。