Python中的协程是处理并发任务的一种方式,它们基于异步编程模型,可以更高效地处理I/O密集型操作。协程有两种,一种无栈协程,python中以标准库asyncio为代表;一种有栈协程,python 中以第三方库gevent为代表。本文主要讲解asyncio协程。

协程通过asyncawait关键字来实现,能够让代码在等待I/O操作时继续执行其他任务,从而提高程序的性能和响应性。

协程(Coroutine)本质上是把程序的任务分成多个部分,在遇到阻塞的I/O操作时主动挂起当前部分,线程执行其他待执行的部分。协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。由于协程本质上在单进程单线程内执行程序,因此,同时执行的任务只能有一个。在切换执行任务时不存在系统级的上下文切换,都是在用户态内进行的执行代码块的切换。

asyncio 是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。asyncio包的运行核心就是event loop(事件循环)。可等待对象: 如果一个对象可以使用 await 关键字修饰,那么它就是可等待对象(awaitable object)。在Asyncio包中,许多API都是接收 可等待对象 作为参数。主要有三种可等待对象:coroutine、task、future。 其中coroutine 有两层相关定义:coroutine function(协程函数)、 coroutine object(协程对象)。

asyncio 的编程模型本质是一个消息循环,我们一般先定义一个协程函数(或任务), 从 asyncio 模块中获取事件循环loop,然后把需要执行的协程任务(或任务列表)扔到 loop中执行,就实现了异步IO。 实际应用中,我们先由协程函数创建协程任务,然后把它们加入协程任务列表,最后一起交由事件循环执行。

asyncio.iscoroutine() 验证协程函数 asyncio.ensure_future

基本概念

编辑
  • 协程函数:通过async def关键字定义的函数。可以将coroutines理解为一种特殊的,具有停止执行然后根据特殊io完成信息拿到结果后可以继续执行的任务。这里我们通过async关键字来定义协程、通过await关键字来暂停协程的执行。直接调用协程并不会开始执行(实际上得到了一个coroutine object),需要将其显式的放入event loop中才能执行。
  • 协程对象:调用协程函数时返回的对象,必须用await关键字来执行。res = work()创建协程对象,函数内部代码不会执行。如果要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。
  • await:用于挂起协程的执行,直到await后的任务完成。await必须在async函数中使用。await后跟可等待对象,可等待对象包括协程对象、Future和Task对象,这些都是IO等待。等IO操作完成之后再继续往下执行,当前协程(任务)挂起时,事件循环可以执行其他协程(任务)。

