Python asyncio异步编程入门

针对该视频的一篇笔记

先决条件:协程、并发

正文

这次讲的是asyncio最新的用法,然后是Python 3. 7 以上才支持的,也是推荐大家学新不学旧。

asyncio并不能提升你的运算速度,它比较适合处理那些需要等待的任务^(应用场景)^,最典型的就是网络的通讯

那 asyncio的运算核心其实就是一个event loop,它就像一个大脑,它面对着很多可以执行的任务,然后决定执行哪个任务。那在 Python 的asyncio里,同时执行的任务只能有一个,它不存在系统级的上下文切换,它跟线程不一样,它需要每一个任务主动告诉event loop,我这边结束了,你可以让别的任务开始了。因此它有一个好处,就是不存在竞争冒险这样的问题,你可以明确的知道我每一个任务什么时候停止运算了。

那想要使用asyncio第一件事就是理解什么叫coroutine,什么叫task。

coroutine在 Python 的语境里,我们一般会指两种东西,一个是coroutine function,一个是coroutine object。

import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print( 'world')

coro = main()

这个 async def main 就是一个coroutine function。Python 里面所有async def main开头的东西都叫coroutine function,它定义了一个 coroutine 的过程。那 Python 一般的函数,当我们调用它的时候,它返回的是这个函数的返回值,对不对?但是所有的 coroutine function,当你调用的时候,它返回的是一个coroutine object,它的本质跟生成器函数有些像。

就是当你在调用main的时候,它并不会运行 main 里面的程序,它只会返回一个coroutine object。

好,那怎么运行coroutine的代码?我们需要两件事,第一是进入async的模式,也就是进入这个 event loop,开始控制整个程序的状态。第二就是把coroutine变成task。

我们先说如何进入 async 模式,那我们正常写Python代码运行的时候,如果我们管它叫 synchronized 模式的话,那么在 synchronize 模式下切换到 asynchronized 模式,切换到让这个 event loop 开始控制一切。基本上我们只用一个入口函数叫做asyncio.run,那 asyncio.run,它的参数是一个coroutine,它会做两件事儿,第一它会建立起这个 event loop,第二它会把这个 coroutine 变成这个 event loop 里面的一个task,那 event loop 建立之后,他就会去找哪个 task 可以执行,那当然只有一个task,所以它就会开始运行 run 给进来的这个coroutine。

import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print( 'world')

asyncio.run(main())

我们试一下,大家可以看到它先 print 了hello,然后等了 1 秒钟,然后 print 了world,那这个asyncio.run就是我们从 synchronize 模式变到 asynchronize 模式的入口,它的参数是一个coroutine。

那只有一个 task 是没有什么意思的,对不对?我们刚才也说过, event loop 的核心是它有很多很多个task,然后它来决定哪个 task 要运行。所以我们很重要的一件事情是,当我们已经处于 async 模式下的时候,我们如何增加task?接下来我们就介绍几个把coroutine变成task,让它可以排队执行的方法。

import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%x')}")
await say_after(1, "hello")
await say_after(2,"world")
print(f"finished at {time.strftime('%x')}")

asyncio.run(main())

大家看,我们有一个 say_after 函数,它显然是一个coroutine,对不对?它前面是async def,那实际上asyncio.sleep,它返回的东西也是一个 core routine,那这里就涉及到了第一个把coroutine变成 task 的方法,就是用await。

那么当你 await 一个coroutine的时候(上述代码第8行),发生了如下几件事,第一,这个coroutine被包装成一个task,并且被告诉了 event loop,说这块儿有一个新的task。第二,它会告诉 event loop,我现在这个 task (main)需要等到say_after task 完成之后,我才能继续它,建立了这么一个依赖关系。第三,它会 yield 出去,也就是它会告诉 event loop,我这个 task 现在干不了了,你先让别的 task 干活去吧。最后当 event loop 再次安排它运行的时候,它会把 say after 这个say_after 里面真正的返回值拿出来保存起来。当然现在我们这里await前面没有写,因为我们的 say after 是没有返回值的,好,那我们运行一下这段程序,我们可以看到差不多间隔一秒 print 一个hello,然后在两秒钟之后 print 了一个world。

我们来还原一下整个过程发生了一个什么事情。首先 asyncio.run把这个main作为一个 task 给放到了 event loop 里。 event loop 寻找task,他发现只有一个 task main,然后他就开始让这个 task main运行。may 你在运行的时候,首先 print 了一个 start at,然后它运行了 say_after 这个coroutine function,得到了一个coroutine object。await 把这个coroutine object变成了一个task,放回了 event loop 里,同时告诉 event loop 我需要等待它,然后把控制权交还给了 event loop。

