Python异步编程(上):协程和任务

打印 上一主题 下一主题

主题 912|帖子 912|积分 2736

原文链接: https://mp.weixin.qq.com/s/dQOocc7wHaGv7_cf476Ivg
介绍

了解异步编程前先了解一些概念:协程(coroutine)、任务(task)和变乱循环(event loop),在3.7从前还需要关心Future这个东西,不外之后提供的高级API弱化了这个概念,你基本不需要关心Future是什么。
协程

协程等于一个人在工作之间切换,而线程则是等于两个人在工作(先不提GIL),比方烧水、煮饭还有切菜,只有一个人的话,你会把锅装满水放到炉子上烧,然后再去煮饭。而有的人他就是要等到水烧开了再去煮饭,这样要快速完成只能再叫一个人。
IO

这个烧水的过程被称为IO操作,在Python中像网络请求、文件读写等都是IO操作,特别是网络请求,等待服务器返回数据这个过程耗时黑白常长的,但它实际上不需要cpu的参与,只是单纯的等待。其实print也是IO操作,但是它耗时非常短基本不会影响,也就不需要在弄一个aioprint出来。
像切菜这种完全离不开人的工作则被称为cpu密集型程序,在Python中跟计算相关的程序都离不开cpu,这种使用异步操作就无法节省时间,必须得加一个人来做。
协程
  1. async def func():
  2.     pass
复制代码
func和func()都是协程,异步函数(协程函数)被调用只是返回协程对象,不会立刻实行。
任务

任务只是对协程的封装,用于处理协程的取消和获取结果等,创建任务一般都是通过asyncio.create_task来创建,比方
  1. task = asyncio.create_task(func)
复制代码
注意这个时间func已经在运行了,你可以使用result = await task大概asyncio.wait_for等待它运行完成,也可以通过task.cancel()来取消它
变乱循环

变乱循环这个概念没什么可说的,其实就是等于一个人在工作,每个线程只能运行一个变乱循环。可以在多线程中创建多个变乱循环,然后通过asyncio.run_coroutine_threadsafe指定协程运行在哪个变乱循环里
使用

运行变乱循环

创建并运行变乱循环一般是通过asyncio.run来实现的,不外有时间这个函数会有一些奇怪的错误。这个时间就需要使用低级API创建并运行
  1. loop = asyncio.get_event_loop()
  2. loop.run_until_complete(main())
复制代码
在这里get_event_loop相当于下面两行
  1. loop = asyncio.new_event_loop()
  2. asyncio.set_event_loop(loop)
复制代码
简单例子

下面的代码等同于同步编程
  1. import asyncio
  2. import time
  3. async def say_after(delay, what):
  4.     await asyncio.sleep(delay)
  5.     print(what)
  6. async def main():
  7.     print(f"started at {time.strftime('%X')}")
  8.     await say_after(1, 'hello')
  9.     await say_after(2, 'world')
  10.     print(f"finished at {time.strftime('%X')}")
  11. asyncio.run(main())
复制代码
一起运行
  1. import asyncio
  2. import time
  3. async def say_after(delay, what):
  4.     await asyncio.sleep(delay)
  5.     print(what)
  6.     return delay
  7. async def main():
  8.     # 创建并执行任务
  9.     task1 = asyncio.create_task(
  10.         say_after(1, 'hello'))
  11.     task2 = asyncio.create_task(
  12.         say_after(2, 'world'))
  13.     print(f"started at {time.strftime('%X')}")
  14.     # 等待任务完成
  15.     print(await task1)
  16.     print(await task2)
  17.     print(f"finished at {time.strftime('%X')}")
  18.    
  19. asyncio.run(main())