同一个协程任务中,多个await,会依次等待等待对象执行完成;不同协程任务中,遇到await会交替执行。

  • 事件循环:管理协程的执行,不断地检查并运行已经注册的任务或协程。通常使用asyncio库中的事件循环。asyncio.run() 是最常用的启动事件循环并运行协程的方式,它会创建一个新的事件循环,运行传递的协程,直到协程完成,然后关闭事件循环。通常不需要直接操作事件循环。
    • loop.run_until_complete(future) 函数运行直到 future (Future 的实例) 被完成。
    • loop.run_forever() 函数运行事件循环直到 stop() 被调用。
    • loop.stop() 函数停止事件循环。
    • loop.is_running() 函数返回 True 如果事件循环当前正在运行。
    • loop.is_closed() 函数如果事件循环已经被关闭,返回 True 。
    • loop.close() 函数关闭事件循环。
    • loop.create_future() 函数创建一个附加到事件循环中的 asyncio.Future 对象。
    • loop.create_task(coro, *, name=None) 函数安排一个 协程 的执行。返回一个 Task 对象。
    • loop.set_task_factory(factory) 函数设置一个 task 工厂 , 被用于 loop.create_task() 。
    • loop.get_task_factory() 函数返回一个任务工厂,或者如果是使用默认值则返回 None。
    • loop.call_soon(callback, *args, context=None) 函数安排 callback 在事件循环的下一次迭代时附带 args 参数被调用。回调按其注册顺序被调用。每个回调仅被调用一次。方法不是线程安全的。
    • loop.call_soon_threadsafe(callback, *args, context=None) 函数是 call_soon() 的线程安全变体。必须被用于安排 来自其他线程 的回调。
    • loop.call_later(delay, callback, *args, context=None) 函数安排 callback 在给定的 delay 秒(可以是 int 或者 float)后被调用。
    • loop.call_at(when, callback, *args, context=None) 函数安排 callback 在给定的绝对时间戳的时间(一个 int 或者 float)被调用,使用与 loop.time() 同样的时间参考。
    • loop.time() 函数根据时间循环内部的单调时钟,返回当前时间, float 值。
    • loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None) 函数打开一个流式传输连接,连接到由 host 和 port 指定的地址。
    • loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) 函数创建TCP服务 (socket 类型 SOCK_STREAM ) 监听 host 地址的 port 端口。
    • loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) 函数与 loop.create_server() 类似但是专用于 AF_UNIX 套接字族。path 是必要的 Unix 域套接字名称,除非提供了 sock 参数。 抽象的 Unix 套接字, str, bytes 和 Path 路径都是受支持的。
    • loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None) 函数将已被接受的连接包装成一个传输/协议对。
    • loop.sock_recv(sock, nbytes) 函数从 sock 接收至多 nbytes。 socket.recv() 的异步版本。
    • loop.sock_recv_into(sock, buf) 函数从 sock 接收数据放入 buf 缓冲区。 模仿了阻塞型的 socket.recv_into() 方法。
    • loop.sock_sendall(sock, data) 函数将 data 发送到 sock 套接字。 socket.sendall() 的异步版本。
    • loop.sock_accept(sock) 函数接受一个连接。 模仿了阻塞型的 socket.accept() 方法。
    • loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True) 函数在可能的情况下使用高性能的 os.sendfile 发送文件。 返回所发送的字节总数。
    • loop.set_exception_handler(handler) 函数将 handler 设置为新的事件循环异常处理器。
    • loop.get_exception_handler() 函数返回当前的异常处理器,如果没有设置异常处理器,则返回 None 。
    • loop.default_exception_handler(context) 函数默认的异常处理器。
    • loop.call_exception_handler(context) 函数调用当前事件循环的异常处理器。
    • loop.get_debug() 函数获取事件循环调试模式设置(bool)。
    • loop.set_debug(enabled: bool) 函数设置事件循环的调试模式。
  • “可等待对象”(awaitable object),其状态可以是“挂起”(即还未完成),在完成时会通知 await 关键字使得挂起的协程恢复执行。可等待对象包括:
    • 协程对象: 通过 async def 定义的函数的调用得到的对象。这些对象表示一个可以被暂停和恢复的异步操作。
    • asyncio.Future 类对象:await future将暂停当前协程的执行,直到future完成。Future 是一种特殊的低级可等待对象,它代表了异步操作的最终结果。当一个 Future 对象被 await 时,意味着协程将会等待直到这个 Future 在其他地方被解决。在 asyncio 中,Future 对象的存在是为了使基于回调的代码可以与 async/await 语法一起使用。通常,在应用层代码中并不需要创建 Future 对象。
      • state数据成员: 任务的状态, 分别是 PENDING , CANCELLED , FINISHED .
      • loop数据成员: 事件循环, 用于执行回调函数.
      • callbacks数据成员: 回调函数列表, 用于存储回调函数.
      • result数据成员: 任务的结果.
      • exception数据成员: 任务的异常
      • Future对象创建后可以用set_result()方法设置结果。
      • result()返回Future对象的结果;如果任务尚未完成,则抛出asyncio.InvalidStateError。
      • exception()返回 Future 对象的异常(如果有的话)。如果任务尚未完成,则抛出 asyncio.InvalidStateError。
      • set_result(result):设置 Future 对象的结果,并通知所有等待的协程。如果任务已经完成,则抛出asyncio.InvalidStateError。
      • set_exception(exception): 设置 Future 对象的异常,并通知所有等待的协程。如果任务已经完成,则抛出asyncio.InvalidStateError。
      • cancel():取消 Future 对象。如果任务已经完成或不能取消,则返回False。
      • done():返回True如果任务已经完成。
      • cancelled():返回True如果任务已经被取消。
      • asyncio.Future(*, loop=None) 函数是一个 Future 代表一个异步运算的最终结果。线程不安全。
      • asyncio.isfuture(obj) 函数用来判断如果 obj 为一个 asyncio.Future类的示例、 asyncio.Task 类的实例或者一个具有 _asyncio_future_blocking 属性的对象,返回 True。
      • asyncio.ensure_future(obj, *, loop=None) 函数创建新任务。
      • asyncio.wrap_future(future, *, loop=None) 函数将一个 concurrent.futures.Future 对象封装到 asyncio.Future 对象中。
      • fut.result() 函数返回 Future 的结果。
      • fut.set_result(result) 函数将 Future 标记为 完成 并设置结果。
      • fut.set_exception(exception) 函数将 Future 标记为 完成 并设置一个异常。
      • fut.done() 函数如果 Future 为已 完成 则返回 True 。
      • fut.cancelled() 函数是如果 Future 已取消则返回 True
      • fut.add_done_callback(callback, *, context=None) 函数添加一个在 Future 完成 时运行的回调函数。
      • fut.remove_done_callback(callback) 函数从回调列表中移除 callback 。
      • fut.cancel() 函数取消 Future 并调度回调函数。
      • fut.exception() 函数返回 Future 已设置的异常。
      • fut.get_loop() 函数返回 Future 对象已绑定的事件循环。
    • Task 对象是asyncio的一种特殊类型的Future,即是Future的子类。Task 对象管理协程的执行并可以提供协程的执行状态和结果。
      • asyncio.create_task(my_task()) 将 my_task 协程包装成 Task 对象。await task 将暂停 main 协程的执行,直到 task 完成。Task对象通过add_done_callback方法增加回调函数。
      • cancel(msg=None)请求取消任务。这安排在事件循环的下一次循环中向包装的协程抛出 CancelledError 异常。协程有机会清理或甚至通过 try … except CancelledError … finally 块来拒绝请求。因此,与 Future.cancel() 不同,Task.cancel() 不保证任务会被取消,尽管完全抑制取消并不常见并且被积极反对。如果协程决定抑制取消,它需要调用 Task.uncancel() 来补充捕获异常。
      • task.cancelled() 如果任务被取消了,则返回 True。当取消请求通过 cancel() 请求并且包装的协程传播了抛入的 CancelledError 异常时,任务被取消。
      • uncancel() 减少对任务的取消请求计数。返回剩余的取消请求数量。注意,一旦取消的任务的执行完成,进一步调用 uncancel() 是无效的。这个方法由 asyncio 的内部使用,不建议终端用户代码使用。特别是,如果任务成功取消,这允许结构化并发的元素(如任务组和 asyncio.timeout())继续运行,将取消隔离在相关的结构化块中。
      • cancelling()返回对任务的待处理取消请求的数量,即调用 cancel() 的次数减去调用 uncancel() 的次数。注意,如果这个数字大于零但任务仍在执行中,cancelled() 仍然返回 False。这是因为这个数字可以通过调用 uncancel() 减少,这可能导致任务最终未被取消。
      • task.done() 如果任务完成了,则返回 True。任务完成的条件是协程返回了值、引发了异常,或者任务被取消了。
      • task.result() 返回任务的结果。如果任务完成了,返回包装协程的结果(如果协程引发了异常,则重新引发那个异常)。如果任务被取消了,这个方法会引发 CancelledError 异常。如果任务的结果尚不可用,这个方法会引发 InvalidStateError 异常。
      • task.exception() 返回任务的异常。如果包装的协程引发了异常,则返回那个异常。如果包装的协程正常返回,则这个方法返回 None。如果任务被取消了,这个方法会引发 CancelledError 异常。如果任务尚未完成,这个方法会引发 InvalidStateError 异常。
      • add_done_callback(callback, *, context=None)添加一个回调函数,当任务完成时运行。这个方法仅应在低级回调代码中使用。
      • task.remove_done_callback(callback) 从回调列表中移除回调函数。这个方法仅应在低级回调代码中使用。
      • task.get_stack(*, limit=None) 返回此任务的堆栈帧列表。如果包装的协程尚未完成,则返回协程挂起时的堆栈。如果协程成功完成或被取消,则返回一个空列表。如果协程因异常终止,则返回回溯帧列表。帧始终按从最旧到最新的顺序返回。返回的帧的数量由可选的 limit 参数控制;默认情况下返回所有可用的帧。堆栈或回溯的返回列表顺序不同:堆栈的最新帧会被返回,而回溯的最旧帧会被返回。
      • task.print_stack(*, limit=None, file=None) 打印任务的堆栈或回溯。这产生的输出类似于 traceback 模块对于 get_stack() 获取的帧。limit 参数直接传递给 get_stack()。file 参数是一个 IO 流,输出将写入到该流;默认情况下,输出写入 sys.stdout。
      • task.get_coro() 返回由任务包装的协程对象。注意 对于已经急切完成的任务,这将返回 None。参见急切任务工厂。
      • task.get_name() 返回任务的名称。如果没有明确分配名称,默认的 asyncio 任务实现会在实例化期间生成一个默认名称。
      • task.set_name(value) 设置任务的名称。value 参数可以是任何对象,然后会被转换为字符串。在默认的任务实现中,名称会显示在任务对象的 repr() 输出中。
      • get_context()返回与任务关联的 contextvars.Context 对象。
    • 实现了 __await__ 方法的类和对象