现在 event loop 里面有两个task,一个是main,一个是这个 say_after,但是main运行不了, main说我要等这个 say after,那event loop,就让这个say_after先运行。

await asyncio.sleep(delay)​ 这里其实做了一个很相似的事情,也是把它变成了一个task,然后告诉 event loop 说我得等这个 sleep 完成了我才能运行。然后 await 又把这个控制权转回给了 event loop。

event loop 一看现在有三个 task 对不对?有一个main,有一个 say_after,还有一个sleep,那这个 sleep 会告诉 event loop 说我一秒钟之后我就好了,所以这个 event loop 就等了一秒钟。一秒钟之后这个 sleep 就完成了。然后 event loop 看现在有两个task,一个main,一个 say after,这个 main 要等 say_after,但是这个 say after 等的东西已经完成了,于是 event loop 就说 say_after 你来运行吧,然后 say_after printed 一个what,然后 say_after 也完成了。于是又把控制权交还给 event loop。

event loop 一看,哎,现在只有一个 task 了,就成 main 了,然后再把控制权给main,这个时候 main就把第一个 say_after 完成了,然后再做第二个 say_after 做同样的事,又等了 2 秒之后它完成,所以整个过程用了 3 秒。

那在这个过程中,所有控制权的返回都是显式的,就是​ event loop 并没有办法强行的从一个 task 里边拿回控制权,必须要这个 task 主动把控制权交回去。交回去的方式有两种,第一个是await,会交回,第二个,当这个函数运行完毕之后,它会交回。所以说如果你有一个 task 里面有一个死循环,你的整个 event loop 就卡死了。

好,那刚才的运行过程中你可能发现了一个问题,对不对?这个 hello 要等 1 秒,这个 world 要等 2 秒,它俩为啥不能一起等呢?这就应该是我们协程的意义,对不对?在你第一个 hello 等 1 秒的时候,为什么这个 event loop 不能直接让这个 world 运行,然后你俩一起等着, 2 秒之后不就结束了吗?那这个呢?就是我们直接用 await 把一个 cooutine 变成 task 可能遇到的问题,因为 await 需要做的事太多了,它要变成task,它变成 task 之后会主动把控制权交出去,还需要等,这样后面的代码必须要等到它完成之后才能变成一个task。

那为了解决这个问题, asyncio给我们提供了 create task 函数。

import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(say_after(1,"hello"))
task2 = asyncio.create_task(say_after(2,'world'))
print(f"started at {time.strftime( '%x')}")
await task1
await task2
print(f"finished at {time.strftime('%x')}")

我们看这里用到的 create_task 函数,这个函数的参数也是一个coroutine。依然记住这个 say_after 调用之后返回的是一个coroutine,它不会运行任何 coroutine 里面的代码,那这个 create_task 函数会把这个coroutine变成一个task,并且把这个 task 注册到 event loop 里面。也就是说它分担了刚才 await 的一部分功能,它把它包装成了一个task,然后告诉 event loop 说OK,这个 task 已经可以开始执行了,但是现在 event loop 并没有办法执行这个task,因为控制权还在 main 的手里,对不对?那这个时候 main 趁着自己有控制权,他就做了第二个task。

task 2 也是告诉 main 这还有一个新的task,这叫 say_after 2 也可以运行了,那在这儿之后他才开始 await task_1 await task_2 那我们之前说过 await 后面是一个coroutine的时候,它有那么多个功能对不对?把它变成一个task,然后把控制权交出去,等它拿返回值。那当 await 后面是一个 task 而不是一个coroutine 的时候,它就省略了把 coroutine变成 task 这一步,它就只是告诉 event loop 我需要这个 task 完成,我把控制权交还给你,并且在控制权回来的时候,从这个 task 里面提取所需要的返回值。

好,我们跑一下这个程序试一下。我们看这一次这个程序就只用了 2 秒钟,因为当它 await task 1的时候, event loop 里面实际上已经有了 3 个task,是 main、task 1和 task 2。当 task 1跟 event loop 说完说我要等一秒钟才能完成,之后 event loop 闲来无事就能发现,这有一个 task 2 还可以执行,于是他又执行了 task 2,再跟他说我要两秒钟之后才能结束,这样两个 task 就可以同时进行等待了。

