18.5.3. 任务和协程

18.5.3.1. 协程

asyncio一起使用的协程可以通过async def语句或使用generators来实现。在Python 3.5中添加了协程的async def类型,如果不需要支持旧的Python版本,则推荐使用。

基于生成器的协程应该用@asyncio.coroutine来装饰,尽管这不是强制的。装饰器能够与async def协程兼容,并且还作为文档。基于生成器的协程使用在 PEP 380中引入的产生 来自 而不是原始的yield语法。

“协程”一词与“生成器”一样,用于两个不同的(虽然相关的)概念:

  • 定义协程的函数(使用async def或用@asyncio.coroutine装饰的函数定义) 。如果需要消除歧义,我们称之为协程函数iscoroutinefunction()返回True)。
  • 通过调用协程函数获得的对象。此对象表示将最终完成的计算或I / O操作(通常是组合)。如果需要消除歧义,我们将它称为协程对象iscoroutine()返回True)。

协程可以做的事:

  • result = await future 或者 result = yield from future – 挂起协程直到future完成, 然后返回future的结果, 或者抛出一个异常, 两者都将传播. (如果future被取消,则会抛出CancelledError异常。)注意任务是 futures,关于 futures 的一切也适用于任务。
  • result = await coroutine or result = yield from coroutine – 等待另一个协程生产结果 (或者抛出一个异常, 其将被传播). coroutine表达式必须是到另一个协程的调用
  • return expression – 为通过 await 或者 yield from 来等待当前协程结果的协程生成一个结果.
  • raise exception - 为通过 await 或者 yield from 来等待当前协程结果的协程抛出一个异常.

调用协程不会启动其代码运行 - 调用返回的协程对象在您安排其执行之前不会执行任何操作。开始运行有两种基本方法:调用await 协程yield from 协程从另一个协程(假设另一个协程已经运行!),或使用ensure_future()函数或AbstractEventLoop.create_task()方法调度其执行。

协程(和任务)只能在事件循环运行时运行。

@asyncio.coroutine

装饰器来标记基于生成器的协程。This enables the generator use yield from to call async def coroutines, and also enables the generator to be called by async def coroutines, for instance using an await expression.

没有必要装饰async def协程本身。

如果生成器在销毁之前没有生成,则会记录一条错误消息。请参见Detect coroutines never scheduled

注意

在本文档中,一些方法被记录为协程,即使它们是返回Future的纯Python函数。这是有意在未来有调整这些职能的执行的自由。如果需要在回调式代码中使用这样的函数,请用ensure_future()包围其结果。

18.5.3.1.1. 示例:Hello World协程

协程显示 "Hello World"示例:

import asyncio

async def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()

也可以看看

The Hello World with call_soon() example uses the AbstractEventLoop.call_soon() method to schedule a callback.

18.5.3.1.2. 示例:显示当前日期的协程

协程五秒内使用sleep() 函数显示每一秒的当前时间的示例::

import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

使用生成器实现的同一协议:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

也可以看看

display the current date with call_later()示例使用带有AbstractEventLoop.call_later()方法的回调显示当前日期。

18.5.3.1.3. 示例:链接协程

示例链接协程:

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute()链接到print_sum()print_sum()协程等待直到compute()然后返回其结果。

序列图示例:

“任务”是由AbstractEventLoop.run_until_complete()方法创建的,当它获取一个协程对象而不是一个任务。

该图显示了控制流,它没有准确描述内部如何工作。例如,睡眠协程在其内部创建一个 future 来调用 AbstractEventLoop.call_later()来实现 在1秒钟内唤醒任务。

18.5.3.2. InvalidStateError ¶ T0>

exception asyncio.InvalidStateError

在此状态下不允许操作。

18.5.3.3. TimeoutError ¶ T0>

exception asyncio.TimeoutError

操作超过了给定的截止日期。

注意

此异常与内置TimeoutError异常不同!

18.5.3.4. 将来¶ T0>

class asyncio.Future(*, loop=None)

此类别几乎concurrent.futures.Future兼容。

区别:

此类为not thread safe

cancel()

取消future并安排回调。

如果future已经完成或取消,返回False否则,将future的状态更改为“已取消”,安排回调并返回True

cancelled()

如果未来被取消,返回True

done()

如果未来完成,返回True。

完成意味着结果/异常可用,或未来被取消。

result()

返回这个未来所代表的结果。

如果未来已取消,引发CancelledError如果未来的结果尚不可用,引发InvalidStateError如果未来完成并且设置了异常,则会引发此异常。

exception()

返回在此未来设置的异常。