底层实现

编辑

res = await fun1()底层步骤:

  1. 创建协程对象:调用 fun1() 返回一个协程对象。这个对象表示一个尚未完成的异步操作。
  2. 调度协程对象:在Python 3.7及以后的版本中,协程对象会被自动封装为一个Task对象。这一步实际上是由事件循环完成的。
  3. 等待Future完成:使用await关键字暂停当前协程(caller)的执行,直到Task对象完成。这一步是通过 asyncio 的事件循环来实现的。await在内部会阻塞当前协程,直到Task对象的Future完成。
  4. 获取结果:一旦Task对象完成,await会恢复当前协程的执行,并获取Task的结果。Task对象的result()方法被调用,获取异步操作的最终结果。这个结果被赋值给变量res。
  5. 处理异常:如果协程执行过程中抛出了异常,await会将异常重新抛出,允许在调用await的协程中捕获并处理它。

六种正确的并发调用方式和两种错误调用方式

编辑
import asyncio
import time
import io
 

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')


async def s1():
    await a()
    await b()

async def s2():
    await asyncio.create_task(a())
    await asyncio.create_task(b())

async def c1():
    await asyncio.gather(a(), b())

async def c2():
    await asyncio.wait([asyncio.create_task(a()), asyncio.create_task(b())])