复制代码
也可以通过asyncio.gather来实现,函数原型: asyncio.gather(*aws, loop=None, return_exceptions=False)
  1. import asyncio
  2. import time
  3. async def say_after(delay, what):
  4.     await asyncio.sleep(delay)
  5.     print(what)
  6.     return delay
  7. async def main():
  8.     print(f"started at {time.strftime('%X')}")
  9.     print(await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world')))
  10.     print(f"finished at {time.strftime('%X')}")
  11.    
  12. asyncio.run(main())
复制代码
gather也可以用来等待任务
  1. import asyncio
  2. import time
  3. async def say_after(delay, what):
  4.     await asyncio.sleep(delay)
  5.     print(what)
  6.     return delay
  7. async def main():
  8.     # 创建并执行任务
  9.     task1 = asyncio.create_task(
  10.         say_after(1, 'hello'))
  11.     task2 = asyncio.create_task(
  12.         say_after(2, 'world'))
  13.     print(f"started at {time.strftime('%X')}")
  14.     # 等待任务完成
  15.     print(await asyncio.gather(task1, task2))
  16.     print(f"finished at {time.strftime('%X')}")
  17.    
  18. asyncio.run(main())
复制代码
asyncio.wait_for

函数原型: asyncio.wait_for(aw, timeout, *, loop=None)
wait_for和直接await只有一个区别,可以指定超时时间
  1. import asyncio
  2. async def eternity():
  3.     # Sleep for one hour
  4.     await asyncio.sleep(3600)
  5.     print('yay!')
  6. async def main():
  7.     try:
  8.         await asyncio.wait_for(eternity(), timeout=1.0)
  9.     except asyncio.TimeoutError:
  10.         print('timeout!')
  11. asyncio.run(main())
复制代码
asyncio.wait

函数原型: asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
wait也是一起实行多个协程,与gather的区别可以从参数和返回值看出来,详细区别如下:

  • wait一般是用来等待create_task创建的任务,而gather可以直接运行协程
  • gather直接返回所有协程的结果(顺序也一样),而wait返回的是所有协程的运行状态
  • wait给定timeout并超时后,不会引发asyncio.TimeoutError非常,而是返回所有协程的运行状态
  • wait可以指定return_when,当某个协程实行完成或触发非常时立刻返回运行状态,而gather需要等待所有协程完成
return_when有三个值:

  • FIRST_COMPLETED: 其中某个协程实行完成或被取消时返回
  • FIRST_EXCEPTION: 其中某个协程触发非常
  • ALL_COMPLETED: 所有协程都完成大概被取消
  1. import asyncio
  2. import time
  3. async def say_after(delay, what):
  4.     await asyncio.sleep(delay)
  5.     print(what)
  6.     return delay
  7. async def main():
  8.     # 创建并执行任务
  9.     task1 = asyncio.create_task(
  10.         say_after(1, 'hello'))
  11.     task2 = asyncio.create_task(
  12.         say_after(2, 'world'))
  13.     print(f"started at {time.strftime('%X')}")
  14.     # 等待任务完成
  15.     done, pending = await asyncio.wait([task1, task2])
  16.     for task in done:
  17.         print(task.result())
  18.     print(f"finished at {time.strftime('%X')}")
  19.    
  20. asyncio.run(main())
复制代码
asyncio.as_completed

等待所有协程完成,以完成顺序做迭代器
  1. import asyncio
  2. import time
  3. async def say_after(delay, what):
  4.     await asyncio.sleep(delay)
  5.     print(what)
  6.     return delay
  7. async def main():
  8.     # 创建并执行任务
  9.     task1 = asyncio.create_task(
  10.         say_after(1, 'hello'))
  11.     task2 = asyncio.create_task(
  12.         say_after(2, 'world'))
  13.     print(f"started at {time.strftime('%X')}")
  14.     # 等待任务完成
  15.     for core in asyncio.as_completed([task1, task2]):
  16.         print(await core)
  17.     print(f"finished at {time.strftime('%X')}")
  18.    
  19. asyncio.run(main())
复制代码
asyncio.all_tasks

这个没啥说的,就是返回当前所有任务列表
  1. import asyncio
  2. async def say_after(delay, what):
  3.     await asyncio.sleep(delay)
  4.     print(what)
  5.     return delay
  6. async def main():
  7.     task1 = asyncio.create_task(
  8.         say_after(1, 'hello'), name="task1")
  9.     print(asyncio.all_tasks())
  10.     task2 = asyncio.create_task(
  11.         say_after(2, 'world'), name="task2")
  12.     print(asyncio.all_tasks())
  13.     await task1
  14.     print(asyncio.all_tasks())
  15.     await task2
  16.     print(asyncio.all_tasks())
  17.    
  18. asyncio.run(main())
复制代码
asyncio.Task


  • cancel(msg=None): 取消
  • cancelled(): 如果任务被取消,则返回True
  • done(): 如果任务完成则返回True
  • result(): 返回任务的结果
  • exception(): 返回任务的非常
  • add_done_callback(callback, *, context=None): 添加任务完成时的回调
  • remove_done_callback(callback): 从回调列表中删除回调
  • get_stack(*, limit=None): 返回此任务的堆栈帧列表
  • print_stack(*, limit=None, file=None): 打印此任务的堆栈或回溯
  • get_coro(): 返回由Task包装的协程对象
  • get_name(): 返回任务的名称
  • set_name(value): 设置任务的名称
asyncio.iscoroutine(obj)

如果obj是协程对象,则返回True
  1. import asyncioasync def func():
  2.     passasync def main():    print("iscoroutine1: ", asyncio.iscoroutine(func))    a = func()    print("iscoroutine2: ", asyncio.iscoroutine(a))    b = await a    print("iscoroutine3: ", asyncio.iscoroutine(b))asyncio.run(main())
复制代码

asyncio.iscoroutinefunction(func)

如果func是协程函数,则返回True
  1. import asyncioasync def func():
  2.     passasync def main():    print("iscoroutine1: ", asyncio.iscoroutinefunction(func))    a = func()    print("iscoroutine2: ", asyncio.iscoroutinefunction(a))    b = await a    print("iscoroutine3: ", asyncio.iscoroutinefunction(b))asyncio.run(main())
复制代码

本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表