如果未来完成,则会返回异常(如果没有设置异常,则为None)。如果未来已取消,引发CancelledError如果未来还没有完成,引发InvalidStateError

add_done_callback(fn)

添加回调以便在未来完成时运行。

回调使用单个参数调用 - 未来对象。如果未来在调用时已经完成,则使用call_soon()计划回调。

Use functools.partial to pass parameters to the callback例如,fut.add_done_callback(functools.partial(print, “Future:”, flush = True)) t0 >将调用print(“Future:”, fut, flush = True)

remove_done_callback(fn)

从“调用完成时”列表中删除回调的所有实例。

返回已删除的回调的数量。

set_result(result)

标记未来完成并设置其结果。

如果调用此方法时未来已经完成,则引发InvalidStateError

set_exception(exception)

标记未来完成并设置异常。

如果调用此方法时未来已经完成,则引发InvalidStateError

18.5.3.4.1. 示例:使用run_until_complete()的期物

组合Futurecoroutine function的示例:

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

协程函数负责计算(需要1秒),并将结果存储到未来。run_until_complete()方法等待未来的完成。

注意

run_until_complete()方法在内部使用add_done_callback()方法在未来完成时通知您。

18.5.3.4.2. 示例:使用run_forever()的期物

可以使用Future.add_done_callback()方法来不同地编写前面的示例来明确描述控制流:

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

在此示例中,未来用于将slow_operation()链接到got_result():当slow_operation()完成时,got_result()与结果一起调用。

18.5.3.5. 任务¶ T0>

class asyncio.Task(coro, *, loop=None)

调度 协程 的执行︰ 封装成future。task就是 Future 的一个子类。

一项任务负责在事件循环中执行一个协程对象。如果包装的协程从未来产生,则任务暂停包装的协程的执行并等待未来的完成。当future完成时,包装的协程的执行重新开始与result或future的异常一起。

事件循环使用协同调度:事件循环一次只运行一个任务。如果其他事件循环在不同线程中运行,则其他任务可以并行运行。当任务等待未来的完成时,事件循环执行新任务。

任务的取消与未来的取消不同。调用cancel()会向包装的协程中抛出CancelledErrorcancelled() only returns True if the wrapped coroutine did not catch the CancelledError exception, or raised a CancelledError exception.

如果待处理的任务被销毁,则其包装的coroutine的执行未完成。它可能是一个错误,并记录了一个警告:请参阅Pending task destroyed

不要直接创建Task实例:使用ensure_future()函数或AbstractEventLoop.create_task()方法。

此类为not thread safe

classmethod all_tasks(loop=None)

返回一组事件循环的所有任务。

默认情况下,将返回当前事件循环的所有任务。

classmethod current_task(loop=None)

在事件循环或None中返回当前正在运行的任务。

默认情况下,返回当前事件循环的当前任务。

当不在Task的上下文中调用时返回None

cancel()

请求此任务取消本身。

这安排在通过事件循环的下一个循环中将CancelledError抛出到包装的协程中。协程然后有机会使用try / except / finally清理或甚至拒绝请求。

Future.cancel()不同,这不能保证任务将被取消:异常可能被捕获和执行,延迟任务的取消或完全阻止取消。任务也可能返回值或引发不同的异常。

在调用此方法后,cancelled()不会返回True(除非任务已取消)。当包装的协程以CancelledError异常(即使cancel()未被调用)终止时,任务将被标记为已取消。

get_stack(*, limit=None)

返回此任务的协程的堆栈帧的列表。

如果协程未完成,则返回它被暂停的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程由异常终止,则返回回溯帧列表。

帧总是从最旧到最新排序。

可选限制给出了要返回的最大帧数;默认情况下返回所有可用的帧。其含义根据是否返回堆栈或回溯而有所不同:返回堆栈的最新帧,但返回回溯的最早帧。(这与追溯模块的行为相匹配。)

由于我们无法控制的原因,对于暂停的协程只返回一个堆栈帧。

print_stack(*, limit=None, file=None)

打印此任务的协程的堆栈或回溯。

这为由get_stack()检索的帧产生与追溯模块类似的输出。limit参数传递给get_stack()。文件参数是写入输出的I / O流;默认情况下输出写入sys.stderr。

18.5.3.5.1. 示例:并行执行任务

并行执行3个任务(A,B,C)的示例:

import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

输出:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

任务在创建时自动计划执行。所有任务完成后,事件循环停止。

18.5.3.6. 任务函数

注意

在下面的函数中,可选的loop参数允许显式设置基础任务或协程使用的事件循环对象。如果它不提供,则使用默认的事件循环。