async def c3():
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    await task1
    await task2

async def c4():
    task = asyncio.create_task(b())
    await a()
    await task    

async def c5():
    task = asyncio.ensure_future(b())
    await a()
    await task

async def c6():
    loop = asyncio.get_event_loop()
    task = loop.create_task(b())
    await a()
    await task

def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')

show_perf(c2)

示例代码

编辑

下面是一个使用 asyncio 模块的简单示例:

import asyncio
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟I/O操作
    print("World")

async def main():
    await asyncio.gather(say_hello(), say_hello())

# 运行协程
if __name__ == "__main__":
    asyncio.run(main())

在这个示例中:

  • say_hello 是一个协程函数,它在打印“Hello”后等待1秒钟,然后打印“World”。
  • main 协程函数使用 asyncio.gather 并发执行两个 say_hello 协程。
  • asyncio.run(main()) 启动事件循环并运行 main 协程。

进阶用法:

协程中的异常处理:可以使用 try 和 except 来处理协程中的异常。

async def risky_operation():
    try:
        # 一些可能引发异常的操作
        await asyncio.sleep(1)
        raise ValueError("Something went wrong!")
    except ValueError as e:
        print(f"Error: {e}")

asyncio.run(risky_operation())

并发任务:使用 asyncio.gather 来并发执行多个协程任务。

async def task1():
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(1)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

另一个多任务的例子:

import asyncio

async def fetch_data(name, delay):
    print(f"{name} started, will take {delay} seconds.")
    await asyncio.sleep(delay)
    print(f"{name} completed.")
    return f"Result from {name}"

async def main():
    # 创建多个任务
    tasks = [
        asyncio.create_task(fetch_data("Task 1", 3)),
        asyncio.create_task(fetch_data("Task 2", 1)),
        asyncio.create_task(fetch_data("Task 3", 2)),
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    
    print("All tasks completed.")
    for result in results:
        print(result)

# 运行主函数
asyncio.run(main())

超时控制:可以使用 asyncio.wait_for 来设置协程的超时时间。

async def slow_operation():
    await asyncio.sleep(10)

async def main():
    try:
        await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())

协程是Python中处理异步操作的强大工具,适用于各种需要并发操作的场景,如网络请求、文件读写等。使用它们可以让代码更清晰、执行效率更高。

其他

编辑

ensure_future

编辑

asyncio.ensure_future 用于将协程或 Future 对象调度到事件循环中,并返回一个 Future 对象。它确保协程在事件循环中运行,并允许你检查任务的状态或结果。Python 3.7引入了asyncio.create_task, 功能与 ensure_future 类似,但更具语义化。如果你的代码只需要调度协程而不需要处理 Future 对象,建议使用 asyncio.create_task。

asyncio.gather和asyncio.await的用途区别

编辑

asyncio.gather是一个简单、直接的用法。封装为Task执行的黑盒,返回协程的执行结果。

 asyncio.gather(*aws, return_exceptions=False)

