18.5.9. 使用asyncio进行开发

异步编程不同于经典的顺序编程。本页列出了常见的陷阱,并解释了如何避免它们。

18.5.9.1. asyncio 的调试模式

asyncio的实现是为了性能而编写的。为了简化异步代码的开发,您可以启用调试模式

要为应用程序启用所有调试检查:

示例调试检查:

18.5.9.2. 取消

取消任务在经典编程中不常见。在异步编程中,不仅它是常见的,但你必须准备你的代码来处理它。

可以使用Future.cancel()方法显式取消Future和Task。当超时发生时,wait_for()函数取消等待的任务。还有许多其他情况下,任务可以间接取消。

如果Future被取消,请不要调用Futureset_result()set_exception()例如,写:

if not fut.cancelled():
    fut.set_result('done')

不要使用AbstractEventLoop.call_soon()直接调用set_result()set_exception()

如果你等待未来,你应该早点检查,如果未来被取消,以避免无用的操作。例:

@coroutine
def slow_operation(fut):
    if fut.cancelled():
        return
    # ... slow computation ...
    yield from fut
    # ...

shield()函数也可用于忽略取消。

18.5.9.3. 并发和多线程

事件循环在线程中运行,并在同一线程中执行所有回调和任务。当任务在事件循环中运行时,没有其他任务在同一线程中运行。但是当任务使用yield 时,任务将被挂起,事件循环执行下一个任务。

要计划不同线程的回调,应使用AbstractEventLoop.call_soon_threadsafe()方法。例:

loop.call_soon_threadsafe(callback, *args)

大多数asyncio对象不是线程安全的。你只需要担心如果你访问事件循环之外的对象。例如,要取消未来,请不要直接调用其Future.cancel()方法,但是:

loop.call_soon_threadsafe(fut.cancel)

为了处理信号和执行子进程,事件循环必须在主线程中运行。

要从不同的线程调度协程对象,应使用run_coroutine_threadsafe()函数。它返回concurrent.futures.Future以访问结果:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout)  # Wait for the result with a timeout

AbstractEventLoop.run_in_executor()方法可以与线程池执行程序一起使用,以在不同线程中执行回调,从而不阻塞事件循环的线程。

也可以看看

Synchronization primitives部分介绍了同步任务的方法。

Subprocess and threads节列出了从不同线程运行子进程的asyncio限制。

18.5.9.4. 正确处理阻塞函数

不应该直接调用阻塞函数。例如,如果功能块阻塞1秒钟,其他任务会延迟1秒钟,这对性能有重要影响。

对于网络和子进程,asyncio模块提供了高级API,如protocols

执行器可用于在不同的线程中或甚至在不同的进程中运行任务,以不阻塞事件循环的线程。请参阅AbstractEventLoop.run_in_executor()方法。

也可以看看

Delayed calls部分详细说明事件循环如何处理时间。

18.5.9.5. 日志

asyncio模块使用记录器'asyncio'中的logging模块记录信息。

18.5.9.6. 检测从未计划的协程对象

当调用协程函数并且其结果未传递到ensure_future()或传递给AbstractEventLoop.create_task()方法时,协程对象的执行将永远被调度这可能是一个错误。Enable the debug mode of asynciolog a warning以检测它。

示例与错误:

import asyncio

@asyncio.coroutine
def test():
    print("never scheduled")

test()

调试模式下的输出:

Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
  File "test.py", line 7, in <module>
    test()

修复是使用协调对象调用ensure_future()函数或AbstractEventLoop.create_task()方法。

也可以看看

Pending task destroyed

18.5.9.7. 检测从未消耗过的异常

Python通常在未处理的异常上调用sys.displayhook()如果调用Future.set_exception(),但该异常从不消耗,则不会调用sys.displayhook()相反,当未来被垃圾回收器删除时,发出a log is emitted,其中traceback引发异常。

未处理异常的示例:

import asyncio

@asyncio.coroutine
def bug():
    raise Exception("not consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(bug())
loop.run_forever()
loop.close()

输出:

Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)>
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

Enable the debug mode of asyncio以获取创建任务的回溯。调试模式下的输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8>
source_traceback: Object created at (most recent call last):
  File "test.py", line 8, in <module>
    asyncio.ensure_future(bug())
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 79, in __next__
    return next(self.gen)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

有不同的选项来解决这个问题。第一个选项是在另一个协程中链接协程,并使用经典的try / except:

@asyncio.coroutine
def handle_exception():
    try:
        yield from bug()
    except Exception:
        print("exception consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(handle_exception())
loop.run_forever()
loop.close()

另一个选项是使用AbstractEventLoop.run_until_complete()函数:

task = asyncio.ensure_future(bug())
try:
    loop.run_until_complete(task)
except Exception:
    print("exception consumed")

也可以看看

Future.exception()方法。

18.5.9.8. 链协程正确

当协程函数调用其他协程函数和任务时,应使用yield 显式链接它们。否则,不能保证执行是顺序的。

使用asyncio.sleep()来模拟慢操作的不同错误的示例:

import asyncio

@asyncio.coroutine
def create():
    yield from asyncio.sleep(3.0)
    print("(1) create file")

@asyncio.coroutine
def write():
    yield from asyncio.sleep(1.0)
    print("(2) write into file")

@asyncio.coroutine
def close():
    print("(3) close file")

@asyncio.coroutine
def test():
    asyncio.ensure_future(create())
    asyncio.ensure_future(write())
    asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

loop = asyncio.get_event_loop()
asyncio.ensure_future(test())
loop.run_forever()
print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop))
loop.close()

预期输出:

(1) create file
(2) write into file
(3) close file
Pending tasks at exit: set()

实际输出:

(3) close file
(2) write into file
Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>

create()完成之前,close()write()之前被调用,顺序:create()write()close()

要修正此示例,任务必须用yield 标记

@asyncio.coroutine
def test():
    yield from asyncio.ensure_future(create())
    yield from asyncio.ensure_future(write())
    yield from asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

或没有asyncio.ensure_future()

@asyncio.coroutine
def test():
    yield from create()
    yield from write()
    yield from close()
    yield from asyncio.sleep(2.0)
    loop.stop()

18.5.9.9. 挂起的任务被销毁

如果待处理的任务被销毁,则其包装的coroutine的执行未完成。这可能是一个错误,因此记录一个警告。

日志示例:

Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>

Enable the debug mode of asyncio以获取创建任务的回溯。登录调试模式示例:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "test.py", line 15, in <module>
    task = asyncio.ensure_future(coro, loop=loop)
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15>

18.5.9.10. 关闭传输和事件循环

当不再需要传输时,调用其close()方法释放资源。事件循环也必须明确地关闭。

如果传输或事件循环未显式关闭,则会在其析构函数中发出ResourceWarning警告。默认情况下,忽略ResourceWarning警告。Debug mode of asyncio部分介绍如何显示它们。