17.6. asyncore异步套接字处理器

Source code: Lib/asyncore.py


Asyncore模块提供了以异步的方式写入套接字服务客户端和服务器的基础结构。

只有两种方式使一个程序在单处理器上实现“同时做不止一件事”。多线程编程是最简单和最流行的方式,但是有另一种很不一样的技术,可以使得我们保持多线程的几乎所有优势,却不用真正使用多线程。 如果你的程序主要是受I/O限制的,这是唯一可行的方式。如果你的程序是受处理器限制的,则先发制人的调度线程可能是你真正需要的。但是,很少网络服务器是受处理器限制的。

如果您的操作系统支持在其I / O库的 select() 系统调用(几乎所有系统都支持),那么你可以用它一次处理多个通信信道;当你的I/O在后台忙碌时处理其他工作。虽然这一策略似乎很奇怪很复杂,尤其是最开始的时候,这在很多方面比多线程编程更容易理解和控制。asyncore 模块为你解决了很多困难,使你能快速构建复杂的高性能网络服务器和客户端。对于会话应用程序和协议, asynchat 模块是非常有用的。

两个模块背后的想法就是创建一个或者多个网络 通道, 及 asyncore.dispatcherasynchat.async_chat 类的实例. 如果你没有提供自己的映射的话,创建通道会把这两个实例加到由 loop() 函数使用的全局映射中。

一旦初始化通道被创建,调用 loop() 函数会激活通道服务,这会持续到最后一个通道(包括所有在异步服务中被加到映射中的通道)被关闭。 

asyncore.loop([timeout[, use_poll[, map[, count]]]])

输入一个循环轮询,直到输入count或者所有的通道被关闭。所有的参数都是可选的。count 参数默认为None, 只有当所有通道关闭后会使得循环结束。timeout 参数给合适的 select() 或者 poll() 调用设置超时参数,以秒为单位;默认为30秒。use_poll 参数,如果为真,显示 poll() 应该优先于 select() 被使用(默认为False)。

map 参数是一个包含要观察的通道的字典。 当通道关闭时它们会被从映射中删除。如果map 参数被省略,全局映射会被使用。通道(包括 asyncore.dispatcher, asynchat.async_chat 实例和子类)可以在映射中自由地组合。

class asyncore.dispatcher

dispatcher 类是一个底层套接字对象的简单封装。为使得这个更实用,这个类有少数由异步循环调用的,用来事件处理的函数。除此以外,可以将它视为一个正常的非阻塞套接字对象。

某一时间或者某一连接状态下底层事件的触发告诉异步循环,某一高级事件发生了。比如,如果你请求套接字连接另一个主机,我们知道,当套接字第一次变得可写入时,连接被创建(这时,你知道你可以成功的写入套接字)。这些高级事件指的是:

Event Description
handle_connect() Implied by the first read or write event
handle_close() Implied by a read event with no data available
handle_accept() Implied by a read event on a listening socket

在异步处理过程中,每一个被映射的通道的 readable()writable() 方法都会被用来决定是否这个通道的套接字应该被加到调用 select()或者 poll()来读或者写的事件的通道列表中。

所以,通道事件比基本套接字事件多很多。以下是全套可以在子类中被覆写的方法:

handle_read()

当异步循环检测到一个在通道套接字上的read() 调用时会成功被调用。

handle_write()

当异步循环检测到一个可写入的套接字可以被写入时被调用。常常这个方法为了性能会实现必要的缓存。比如:

def handle_write(self):
    sent = self.send(self.buffer)
    self.buffer = self.buffer[sent:]
handle_expt()

当套接字连接有带外数据(OOB data)时被调用。这几乎不会发生,因为OOB被支持但是几乎不使用。

handle_connect()

当活动opener的套接字真正创建连接时被调用。也许发送一个“欢迎”横幅,或者比如,初始化一个远程端点的协商协议。

handle_close()

当套接字被关闭时被调用。

handle_error()

当异常被引发并没有被其他处理时被调用。默认版本会打印出精简的回溯。

handle_accept()