并发地运行aws序列中的所有可等待对象。如果aws中的任何可等待对象是协程,它会被自动调度为一个任务(Task)。如果所有的可等待对象都成功完成,结果将是一个聚合的返回值列表。结果值的顺序与 aws 中的可等待对象的顺序相对应。如果参数return_exceptions 为 False(默认值),第一个引发的异常会立即传递给等待gather()的任务。aws 序列中的其他可等待对象不会被取消,并将继续运行。如果参数return_exceptions为True,异常会被视为成功结果,并被聚合到结果列表中。如果gather()被取消,所有提交的(尚未完成的)可等待对象也会被取消。如果aws序列中的任何任务(Task)或未来(Future)被取消,它将被视为引发了CancelledError——在这种情况下,gather() 调用不会被取消。这是为了防止一个被取消的任务/未来导致其他任务/未来也被取消。注意:一个新的替代方案来并发地创建和运行任务并等待它们完成是 asyncio.TaskGroup。TaskGroup 提供比 gather 更强的安全保证,用于调度子任务的嵌套:如果一个任务(或子任务,即由任务调度的任务)引发异常,TaskGroup 会取消其余调度的任务,而 gather 不会这样做。

asyncio.await是一个高级、复杂的用法。

done, pending = asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

并发运行aws可迭代对象中的Future和Task实例,并阻塞直到满足return_when指定的条件。aws可迭代对象不能为空。返回两个Tasks/Futures集合:(done, pending)。参数timeout(浮点数或整数),如果指定,可以用来控制在返回之前等待的最大秒数。注意,这个函数不会引发TimeoutError。在超时时,如果某些 Futures或Tasks尚未完成,不会取消Futures,它们将简单地被返回到第二个集合中。参数return_when指定了函数应何时返回。它必须是以下常量之一:

  • asyncio.FIRST_COMPLETED 当任何 Future 完成或被取消时,函数将返回。
  • asyncio.FIRST_EXCEPTION 当任何 Future 因引发异常而完成时,函数将返回。如果没有 Future 引发异常,则等同于 ALL_COMPLETED。
  • asyncio.ALL_COMPLETED 当所有 Futures 完成或被取消时,函数将返回。

asyncio.wait的返回值有2项,第一项表示完成的任务列表(done),第二项表示等待(Future)完成的任务列表(pending),每个任务都是一个Task实例,由于这2个任务都已经完成,所以可以执行task.result()获得协程返回值。asyncio.wait支持选择返回的时机。asyncio.wait的参数return_when,在默认情况下,asyncio.wait会等待全部任务完成(return_when='ALL_COMPLETED'),它还支持FIRST_COMPLETED(第一个协程完成就返回)和FIRST_EXCEPTION(出现第一个异常就返回)。

使用asyncio.wait的场合:

  • 需要拿到封装好的Task,以便取消或者添加成功回调等
  • 业务上需要FIRST_COMPLETED/FIRST_EXCEPTION即返回的

asyncio.wait的历史:

  • 3.10 版本中: 移除了 loop 参数。
  • 3.11 版本中: 直接将协程对象传递给 wait() 是禁止的。
  • 3.12 版本中: 添加了对生成器生成任务的支持。

3种超时机制

编辑

类asyncio.timeout(delay)返回一个异步上下文管理器,以限制用于等待的时间。创建后可用Timeout.reschedule()重新设定超时时间。对超时的协程在事件循环下一次迭代时会被取消。超时的任务会被抛出TimeoutError异常。这是唯一可在上下文管理器捕获的异常。asyncio.timeout可安全地嵌套。

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

类asyncio.timeout_at(when)用于指出超时的绝对时间:

async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

协程asyncio.wait_for(aw, timeout)用于在一个超时范围内等待协程完成。参数aw如果是协程则自动调度为Task。超时抛出TimeoutError异常。

asyncio.create_task 与 loop.create_task 与 asyncio.ensure_future的区别

编辑

创建一个Task一共有3种方法。

从Python 3.7开始可以统一的使用更高阶的asyncio.create_task。其实asyncio.create_task就是用的loop.create_task:

 def create_task(coro):
    loop = events.get_running_loop()
    return loop.create_task(coro)

asyncio.ensure_future除了接受协程,还可以是Future对象或者awaitable对象:

  • 如果参数是协程,其实底层还是用的loop.create_task,返回Task对象
  • 如果是Future对象会直接返回
  • 如果是一个awaitable对象会await这个对象的__await__方法,再执行一次ensure_future,最后返回Task或者Future

所以就像ensure_future名字说的,确保这个是一个Future对象:Task是Future 子类,前面说过一般情况下开发者不需要自己创建Future

在独立线程中运行协程

编辑

Python3.9新增加在一个独立的线程中异步运行函数 func:

