协程 & asyncio & 异步
1. 协程 (coroutine)
协程(coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块互相切换运行。例如:- def func1():
- print(1)
- ...
- print(2)
- def func2():
- print(3)
- ...
- print(4)
- func1()
- func2()
- greenlet,早期模块。
- yield 关键字。
- asyncio 装饰器(python 3.4)
- async、await 关键字(python 3.5)
1.1 greenlet 实现协程
- from greenlet import greenlet
- def func1():
- print(1) # 第 1 步:输出 1
- gr2.switch() # 第 3 步:切换到 func2 函数
- print(2) # 第 6 步:输出 2
- gr2.switch() # 第 7 步 切换到 func2 函数,从上一次执行的位置继续向后执行
- def func2():
- print(3) # 第 4 步:输出 3
- gr1.switch() # 第 5 步:切换到 func1 函数,从上一次执行的位置继续向后执行
- print(4) # 第 8 步:输出 4
- gr1 = greenlet(func1)
- gr2 = greenlet(func2)
- gr1.switch() # 第 1 步:去执行 func1 函数
1.2 yield 关键字
- def func1():
- yield 1
- yield from func2()
- yield 2
- def func2():
- yield 3
- yield 4
- f1 = func1()
- for item in f1:
- print(item)
复制代码 伪实现,仅能实现协程的功能。
1.3 asyncio
在 python 3.4 及之后的版本。- import asyncio
- @asyncio.coroutine
- def func1():
- print(1)
- yield from asyncio.sleep(2) # 遇到 IO 耗时操作,自动化切换到 tasks 中其它任务
- print(2)
- @asyncio.coroutine
- def func2():
- print(3)
- yield from asyncio.sleep(2) # 遇到 IO 耗时操作,自动化切换到 tasks 中其它任务
- print(4)
- tasks = [
- asyncio.ensure_future(func1())
- asyncio.ensure_future(func2())
- ]
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
复制代码 注意:遇到 IO 阻塞自动切换。
1.4 aynsc & await 关键字
在 python 3.5 及之后的版本。- import asyncio
- async def func1():
- print(1)
- # 网络 IO 请求:下载一张图片
- await asyncio.sleep(2) # 遇到 IO 耗时操作,自动化切换到 tasks 中的其它任务。
- print(2)
- async def func2():
- print(3)
- # 网络 IO 请求:下载一张图片
- await asyncio.sleep(2) # 遇到 IO 耗时操作,自动化切换到 tasks 中的其它任务。
- print(4)
- tasks = [
- asyncio.ensure_future(func1())
- asyncio.ensure_future(func2())
- ]
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
2. 协程的意义
在一个线程中如果遇到 IO 等待时间,线程不会傻等,而是利用空闲时间再去干点其它事情。
案例:下载三张图片(网络 IO):
- 普通方式(同步)
- import requests
- def download_image(url):
- print("开始下载:", url)
- response = requests.get(url)
- print("下载完成")
- file_name = url.rsplit("_")[-1]
- with open(file_name, mode="wb") as file_object:
- file_object.write(response.content)
- url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
- "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
- "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg",
- ]
- for item in url_list:
- download_image(item)
- 协程方式(异步)
- import aiohttp
- import asyncio
- async def fetch(session, url):
- print("发送请求:", url)
- async with session.get(url, verify_ssl=False) as response:
- content = await response.content.read()
- file_name = url.rsplit("_")[-1]
- with open(file_name, mode="wb") as file_object:
- file_object.write(content)
- async def main():
- async with aiohttp.ClientSession() as session:
- url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
- "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
- "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg",
- ]
- tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
- await asyncio.wait(tasks)
- if __name__ == "__main__":
- aynscio.run(main())
3. 异步编程
3.1 事件循环(Event Loop)
理解成一个死循环,去检测并执行某些代码。- task_list = [task1, task2, task3, ...]
- while True:
- executables, completes = [...], [...] # 在 task_list 中检查所有任务,将可执行和已完成返回
- for executable in executables:
- execute executable
- for complete in completes:
- remove complete from task_list
- if task_list == []: # 如果 task_list 中的任务都已完成,则终止循环
- break
复制代码- import asyncio
- # 去生成或获取一个事件循环
- loop = asyncio.get_event_loop()
- # 将任务放到任务列表
- loop.run_until_complete(asyncio.wait(tasks))
3.2 快速上手
协程函数(coroutine function):定义函数时 async def (加上 async 关键字)。
协程对象(coroutine object):执行协程函数得到的协程对象。- async def func():
- pass
- result = func()
复制代码 注意到 result = func() 中 call 了 func(),但并不会执行 func() 内部代码,只是得到了 func() 的协程对象。
若要执行协程函数内部代码,需要事件循环去处理协程函数得到的协程对象。- async def func():
- print("come here.")
- result = func()
- loop = async.get_event_loop()
- loop.run_until_complete(result)
到了 python 3.7 之后,还有更简便的写法:- async def func():
- print("come here.")
- result = func()
- # loop = async.get_event_loop()
- # loop.run_until_complete(result)
- async.run(result) # python 3.7
3.3 await 关键字
await 一般要加上 可等待的对象(协程对象、Future 对象、Task 对象),可以简单理解为 IO 等待(但实际上并没有这么简单)。
示例 1:- import asyncio
- async def func():
- print("come here.")
- response = await asyncio.sleep(2) # 没有什么意义,假设这是一个 IO 等待(例如网络请求)
- print("terminate", response)
- asyncio.run(func())
复制代码 在事件循环内部,执行协程对象 func() 时会先执行 print("come here."),接下来会进入 IO 等待,此时事件循环会跳出 func() 函数去执行其它任务,一旦 response 得到返回值(即结束 IO 等待),事件循环会在下一次循环中检测到 IO 等待已经结束,此刻才会继续执行 func() 后面的代码(即 print("terminate", response))。
示例 2(协程对象之间可以嵌套):- async def others():
- print("start")
- await asyncio.sleep(2)
- print("end")
- return "返回值"
- async def func():
- print("执行协程函数内部代码")
- # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其它协程(任务)。
- response = await others()
- print("IO 请求结束,结果为:", response)
- asyncio.run(func())
示例 3:- async def others():
- print("start")
- await asyncio.sleep(2)
- print("end")
- return "返回值"
- async def func():
- print("执行协程函数内部代码")
- # 遇到 IO 操作挂起当前协程(任务),等 IO 操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其它协程(任务)。
- response_1 = await others()
- print("IO 请求结束,结果为:", response_1)
- response_2 = await others()
- print("IO 请求结束,结果为:", response_2)
- asyncio.run(func())
复制代码 await 关键字的含义就是,等待对象的值得到返回结果之后再继续向下运行。
3.4 Task 对象
Tasks are used to schedule coroutines concurrently.
When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon.
Tasks 用于并发调度协程,通过 asyncio.create_task(协程对象) 的方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外的,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。
示例 1(这种代码写得比较少):- import asyncio
- async def func():
- print(1)
- await asyncio.sleep(2)
- print(2)
- return "返回值"
- async def main():
- print("main 开始")
- # 创建协程,将协程封装到一个 Task 对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
- task1 = asyncio.create_task(func())
- task2 = asyncio.create_task(fucn())
- # 当执行某协程遇到 IO 操作时,会自动化切换执行其它任务。
- # 此处的 await 时等待相对应的协程全都执行完毕并获取结果。
- result_1 = await task1
- result_2 = await task2
- print(result_1, result_2)
- asyncio.run(main())
示例 2(这种代码应用得比较多):- import asyncio
- async def func():
- print(1)
- await asyncio.sleep(2)
- print(2)
- return "返回值"
- async def main():
- print("main 开始")
- # 创建协程任务列表
- task_list = [
- asyncio.create_task(func(), name="n1"), # 给 task 命名,会在返回集中显示
- asyncio.create_task(func(), name="n2")
- ]
- # 不能直接把 task_list 以列表的形式加在 await 之后
- # 注意 await 关键字只接受 coroutine object, task object, future object
- # 此处 done 是一个集合,为 task_list 的返回值
- # pending 在 timeout 不为 None 时有意义,timeout 规定了最长等待时间,
- # 如果超过 timeout,那么还未完成的任务将添加到 pending 中。
- done, pending = await asyncio.wait(task_list, timeout=1)
- print(done)
- asyncio.run(main())
示例 3:- import asyncio
- async def func():
- print(1)
- await asyncio.sleep(2)
- print(2)
- return "返回值"
- # 创建协程任务列表
- task_list = [
- asyncio.create_task(func(), name="n1"), # 给 task 命名,会在返回集中显示
- asyncio.create_task(func(), name="n2")
- ]
- done, pending = asyncio.run(asyncio.wait(task_list))
- print(done)
注意到以上代码会导致程序报错。原因是:asyncio.create_task() 会将协程对象立即添加到事件循环中,但是,事件循环是在 asyncio.run() 中被创造,因此此时并不存在事件循环。应该如此修改:- import asyncio
- async def func():
- print(1)
- await asyncio.sleep(2)
- print(2)
- return "返回值"
- # 创建协程对象列表
- task_list = [
- func(),
- func()
- ]
- # 此时 asyncio 会在创建事件循环之后,在内部将 task_list 中的协程对象添加到事件循环中
- done, pending = asyncio.run(asyncio.wait(task_list))
- print(done)
3.5 Future 对象
Future 类是 Task 类的父类,即 Task 类继承自 Future 类,Task 对象内部 await 结果的处理基于 Future 对象而来。
A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
示例 1:- import asyncio
- async def main():
- # 获取当前事件循环
- loop = asyncio.get_running_loop()
- # 创建一个任务(Future 对象),这个任务什么都不干。
- future = loop.create_future()
- # 等待任务最终结果(Future 对象),没有结果则会一直等下去。
- await future
- asyncio.run(main())
复制代码 在上述代码中,由于创建的 Future 对象什么也不干,因此 await future 将一直卡住,无法获得返回结果,所以上述代码是没有实际意义的。但注意,如果某一个时刻突然给 future 赋值,那么 future 立刻可以获得返回结果,并且跳出 await。
示例 2(没什么意义,用于理解 Future 对象的作用,即帮助我们等待结果):- async def set_after(future):
- await asyncio.sleep(2)
- future.set_result("666")
- async def main():
- # 获取当前事件循环
- loop = asyncio.get_running_loop()
- # 创建一个任务(Future 对象),没有绑定任何行为,则这个任务永远不知道什么时候结束。
- future = loop.create_future()
- # 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在 2s 之后会给 future 赋值。
- # 即手动设置 future 任务的最终结果,那么 future 就可以结束了。
- await loop.create_task(set_after(future))
- # 等待 Future 对象获取最终结果,否则一直等待下去。
- data = await future
- print(data)
- asyncio.run(main())
3.6 concurrent 中的 Future 对象
首先注意到,concurrent 中的 Future 对象(concurrent.futures.Future)和 asyncio 中的 Future 对象没有关系。concurrent 中的 Future 对象是当使用线程池、进程池实现异步操作时使用到的对象。- import time
- from concurrent.futures import Future
- from concurrent.futures.thread import ThreadPoolExecutor
- from concurrent.futures.process import ProcessPoolExecutor
- def func(value):
- time.sleep(1)
- return value
- # 创建线程池
- pool = ThreadPoolExecutor(max_workers=5)
- # 或创建进程池
- # pool = ProcessPoolExecutor(max_workers=5)
- for i in range(10):
- # 让 pool 拿出一个线程去执行 func 函数
- future = pool.submit(func, i)
- print(future)
复制代码 实际中可能会存在两种 Future 对象交叉使用。例如:crm 项目中 80% 都基于协程异步编程 + MySQL,但 MySQL 不支持异步,因此在 MySQL 中使用进程池、线程池做异步编程。
示例 1:- import time
- import asyncio
- import concurrent.futures
- def func1():
- # 某个耗时操作
- time.sleep(2)
- return "complete"
- async def main():
- loop = asyncio.get_running_loop()
- # 1. Run in the default loop's executor (default to ThreadPoolExecutor)
- # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去
- # 执行 func1 函数,并返回一个 concurrent.futures.Future 对象
- # 第二步:调用 asyncio.wrap_future 将 concurrent.future.Future 对象
- # 包装为 asyncio.Future 对象。
- # 因为 concurrent.futures.Future 对象不支持 await 语法,所以需要包装为
- # asyncio.Future 对象才能使用。
- future = loop.run_in_executor(None, func1) # 返回一个 Future
- # 上面这一步内部会调用 asyncio.wrap_future 将返回的 concurrent.futures.Future
- # 对象转换为 asyncio.Future 对象
- # 默认 None 意味着创建线程池,若想使用进程池请参考以下注释代码
- result = await future
- print("default thread pool", result)
- # 2. Run in a custom thread pool:
- # with concurrent.futures.ThreadPoolExecutor() as pool:
- # result = await loop.run_in_executor(pool, func1)
- # print("custom thread pool", result)
- # 3. Run in a custom process pool:
- # with concurrent.futures.ProcessPoolExecutor() as pool:
- # result = await loop.run_in_executor(pool, func1)
- # print("custom process pool", result)
- asyncio.run(main())
案例:asyncio + 不支持异步的模块(爬虫)- import asyncio
- import requests
- async def download_image(url):
- # 发送网络请求,下载图片(遇到网络下载图片的 IO 请求,自动化切换到其它任务)
- print("开始下载:", url)
- loop = asyncio.get_event_loop()
- # requests 模块默认不支持异步操作,所以就用线程池配合实现了
- future = loop.run_in_executor(None, requests.get, url)
- response = await future
- print("下载完成")
- # 图片保存到本地文件
- file_name = url.rsplit("_")[-1]
- with open(file_name, mode="wb") as file_object:
- file_object.write(response.content)
- if __name__ == "__main__":
- url_list = [ "https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg",
- "https://www3.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg",
- "https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vk0sGY913.jpg",
- ]
- tasks = [download_image(url) for url in url_list]
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
复制代码 耗费资源更大,不得已而为之。
3.7 异步迭代器
- 什么是异步迭代器?
实现了 __aiter__() 和 __anext__() 方法的对象。__anext__() 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。
- 什么是异步可迭代对象?
可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。
示例:- import asyncio
- class Reader(object):
- """
- 自定义异步迭代器(同时也是异步可迭代对象)
- """
- def __init__(self):
- self.count = 0
- async def readline(self):
- # await asyncio.sleep(1)
- self.count += 1
- if self.count == 100:
- return None
- return self.count
- def __aiter__(self):
- return self
- async def __anext__(self):
- val = await self.readline()
- if val is None:
- raise StopAsyncIteration
- return val
- # 以下代码会报错,因为 async for 必须写在协程函数内。
- # obj = Reader()
- # async for item in obj:
- # print(item)
- async def func():
- obj = Reader()
- async for item in obj:
- print(item)
- asyncio.run(func())
3.8 异步上下文管理器
- 什么是异步上下文管理器?
此种对象通过定义 __aenter__() 和 __aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。
示例:- import asyncio
- class AsyncContextManager(object):
- def __init__(self):
- self.conn = conn
- async def do_something(self):
- # 异步操作数据库
- return 666
- async def __aenter__(self):
- # 异步连接数据库
- self.conn = await asyncio.sleep(1) # 可以换成连接数据库代码
- return self
- async def __aexit__(self, exc_type, exc, tb):
- # 异步关闭数据库链接
- await asyncio.sleep(1)
- # 以下代码会报错,因为 async with 必须写在协程函数内。
- # obj = AsyncContextManager()
- # async with obj:
- # result = await obj.do_something()
- # print(result)
- # 或者
- # async with AsyncContextManager() as f:
- # result = await f.do_something()
- # print(result)
- async def func():
- async with AsyncContextManager() as f:
- result = await f.do_something()
- print(result)
- asyncio.run(func())
4. uvloop
uvloop 是 asyncio 事件循环的替代方案,可以提高事件循环效率,性能接近于 go 语言。- import asyncio
- import uvloop
- asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
- # Your asyncio code here.
- # 内部的事件循环自动会变为 uvloop
- asyncio.run()
复制代码 注意:asgi 是支持异步的 wsgi 网关接口(e.g. uvicorn,内部使用的就是 uvloop)。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |