使用 FastAPI、Celery 和 RabbitMQ 的异步架构
使用 FastAPI、Celery 和 RabbitMQ 的异步架构https://i-blog.csdnimg.cn/blog_migrate/b515e5e4e9260654804ded09045842ba.png
使用 RabbitMQ 和 Celery 的 FastAPI
在我之前的一篇教程中,我们已经相识了怎样使用Async IO优化FastAPI应用步调的性能。 要相识更多信息,可以参考使用 FastAPI 进行并发。
当我们想要执行小型 BackgroundTask 或使用并发和并行执行任务时,异步 IO 是符合的。但是,当执行繁重的后台计算或复杂任务时,我们抱负情况下不希望将它们作为同一进程的一部分来运行。因此,为了将这些复杂的任务作为单独的过程执行,我们需要专门的工具,比方芹菜。
Celery是一个分布式任务队列,可简化任务分配和处置处罚的管理。任务队列用作跨线程或呆板分配工作的机制。任务队列的输入是一个称为任务的工作单元。专用工作进程不停监视任务队列以执行新工作。
作为任务排队系统,Celery 非常得当长时间运行的进程或批量执行的小型可重复任务。Celery 处置处罚的问题类型是常见的异步任务。
Celery通过消息进行通讯,通常使用代理在客户端和工作器之间****进行调解。要启动任务,客户端会将消息添加到队列,然子女剖析将该消息转达给工作器。RabbitMQ和Redis是 Celery 完全支持的代理传输。在我们的示例中,我们将使用 RabbitMQ 作为代理传输。
RabbitMQ是部署最广泛的开源消息代理。RabbitMQ 轻量级且易于在本地和云中部署。它支持多种消息转达协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性要求。RabbitMQ 是 Celery 的默认代理,因此除了我们要使用的代理实例的 URL 位置之外,它不需要任何其他依赖项或初始配置。
有多种工具可用于监控和查抄 Celery 集群。Flower是一款用于Celery的实时 Web 应用步调监控和管理工具。
下图表现了各个组件怎样交互的简化图。在这里,我们使用 FastAPI 作为 Celery 客户端,使用 RabbitMQ 作为消息代理和结果后端。
https://i-blog.csdnimg.cn/blog_migrate/632081ab8b4cf0fd3cf01045ae966b5e.png
使用 Celery Flow 的 FastAPI
[*]客户端向我们的 FastAPI 应用步调发送请求。
[*]FastAPI 应用步调将任务消息发送到消息代理。
[*]Celery 工作器从消息代理中消费消息。任务完成后,它将结果保存到结果后端并更新任务状态。
[*]将任务发送到消息代理后,FastAPI 应用还可以从结果后端监控任务的状态。
[*]Flower 还可以通过处置处罚消息代理上的消息来监控芹菜应用步调任务。
在本教程中,我们将相识怎样将Celery集成到FastAPI应用步调中以执行异步任务而不阻止用户的请求。虽然我们在这里使用的示例非常简单,但它仅用于纯粹的演示目的。
带有 FastAPI 的 Celery 应用步调可能会有用的一些用例:
[*]在应用步调中将电子邮件作为后台任务发送电子邮件。
[*]在后台处置处罚上传的图像。
[*]ML 模子的离线训练。
[*]定期任务,比方报告天生或网页抓取。
以下步骤阐明怎样使用 Celery 和 FastAPI 执行异步任务:
[*]设置和安装
[*]设置消息代理
[*]添加芹菜
[*]添加 Celery 任务
[*]添加 APIRouter
[*]启动应用步调和 Celery Worker 服务器
[*]测试应用步调
[*]监控任务
先决条件
我们需要安装有Pipenv和Git 的Python 3.9.2。Pipenv是一个软件包和假造环境管理器,可PIP在后台使用。它提供更多高级功能,比方版本锁定和项目之间的依赖隔离。
1. 设置和安装
一旦满足先决条件,我们就可以开始创建我们的应用步调。
要开始我们的应用步调,请fastapi-celery-rabbitmq-application在磁盘上的任意目录中为我们的项目创建一个名为 的文件夹。
https://i-blog.csdnimg.cn/blog_migrate/ac21b6561bc1498d43bfcb4c3fe3af94.png
创建项目文件夹
导航到项目文件夹。
激活假造环境
进入项目文件夹后,执行以下命令来激活 VirtualEnv。
https://i-blog.csdnimg.cn/blog_migrate/a81b1e5cbe1bd0d4e5cfa7150f5e4c75.png
启动 pipenv shell
假造环境现在将被激活,它将提供所需的项目隔离和版本锁定。
安装依赖项
接下来,我们将使用 requirements.txt 中的 Pipenv 安装全部必需的依赖项。
https://i-blog.csdnimg.cn/blog_migrate/19bbc1e44df71e51688a88d48cbb9b49.png
我们的应用步调中使用的全部依赖项
https://i-blog.csdnimg.cn/blog_migrate/68a2b59ed06121c633df675d84d2502f.png
安装全部依赖项
当我们执行上述命令之后,所需的依赖项将被安装。
我们现在可以看到在我们的项目文件夹中创建的两个文件,分别是Pipfile和Pipfile.lock。
[*]Pipfile包罗我们刚刚安装的全部依赖项的名称。
[*]Pipfile.lock旨在根据中存在的依赖关系指定Pipfile应使用哪个特定版本,制止自动升级相互依赖的依赖关系并粉碎项目依赖关系树的风险。
*留意*:这里,我们安装了全部具有特定版本的依赖项,这些依赖项在编写本教程时在我的计算机上可以正常工作。假如我们不指定任何版本,则将安装该依赖项的最新版本,这可能与其他依赖项不兼容。
下一步是设置消息代理。
2. 设置消息代理
Celery 需要一个办理方案来发送和吸收消息;通常,这以称为消息代理的单独服务的情势出现。有几种选择,RabbitMQ就是此中之一。
在本地呆板上运行RabbitMQ 的最简单方法之一是使用Homebrew。Homebrew是MacOS上流行的软件包管理器。RabbitMQ 配方可从Homebrew的核心水龙头(开箱即用)得到。
[*]要使用 Homebrew 安装 RabbitMQ 服务器,只需执行以下命令:
https://i-blog.csdnimg.cn/blog_migrate/8b28f009fb584bf0265163d758975308.png
Brew 安装 RabbitMQ
[*]然后可以使用rabbitmq-server在前台启动服务器,大概使用brew services start rabbitmqlaunch 在后台运行该服务器。
[*]一旦我们启动服务器,我们就可以使用默认根据(Username: guest, Password: guest)登录到服务器http://localhost:15672。
https://i-blog.csdnimg.cn/blog_migrate/24571d298a3f38aaed52e312117dae6a.png
RabbitMQ 服务器
有关 RabbitMQ 的其他安装选项,请查看以下链接:
文档:目录
本页总结了最新补丁版本的 RabbitMQ 文档。请参阅下载和…
www.rabbitmq.com
我们的消息代理已启动并运行。现在让我们配置 Celery。
3. 加入芹菜
我们已经安装了 Celery 运行所需的全部依赖项。现在让我们配置它。在项目的根目录中,让我们创建一个名为 的文件main.py。
<iframe src="https://medium.com/media/7758a2744d039968bb5445c753ec977e" allowfullscreen="" frameborder="0" height="564" width="680" title="异步主程序" class="el n fd dy bg" scrolling="no" style="box-sizing: inherit; top: 0px; width: 680px; height: 563.993px; left: 0px;"></iframe>
主步调
[*]在上面的代码中,line 8我们16创建了一个名为 的工厂函数create_app(),它可以随时调用,并返回一个 FastAPI 应用实例供我们使用。在这个函数中,我们调用一个名为 的函数create_celery()来创建一个 Celery 应用实例。此外,我们还集成了大学路由模块,我们将在本教程中进一步解释它。
[*]23至行代码是使用Uvicorn24运行 FastAPI 应用步调所必需的。FastAPI 是我们用来构建 API 的框架,Uvicorn是我们用来处置处罚请求的服务器。
Celery 配置
现在通过添加文件来开始 celery 配置config/celery_config.py。
<iframe src="https://medium.com/media/7ed82baa3418caeede251bca4b3f3117" allowfullscreen="" frameborder="0" height="979" width="680" title="celery 配置" class="el n fd dy bg" scrolling="no" style="box-sizing: inherit; top: 0px; width: 680px; height: 978.993px; left: 0px;"></iframe>
Celery 配置
[*]上述文件包罗 Celery 运行所需的全部配置。从line 14到15我们定义了 Broker URL 和结果后端。在这里,我们使用在中安装的 RabbitMQstep 2作为代理和结果后端。
[*]从line 17到23我们定义了全部队列,我们将在应用步调中使用这些队列以及默认celery队列。这里我们将使用动态配置路由规则,而不是在每个任务级别手动配置CELERY_TASK_ROUTES路由规则。
[*]我们添加了一个辅助方法来line 6在10运行时获取每个任务的路由。
[*]从line 32到39我们定义了一个函数来在启动时加载芹菜配置。
Celery 应用实例
现在让我们再添加一个文件config/celery_utils.py,此中将包罗下面的代码来创建芹菜应用步调实例。
<iframe src="https://medium.com/media/a309ace9a24dc1efd3a57f557569c83a" allowfullscreen="" frameborder="0" height="740" width="680" title="celery 实用程序" class="el n fd dy bg" scrolling="no" style="box-sizing: inherit; top: 0px; width: 680px; height: 740px; left: 0px;"></iframe>
Celery 实用步调
上述文件定义了一个工厂函数,该函数配置然后返回一个 Celery 应用实例。它将读取之前定义的全部与 celery 相关的配置。我们还更新了一些默认的 celery 配置。在创建 FastAPI 应用实例时create_celery调用此函数。main.py
从line 22到32我们定义了另一个辅助函数get_task_info来获取通过 Celery 提交的异步任务的状态和结果。
4.添加 Celery 任务
现在,我们已经完成了全部与 Celery 相关的配置。现在让我们添加一些我们想要异步执行的任务。我们必须添加一个文件celery_tasks/tasks.py,此中包罗我们想要异步执行的全部任务。
<iframe src="https://medium.com/media/88bf7bdc1e862ce7812fc2ed9f667238" allowfullscreen="" frameborder="0" height="517" width="680" title="celery 任务" class="el n fd dy bg" scrolling="no" style="box-sizing: inherit; top: 0px; width: 680px; height: 516.997px; left: 0px;"></iframe>
任务.py
在上面的文件中,我们定义了两个任务get_all_universities_task和get_university_task。get_all_universities_task用于获取作为输入提供的国家列表中的全部大学。get_university_task用于获取给定国家/地区的大学。
在这里,我们习惯于shared_task使代码可重用,这需要在方法current_app内使用实例create_celery,而不是创建新的 Celery 实例。@shared_task 装饰器返回一个始终使用 current_app 中的任务实例的代理。它答应我们创建可由任何应用步调使用的任务。由于 Celery 内置了对重试失败任务的支持,因此我们已为我们的任务启用了它。使用重试选项,我们现在可以让异常冒泡并让装饰器处置处罚它。
5. 添加 APIRouter
现在,让我们添加一些入口点来测试我们的应用步调。我们将从一个简单的同步入口点开始。此入口点将调用一些外部 API 并将结果返回给客户端。在这里,我们使用我在之前的一个教程中使用过的相同免费 API。它在Apipheny中列出。我们将使用大学列表 API来获取指定国家/地区的大学列表。在本教程中,我们将获取作为 API 输入提供的大学列表。我们不会验证响应,由于我们只是模仿在我们的应用步调中执行一些长时间运行的任务来完成请求的场景,比如我们需要调用多个 API,在此基础上进行某些处置处罚,然后返回结果。
让我们添加一个专用文件作为模块来处置处罚大学请求/routers/universities.py。
<iframe src="https://medium.com/media/4d7b5cdf1a304118d3812d137a66939c" allowfullscreen="" frameborder="0" height="1374" width="680" title="大学路由器" class="el n fd dy bg" scrolling="no" style="box-sizing: inherit; top: 0px; width: 680px; height: 1373.99px; left: 0px;"></iframe>
大学_路由器.py
我们希望将与大学相关的路径操纵与别的代码分开,以保持代码井然有序。但它仍旧是同一个FastAPI应用步调的一部分。我们可以使用 为任何模块创建路径操纵APIRouter。我们已经在 中的应用步调中包罗了此路由器step 3。这里我们添加了 4 个不同的端点来测试我们的应用步调。
[*]get_universites :从line 14到22我们定义了一个简单的 API,它将国家列表作为输入,然后调用一些外部 API 来获取结果。每个国家的结果被组合起来然后返回给调用者。添加此 API 只是为了查抄 API 集成。它与 Celery 无关。
[*]get_universities_async:此 API 用于演示怎样使用 Celery 异步执行长时间运行的任务。当我们调用此 API 时,它会向代理提交一条任务消息并返回该消息的 taskID。然后,消费者可以使用其他 API 来查抄已提交任务的状态或获取任务结果。正如我们在这里看到的,我们只需要调用get_all_universities_task前面定义的方法step 4。只需在函数名后添加即可调用 Celery 任务.apply_async()。这将告诉 Celery 将新任务添加到队列。它在内部将任务提交到universities队列。将有一个工作进程监听这个队列。工作进程将吸收消息、处置处罚请求并将结果保存在结果后端。
[*]get_universities_parallel:此 API 用于演示怎样使用 Celery 将大任务拆分为较小的子任务并并行执行。调用此 API 时,我们会为输入的每个国家/地区创建一个任务。然后我们使用 Celery Group 对这些任务进行分组。Celery组是 惰性的,因此我们必须调用它来接纳行动并评估该组。然后我们执行该组并等候它完成。一旦全部任务完成,该组将完成,我们将得到响应。之后,我们网络组的响应并将其返回给调用者。这是一个同步流程,它使用 Celery 并行执行子任务。
[*]get_task_status:此API用于获取使用我们之前形貌的API之一提交的异步任务的状态和结果。
让我们看一下获取大学的代码:
<iframe src="https://medium.com/media/43597d4e1c113b5c234dadfb322d78c3" allowfullscreen="" frameborder="0" height="300" width="680" title="获取该国家/地区的所有大学" class="el n fd dy bg" scrolling="no" style="box-sizing: inherit; top: 0px; width: 680px; height: 299.983px; left: 0px;"></iframe>
我们line 5使用名为httpx 的 HTTP 客户端发送 HTTP 请求。HTTPX 是 Python 3 的功能齐备的 HTTP 客户端,它提供同步和异步 API,并支持 HTTP/1.1 和 HTTP/2。
在这里,我们使用同步 API 来获取大学数据。在上面的代码中,line 6我们调用大学 API 来获取结果。line 7我们将响应转换为JSON格式。然后从line 9到11,我们将对象映射JSON到Pydantic模子University。最后,我们返回字典,此中包罗国家名称作为键和大学列表作为值。
现在我们的代码已经完成,可以使用 Celery 运行 FastAPI。
6.启动应用步调和 Celery Worker 服务器
step 3我们已经添加了启动应用步调所需的代码。
FastAPI 是我们用来构建 API 的框架,Uvicorn是我们用来处置处罚请求的服务器。我们已经安装了Uvicorn。那将是我们的服务器。我们可以通过执行以下命令来启动应用步调:
python 主要.py
应用步调成功启动后,我们可以导航到http://localhost:9000/docs。系统将打开一个雷同这样的页面。我们可以在之前定义的 Swagger-UI 中看到端点。
https://i-blog.csdnimg.cn/blog_migrate/ec3dd5f291f8243af74b46b2493a9c7a.png
Swagger-ui
现在让我们启动 Celery 工作服务器。我们需要打开一个新的终端并进入项目根目录。然后我们需要使用以下命令安装全部依赖项:
Pipenv 安装-r 要求.txt
一旦安装了全部依赖项,我们就可以执行以下命令:
celery -A main.celery worker --loglevel=info -Q 大学,大学
universities, universityCelery 工作者将会监听我们应用步调中使用的自定义队列: 。
https://i-blog.csdnimg.cn/blog_migrate/2d8717f1c2bad57ec8326bc699f1730c.png
芹菜工
现在统统都已启动并运行!让我们测试我们的应用步调。
7.测试应用步调
让我们用一些示例国家测试我们的大学端点,以验证外部 API 集成。在这里,我们获取“土耳其”和“印度”的全部大学。我们不验证响应或数据。
https://i-blog.csdnimg.cn/blog_migrate/ac5ae5864c0112cb6172ae6f180c6980.png
获取大学请求
https://i-blog.csdnimg.cn/blog_migrate/026e8883c55a65f96273c7430732ab76.png
获取大学回应
如上所示,我们得到了请求国家/地区的大学列表。在这里,我们使用大学列表 API获取指定国家/地区的大学列表。
一旦我们测试了外部 API 交互是否正常工作,让我们使用 Celery 测试异步任务执行。在这里,我们将尝试获取作为输入提供的同一组国家/地区的大学列表。
https://i-blog.csdnimg.cn/blog_migrate/8fd222f3ccdb942da0ef3d0f7b18f038.png
异步请求
https://i-blog.csdnimg.cn/blog_migrate/37bbbd612d000ca1fa97448cb03038b5.png
异步响应
一旦提交请求,我们就会得到 task_id 作为确认响应。请求使用 Celery 作为单独的进程异步处置处罚。我们可以使用其他 API 查抄任务的状态。通过这种架构,我们可以处置处罚大量请求。我们可以将用户请求排入队列,并根据应用步调的资源可用性对其进行处置处罚。
让我们查抄一下上面提交的异步任务的状态。我们可以使用下面的 API,以 task_id 作为输入。
https://i-blog.csdnimg.cn/blog_migrate/d8e7a44ec0ec3a4779ae785da382d0f7.png
获取任务状态请求
https://i-blog.csdnimg.cn/blog_migrate/4c6a9fefe1d1016b8c9a6e63d8c09d11.png
获取任务状态响应
现在让我们测试一下将用户请求拆分为多个子任务、使用 Celery 并行处置处罚它们然后返回结果的场景。
https://i-blog.csdnimg.cn/blog_migrate/2c6a5a56bbdc3f43b2034d2d64bdba33.png
并行请求
https://i-blog.csdnimg.cn/blog_migrate/13c4e79d0a667ef99e766751c321cfcb.png
平行反应
这与之前测试的第一个 API 具有相同的请求和响应。但是,由于我们使用 Celery 并行获取每个国家的大学,因此执行速率会更快。在这里,我们在 0.58 秒内得到了响应,而第一个 API 则需要 1.67 秒。
8. 监控任务
到目前为止,我们可以看到使用 Celery 提交了一些要在后台执行的任务。假如我们可以使用 GUI 查看这些任务的进度,那就太好了。监控 Celery 应用步调的最简单方法是通过 Flower。使用它,我们可以轻松查抄全部任务的进度。我们已经安装了全部依赖项。我们只需要启动flower。要在本地计算机上运行flower服务器,请从项目根目录执行以下命令:
芹菜 -A main.celery 花 --port=5555
https://i-blog.csdnimg.cn/blog_migrate/33e724c553937fc2f9375d94f0d9fb15.png
花
我们现在可以看到我们的花草监控工具正在我们的本地服务器上运行http://localhost:5555/。
https://i-blog.csdnimg.cn/blog_migrate/7b1d1f662bb28fdd26ce02fc405bac90.png
花仪表板
在仪表板中,我们可以看到通过 Celery 执行的全部任务的择要。
一旦任务提交之后,我们还可以通过点击tasksFlower导航栏上的Tab来查看每个任务的进度。
比方,我们可以看到每个任务的状态,从“进行中”到“成功”或“失败”。我们还可以看到任务的启动时间、运行时间和响应等等。
https://i-blog.csdnimg.cn/blog_migrate/804c7233d253e556663a796fe967c310.png
Celery 任务
结论
花草监控工具正在我们的本地服务器上运行http://localhost:5555/。
[外链图片转存中…(img-cd6LYuX6-1722505743935)]
花仪表板
在仪表板中,我们可以看到通过 Celery 执行的全部任务的择要。
一旦任务提交之后,我们还可以通过点击tasksFlower导航栏上的Tab来查看每个任务的进度。
比方,我们可以看到每个任务的状态,从“进行中”到“成功”或“失败”。我们还可以看到任务的启动时间、运行时间和响应等等。
Celery 任务
结论
在本教程中,我们相识了怎样将 Celery 与 FastAPI 应用步调集成。将 Celery 与 FastAPI 应用步调一起使用不仅可以提高整体性能和效率,另有助于更好地处置处罚长时间运行的任务的后台作业。我们还相识了假如我们想监控通过 Celery 提交的任务,那么怎样使用 Flower 来做同样的事情。
博客原文:专业人工智能技术社区
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]