asyncio.to_thread(func, /, *args, **kwargs)

所有传递给这个函数的 *args 和 **kwargs 会直接传递给 func。此外,当前的 contextvars.Context 也会被传播,使得可以在独立线程中访问来自事件循环线程的上下文变量。返回一个协程,可以通过 await 来获取 func 的最终结果。这个协程函数主要用于执行IO密集的函数/方法,如果这些函数在主线程中运行,会阻塞事件循环。例如:

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # 注意,time.sleep() 可以被任何阻塞的
    # IO 绑定操作替代,比如文件操作。
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

直接在任何协程中调用 blocking_io() 会阻塞事件循环,导致额外增加1秒的运行时间。通过使用 asyncio.to_thread(),我们可以在一个独立的线程中运行它,而不会阻塞事件循环。

注意:由于全局解释器锁(GIL),asyncio.to_thread() 通常只能用于将IO操作密集的函数变为非阻塞。

对于任务随机出现,需要异步执行它们,可以使用asyncio.Queue()来实现生产者-消费者模式。

queue.join()方法会将进程阻塞起来,释放的条件是_unfinished_tasks计数器清零。该计数器随着put()方法而增加,随着task_done()方法而降低。

自省

编辑

asyncio.current_task: 返回当前运行的Task实例,如果没有正在运行的任务则返回 None。如果 loop 为 None 则会使用 get_running_loop()获取当前事件循环。 asyncio.all_tasks: 返回事件循环所运行的未完成的Task对象的集合。

shield

编辑

asyncio.shield,可以屏蔽取消操作:

task1 = asyncio.shield(func_1())

异步上下文管理器

编辑

异步上下文管理器是一种用于协程(asyncio)的特殊类型的上下文管理器,用于管理异步资源的分配和释放。异步上下文管理器通常与 async with 语句一起使用,以确保在异步代码块执行前分配资源,并在执行后释放资源。

要创建一个异步上下文管理器,需要定义一个类,该类必须实现两个特殊方法 __aenter__ 和 __aexit__。这些方法允许您定义资源的获取和释放逻辑。

__aenter__ 方法:在进入 async with 代码块时调用。通常在这里执行资源的分配或初始化操作。

__aexit__ 方法:在退出 async with 代码块时调用。通常在这里执行资源的释放或清理操作。

async with 语句必须写在一个协程函数中 ,不能在函数外使用

@asynccontextmanager
async def async_timed(func):
    start = time.perf_counter()
    yield await func()
    print(f'Cost: {time.perf_counter() - start}')

 
class MyAsyncContextManager:
    async def __aenter__(self):
        print("Entering the async context")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """
         with语句运行结束之后触发此方法的运行
         exc_type:如果抛出异常, 这里获取异常类型
         exc_val:如果抛出异常, 这里显示异常内容
         exc_tb:如果抛出异常, 这里显示所在位置, traceback
         """
        print("Exiting the async context") 

async def main():
    async with MyAsyncContextManager() as manager:
        print("Inside the async context")

    async with async_timed(s1) as rv:
        print(f'Result: {rv}')

asyncio.run(main())

给 Task (Future) 添加回调函数

编辑
task = loop.create_task(a())
def callback(future):
    print(f'Result: {future.result()}')
task.add_done_callback(callback)
await task

执行同步函数

编辑

对于阻塞的time.sleep()等同步逻辑怎么利用asyncio实现并发呢?答案是用run_in_executor。

def a():
    time.sleep(1)
    return 'A'


async def b():
    await asyncio.sleep(1)
    return 'B'


def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')

async def c1():
    loop = asyncio.get_running_loop()
    await asyncio.gather(loop.run_in_executor(None, a),b() )

show_perf(c1)

loop.run_in_executor()的第一个参数是要传递concurrent.futures.Executor实例的,传递None会选择默认的executor:loop._default_executor

当然我们还可以用进程池做执行器:concurrent.futures.ProcessPoolExecutor()

多线程执行协程

编辑

使用run_coroutine_threadsafe()在其他线程执行协程(这是线程安全的):

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def shutdown(loop):
    loop.stop()


async def b1():
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()

    future = asyncio.run_coroutine_threadsafe(a(), new_loop)
    print(future)
    print(f'Result: {future.result(timeout=2)}')
    new_loop.call_soon_threadsafe(partial(shutdown, new_loop))

await b1()