当一个连接可以用一个对本地端点发出connect() 调用的端点来建立时,在监听通道(消极开启者)上调用 

readable()

每次在异步循环中被调用来决定是否一个通道的套接字应该被加到读事件可以发生的列表中。默认方法简单地返回 True,显然,所有的通道会关注于读事件。

writable()

每次在异步循环中被调用来决定是否一个通道的套接字应该被加到写事件可以发生的列表中。 默认方法简单地返回 True,显然,所有的通道会关注于写事件。

此外,每一个通道delegates或者继承许多套接字的方法。其中许多方法几乎是一样的。

create_socket(family, type)

这和普通套接字的创建是完全一样的,并且使用相同的设置。更多关于套接字的创建的信息参考 socket 文档。

connect(address)

因为是通常的套接字对象,address 是一个元组,第一个元素是要连接的主机,第二个是端口。

send(data)

发送 data 到套接字的远程端点。

recv(buffer_size)

从套接字的远程端点读取最多 buffer_size 字节。空字符显示通道被从另一端关闭了。

注意,即使 select.select() 或者 select.poll() 已经报告套接字可以开始读了,recv() 可能会抛出带有 EAGAIN 或者 EWOULDBLOCK 的错误 socket.error 。 

listen(backlog)

监听连接到套接字的连接。backlog 参数定义了最大队列连接的数量,最小为1;最大值和系统有关 (通常为 5)。

bind(address)

绑定套接字到 address. 套接字必须没有被绑定。(address 的格式取决于地址族 - 参考 socket 文档获取更多信息。) 调用 dispatcher 对象的 set_reuse_addr() 方法来标记套接字可以可重用 (要设置SO_REUSEADDR 选项)。 

accept()

接受一个连接。套接字必须被绑定到地址并监听连接。返回值可以是 None 或者一对 (conn, address),其中 conn 是一个在连接上用来发送和接收数据的 套接字对象,address 是连接另一端绑定到套接字上的地址。None 被返回时表示连接没有发生,这种情况下服务器应该忽略这个事件并继续监听之后传进的连接。

close()

关闭套接字。所有之后的操作都会失效。远程端点将会收不到更多的数据(在队列中的数据被刷新之后)。当被垃圾回收的时候,套接字会被自动关闭。

class asyncore.dispatcher_with_send

一个 dispatcher的子类,添加了简单的缓冲输出能力,对简单的客户端很有用。对于更复杂的情况用 asynchat.async_chat

class asyncore.file_dispatcher

file_dispatcher 接受文件描述符或者文件对象做参数,可附带可选映射参数,并覆写它以和 poll() or loop() 函数一起使用。如果提供了一个文件对象或者带有 fileno() 方法的其他的任何对象, 这个方法会被调用并传递给 file_wrapper 构造器。可用性: UNIX。

class asyncore.file_wrapper

file_wrapper接受一个整数文件描述符并调用 os.dup() 来复制handle,因此,原始的handle可以会被file_wrapper独立地关掉。这个类实现了足够的方法来模拟被file_dispatcher 类使用的套接字。可用性: UNIX.

17.6.1. asyncore 示例: 基本的HTTP客户端

以下是一个基本的HTTP客户端,使用了dispatcher 类实现套接字处理。

import asyncore, socket

class HTTPClient(asyncore.dispatcher):

    def __init__(self, host, path):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect( (host, 80) )
        self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        print self.recv(8192)

    def writable(self):
        return (len(self.buffer) > 0)

    def handle_write(self):
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]


client = HTTPClient('www.python.org', '/')
asyncore.loop()

17.6.2. asyncore 示例:基本回显服务器

以下是一个基本的回显服务器,使用了 dispatcher 类来接收连接和分派传入连接给处理程序。

import asyncore
import socket

class EchoHandler(asyncore.dispatcher_with_send):

    def handle_read(self):
        data = self.recv(8192)
        if data:
            self.send(data)

class EchoServer(asyncore.dispatcher):

    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print 'Incoming connection from %s' % repr(addr)
            handler = EchoHandler(sock)

server = EchoServer('localhost', 8080)
asyncore.loop()