asyncio.as_completed(fs, *, loop=None, timeout=None)

返回一个迭代器,其值在等待时为Future实例。

如果超时在所有期货完成之前发生,则引发asyncio.TimeoutError

例:

for f in as_completed(fs):
    result = yield from f  # The 'yield from' may raise
    # Use result

注意

期货f不一定是fs的成员。

asyncio.ensure_future(coro_or_future, *, loop=None)

调度执行一个 coroutine object:并且它封装成future。返回任务对象。

如果参数是Future,则直接返回。

版本3.4.4中的新功能。

在版本3.5.1中更改:该函数接受任何awaitable对象。

也可以看看

AbstractEventLoop.create_task()方法。

asyncio.async(coro_or_future, *, loop=None)

ensure_future()的已弃用别名。

自3.4.4版起已弃用。

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

自给定的协程对象或futures返回一个future汇总结果。

所有futures必须共享相同的事件循环。如果所有任务都成功完成,则返回的future的结果是结果列表(按照原始序列的顺序,不一定是结果到达的顺序)。如果return_exceptions为True,则任务中的异常被视为与成功结果相同,并在结果列表中收集;否则,第一个引发的异常将立即传播到返回的future。

取消:如果外部未来被取消,所有的孩子(还没有完成)也被取消。如果有任何子项被取消,系统会将其视为CancelledError - 外部未来在此情况下取消(这是为了防止取消一个孩子导致其他孩子被取消。)

asyncio.iscoroutine(obj)

返回True if objcoroutine object,其可以基于生成器或async def协程。

asyncio.iscoroutinefunction(func)

如果func被确定为coroutine function,则返回True,其可以是装饰的生成器函数或async def函数。

asyncio.run_coroutine_threadsafe(coro, loop)

coroutine object提交到给定的事件循环。

返回concurrent.futures.Future以访问结果。

此函数旨在从与运行事件循环的线程不同的线程调用。用法:

# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在协程中出现异常,则将通知返回的未来。它也可以用来取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print('The coroutine raised an exception: {!r}'.format(exc))
else:
    print('The coroutine returned: {!r}'.format(result))

请参阅文档的concurrency and multithreading部分。

注意

与模块中的其他函数不同,run_coroutine_threadsafe()需要明确传递循环参数。

版本3.5.1中的新功能。

coroutine asyncio.sleep(delay, result=None, *, loop=None)

创建一个在给定时间(以秒为单位)后完成的协程如果提供result,则当协程完成时,将生成给调用者。

睡眠的分辨率取决于事件循环的granularity of the event loop

此函数是coroutine

asyncio.shield(arg, *, loop=None)

等待未来,屏蔽它取消。

该声明:

res = yield from shield(something())

正好等于语句:

res = yield from something()

除了,如果包含它的协程被取消,在something()中运行的任务不会被取消。something()的角度来看,取消没有发生。但是它的调用者仍然被取消,所以yield-from表达式仍然引发CancelledError注意:如果something()被其他方法取消,这仍然会取消shield()

如果要完全忽略取消(不推荐),您可以将shield()与try / except子句组合,如下所示:

try:
    res = yield from shield(something())
except CancelledError:
    res = None
coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

等待由序列futures给出的Futures和协程对象完成。协程将被包裹在任务中。返回含两个集合的Future:(done,pending)。

序列futures不能为空。

timeout可用于控制返回前等待的最大秒数。timeout可以是一个int或float。如果未指定超时None,则等待时间没有限制。

return_when指示此函数应返回的时间。它必须是concurrent.futures模块的以下常量之一:

常量描述
FIRST_COMPLETED当任何未来完成或被取消时,该函数将返回。
FIRST_EXCEPTION当任何future通过抛出异常完成时,函数将返回。如果没有未来引发异常,则它等效于ALL_COMPLETED
ALL_COMPLETED当所有期货完成或被取消时,该函数将返回。

此函数是coroutine

用法:

done, pending = yield from asyncio.wait(fs)

注意

这不引发asyncio.TimeoutError在超时发生时未完成的期货在第二组中返回。

coroutine asyncio.wait_for(fut, timeout, *, loop=None)

等待单个Futurecoroutine object完成超时。如果超时None,则阻止直到未来完成。

协程将包裹在Task中。

返回未来或协程的结果。当超时发生时,它取消任务并引发asyncio.TimeoutError要避免任务取消,请将其封装在shield()中。

如果等待被取消,未来fut也会被取消。

此函数是coroutine,用法:

result = yield from asyncio.wait_for(fut, 60.0)

在版本3.4.3中更改:如果等待被取消,未来fut现在也被取消。