注意:

  • 协程应该从另一个线程中调用,而非事件循环运行所在线程,所以用 asyncio.new_event_loop () 新建一个事件循环
  • 在执行协程前要确保新创建的事件循环是运行着的,所以需要用 start_loop 之类的方式启动循环
  • 创建的新的事件循环跑在一个线程里面,由于 loop.run_forever 会阻塞程序关闭,所以需要结束时杀掉线程,所以用 call_soon_threadsafe 回调函数 shutdown 去停止事件循环

异步迭代器

编辑

实现了__aiter__()和__anext__()方法的对象是异步迭代对象。要求__anext__()必须返回一个awaitable对象。async for会处理异步迭代器的__anext__()方法所返回的可等待对象,直到其引发一个StopAsyncIteration异常。

根本上的误解是期望async for会自动并行化迭代。实际上,它并不这样做,它只是允许对异步源进行顺序迭代。async for的作用是顺序地遍历异步源(如异步生成器或异步迭代器),每次迭代的操作会在异步事件循环中依次执行。异步迭代意味着你可以与事件循环中的其他异步任务(包括其他类似的迭代)并行运行它。普通的for无法进行异步迭代,至少不能在不阻塞事件循环的情况下进行。这是因为普通的for调用__next__作为一个阻塞函数,并且不await其结果。而且你不能手动await通过for获取的元素。for循环依赖__next__引发StopIteration来结束迭代,因此无法直接用await处理异步操作。如果__next__是一个协程,那么在等待其结果之前,StopIteration异常不可见。这就是为什么async for被引入的原因,不仅在Python中,还有其他具有async/await和通用for的语言中。

import asyncio

class Reader:
    """
    自定义异步迭代器(同时也是异步可迭代对象)
    """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        value = await self.readline()
        if value == None:
            raise StopAsyncIteration
        return value
async def main():
    async for i in Reader():
        print(i)

asyncio.run(main())

async for 并不会创建任务以并发执行。它用于允许对异步源进行顺序迭代。如果想对迭代的对象并发,参加下例:

import asyncio

async def process_all():
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))
        tasks.append(task)
    
    await asyncio.gather(*tasks)


async def process_obj(obj):
    ...

asyncio.as_completed

编辑

asyncio.as_completed()是 Python 的 asyncio 库中用于处理异步任务的一个函数。它允许你以异步方式逐个处理一组协程对象(coroutines),并按它们完成的顺序返回结果。这个函数特别适用于你需要在协程列表中逐个处理结果,而不是等到所有协程完成后一次性处理它们的场景。

import asyncio
import time

async def task(n):
    print(f"Task {n} starting")
    await asyncio.sleep(n)  # 模拟耗时操作
    print(f"Task {n} completed")
    return n

async def main():
    tasks = [task(1), task(1), task(1)]  # 创建三个协程任务
    for completed_task in asyncio.as_completed(tasks):
        result = await completed_task
        print(f"Result: {result}")

start = time.perf_counter()
asyncio.run(main())
print(f'Cost: {time.perf_counter() - start}')

上述例子耗时1秒,而不是3秒。

asyncio.TaskGroup

编辑

Python3.11提供了更为现代化的asyncio.TaskGroup类,可替代create_task() 。

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))

        task2 = tg.create_task(
            say_after(2, 'world'))

        print(f"started at {time.strftime('%X')}")

    # The await is implicit when the context manager exits.

    print(f"finished at {time.strftime('%X')}")

async with 语句将等待组中的所有任务完成。在等待期间,新的任务仍然可以被添加到组中(例如,通过将 tg 传递给某个协程并在该协程中调用 tg.create_task())。一旦最后一个任务完成并且 async with 块被退出,组中将不能再添加新任务。

如果组中的任何任务第一次遇到除了 asyncio.CancelledError 之外的异常,剩余的任务将会被取消。之后无法再向组中添加新的任务。此时,如果 async with 语句的主体仍在活动中(即,__aexit__() 尚未被调用),直接包含 async with 语句的任务也会被取消。 resulting asyncio.CancelledError 将中断一个 await,但不会从包含的 async with 语句中向外传播。

Eager Task Factory

编辑

Python3.12增加了用于急切任务执行的任务工厂:

asyncio.eager_task_factory(loop, coro, *, name=None, context=None)

当使用这个工厂(通过 loop.set_task_factory(asyncio.eager_task_factory))时,协程在任务构造期间会同步开始执行。只有在协程阻塞时,任务才会被调度到事件循环。这可以提高性能,因为避免了对同步完成的协程进行事件循环调度的开销。一个常见的例子是,使用缓存或备忘录来避免实际的 I/O 操作的协程,这样可以在可能的情况下提高性能。注意:协程的立即执行是一种语义上的变化。如果协程返回或引发异常,则任务不会被调度到事件循环中。如果协程执行阻塞,则任务会被调度到事件循环中。这种变化可能会引入对现有应用程序的行为更改。例如,应用程序的任务执行顺序可能会发生变化。