这也就是为什么asyncio很适合解决一些网络通讯的问题,因为网络通讯很多时间是在等待上的,也就是我们所谓的IO task,就是我真正的运算挺少的,我就是等他的回复,那等待的这个时间就可以其他的任务去干活。那这个函数我们没有涉及coroutine的return,所以我们把这个函数稍微修改一下。

import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return f"(what) - (delay)"
async def main():
task1 = asyncio.create_task(say_after(1, "he11o"))
task2 = asyncio.create_task(say_after(2, "world"))
print(f"started at {time.strftime( '%x')}")
ret1 = await task1
ret2 = await task2
print(ret1)
print(ret2)
print(f"finished at {time.strftime( '%x')}")
asyncio.run(main())

我们看我们把函数修改了一下,主要就是告诉大家怎么拿到coroutine的 return。那当你要拿这个返回值的时候,一定是一个 variable 等于 await task,而不是一个 variable 等于 task。我们之前强调过, await 有一个功能是把这个 task 或者coroutine的这个返回值拿出来,如果你不用 await 是拿不到这个值的。

那有的人看到这可能说了,那如果我有很多 task 呢?我有,比如说 10 个task,我就写 10 行 await task 吗?这是不是太蠢了?正因如此, asyncio给我们提供了一个函数叫做gather。我们看 gather 这个函数,它不是一个coroutine,但是它会返回一个叫做 future 的东西,这个 future 也是可以用 await 的,那 gather 这个函数作用,首先它的参数是若干个 coroutine或者task,这俩都可以,甚至可以是future,也就是 gather 的 return 值。它还可以接着gather,那如果是coroutine的话,它首先会把它包装成task,并且注册到 event loop 里,然后它会返回一个 future 值。当你 await 这个 future 的时候,你就相当于告诉 event loop,我要等待这里面每一个 task 都完成,我才可以继续,同时会把这些 task 的 return 值放到一个 list 里,然后返回来。

import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return f"(what) - {delay}"
async def main():
print(f"started at {time.strftime( '%x')}")
ret = await asyncio.gather(say_after(1, 'hello'),
say_after(2, "world"))
print(ret)
print(f"finished at {time.strftime( '%x')}")
asyncio.run(main())

我们看一下这程序的结果,可以看到这次的这个 return value 是一个list,里面分别是 hello和world,那么这个 list 顺序和里面 task 的顺序是一致的,这样呢,我们就不用一个个 task await 了。

那 gather 还有一个好处就是如果你给的是coroutine的话,它会把每一个coroutine首先都变成task,也就是说我们可以不用自己手动先去建立这些 task 了。我们看一下上述代码结果,可以看到这种方式在拿到了正确的返回值的同时,它也只用了 2 秒。因为 gather 首先把这两个 coroutine 都变成了task,然后在 main 做 await 的时候才把控制权交回给了 event loop,这样 event loop 在分配哪一个 task 可以工作的时候,它就有更多的 task 可以分配了。

尽管asyncio里面还有不少的功能,但是掌握了这几个,基本上你就理解了asyncio的核心理念了。那我再强调一下前面提到过的几个重点。

总结

  1. 要建立一个 event loop 的概念,上面儿有一个 event loop 作为大脑,下面是若干可执行的task。 task 是没有办法控制 event loop 去执行某一个 task 的,它只能告诉 event loop 说我在等这个task。最终是由 event loop 来决定下面要运行哪个task,而 event loop 一旦开始运行task,就必须要 task 显示的把控制权交还给 event loop。交还控制权的方式由 await 和函数运行完毕。所以尽管我们会说这种协程的方式是并发的,但是同时刻实际上只有一段代码在跑,它只是想办法再利用这些代码中间的等待时间。所以如果你的代码里面并没有等待这件事的话,协程对你的代码是没有帮助的。
  2. 务必分清什么是coroutine,什么是task。 coroutine 只有变成了 task 才开始被执行。要能知道什么时候 coroutine 被隐式的变成了task,在你直接await coroutine 的时候,给你放到 gather 里的时候,你放进去的都是coroutine,但是它们会偷偷把它变成 task 执行coroutine,不变成 task 是没有办法执行的。
  3. 最后拿到一个 coroutine 的返回值是需要用 await 的。一个变量等于 await xxx。

文章作者: Met Guo
文章链接: https://guoyujian.github.io/2023/09/03/Python-asyncio%E5%BC%82%E6%AD%A5%E7%BC%96%E7%A8%8B%E5%85%A5%E9%97%A8/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Gmet's Blog