Python3.12增加了创建一个急切任务工厂:

asyncio.create_eager_task_factory(custom_task_constructor)

类似于 eager_task_factory(),在创建新任务时使用提供的 custom_task_constructor,而不是默认的 Task。custom_task_constructor 必须是一个符合 Task.__init__ 签名的可调用对象。该可调用对象必须返回一个与 asyncio.Task 兼容的对象。此函数返回一个可调用对象,旨在通过 loop.set_task_factory(factory) 作为事件循环的任务工厂使用。

历史

编辑

在最早的Python 3.4中,协程函数是通过@asyncio.coroutine 和 yeild from 实现的, 如下所示。

 import asyncio
 
 @asyncio.coroutine
 def func1(i):
     print("协程函数{}马上开始执行。".format(i))
     yield from asyncio.sleep(2)
     print("协程函数{}执行完毕!".format(i))
 
 if __name__ == '__main__':
     # 获取事件循环
     loop = asyncio.get_event_loop()
 
     # 执行协程任务
     loop.run_until_complete(func1(1))
 
     # 关闭事件循环
     loop.close()

这里我们定义了一个func1的协程函数,我们可以使用asyncio.iscoroutinefunction来验证。定义好协程函数后,我们首先获取事件循环loop,使用它的run_until_complete方法执行协程任务,然后关闭loop。

 print(asyncio.iscoroutinefunction(func1(1))) # True

Python 3.5以后引入了async/await 语法定义协程函数,代码如下所示。每个协程函数都以async声明,以区别于普通函数,对于耗时的代码或函数我们使用await声明,表示碰到等待时挂起,以切换到其它任务。 Python 3.7提供了一个更简便的asyncio.run方法,提供的asyncio.create_task方法

# 方法1:使用ensure_future方法。future代表一个对象,未执行的任务。
 task1 = asyncio.ensure_future(func1(1))
 task2 = asyncio.ensure_future(func1(2))
 
 # 方法2:使用loop.create_task方法
 task1 = loop.create_task(func1(1))
 task2 = loop.create_task(func1(2))
 
 # 方法3:使用Python 3.7提供的asyncio.create_task方法
 task1 = asyncio.create_task(func1(1))
 task2 = asyncio.create_task(func1(2))

创建多个协程任务列表后,我们还要使用asyncio.wait方法收集协程任务,并交由事件循环处理执行。新的asyncio.gather方法,它的作用asyncio.wait方法类似,但更强大。如果列表中传入的不是create_task方法创建的协程任务,它会自动将协程对象封装成协程任务。

     tasks = []
     for i in range(1, 5):
         # 这里未由协程函数创建协程任务
         tasks.append(func1(i))
         
     # 注意这里*号。gather自动将函数列表封装成了协程任务。
     await asyncio.gather(*tasks)

两者更大的区别在协程任务执行完毕后对于返回结果的处理上。通常获取任务执行结果通常对于一个程序至关重要。asyncio.wait 会返回两个值:done 和 pending,done 为已完成的协程任务列表,pending 为超时未完成的协程任务类别,需通过task.result()方法可以获取每个协程任务返回的结果;而asyncio.gather 返回的是所有已完成协程任务的 result,不需要再进行调用或其他操作,就可以得到全部结果。

import asyncio
 
 async def func1(i):
     print(f"协程函数{i}马上开始执行。")
     await asyncio.sleep(2)
     return i
 
 async def main():
     tasks = []
     for i in range(1, 5):
         tasks.append(asyncio.create_task(func1(i)))
         
     # 获取任务执行结果。
     done, pending = await asyncio.wait(tasks)
     for task in done:
         print(f"执行结果: {task.result()}")
 
 if __name__ == '__main__':
     asyncio.run(main())

#-*- coding:utf-8 -*-
 import asyncio
 
 async def func1(i):
     print(f"协程函数{i}马上开始执行。")
     await asyncio.sleep(2)
     return i
 
 async def main():
     tasks = []
     for i in range(1, 5):
         tasks.append(func1(i))
 
     results = await asyncio.gather(*tasks)
     for result in results:
         print(f"执行结果: {result}")
 
 if __name__ == '__main__':
     asyncio.run(main())

gather返回的任务执行结果是按照添加顺序有序的,wait方法获取的结果是无序的。