在 Django 中使用 Celery 主要是为了处理耗时或需要异步实验的使命,避免阻塞 Web 哀求的相应,提升用户体验和系统性能。
安装很简朴,打开终端
先来了解一下
async tasks——异步使命
- 用途:将耗时的操纵从 HTTP 哀求-相应周期中剥离,放到后台异步实验。
- 典范场景:
- 发送邮件或短信验证码(避免用户等候网络延迟)。
- 处理图片/视频(如缩略图生成、格式转换)。
- 调用第三方 API(如支付接口、舆图服务)。
- 大数据处理或复杂计算(如生成报表、数据分析)。
异步实现发送验证码流程
流程就是,把一些方法“摘”出来,放在tasks.py文件下,在视图函数中调用这个方法,这里举例发送邮箱验证码。
先看下目录结构,一清二楚,有人问tasks.py放app里还是哪?tasks.py 文件的最佳实践是 直接放在相干功能的应用目录下(例如这里验证码流程在 Login 应用,tasks就放在Login下)
- book_demo/
- ├── book_demo/ # 项目根目录
- │ ├── __init__.py
- │ ├── settings.py # Django 配置
- │ ├── urls.py
- │ └── celery.py # Celery 初始化配置(关键!)
- │
- └── Login/ # 你的登录/注册功能应用
- ├── __init__.py
- ├── tasks.py # Celery 任务定义(核心文件)
- ├── views.py # 处理验证码的视图函数
- ├── models.py
- └── ...
复制代码 0.情况准备
不消担心,总共需要安装Redis\Celery两个就行了
先安装一些设置才能启动Celery:也就是安装Celery 架构的焦点组成部分。
常看到一个描述是:使用Redis作为Broker ?????一头雾水,下面用人话讲讲,了解一下在开始安装:
1. 什么是 Broker?
- Broker(消息代理):可以理解为一个「使命中转站」,负责在 Celery 的各个组件之间通报消息。
- 类比:想象你去快递站寄包裹,Broker 就像快递公司的「分拣中央」——它负责接收你要寄的包裹(使命),并分发给快递员(Worker)行止理。
- 焦点作用:当你在代码中调用 .delay() 提交异步使命时,这个使命会被发送到 Broker,再由 Broker 转发给空闲的 Worker 实验。
2. 什么是 Redis?
- Redis:是一个开源的内存数据库,支持高性能的键值存储,常用于缓存、消息队列等场景。
- 在 Celery 中的角色:Redis 可以作为 Broker 的具体实现之一(即用 Redis 来充当使命中转站)。
- 其他常见 Broker:
- RabbitMQ:专为消息队列筹划的服务,稳固性更高(企业级首选)。
- Amazon SQS:云服务商提供的队列服务。
3. Celery 的完整工作流程
- 使命提交:你在 Django 代码中调用 task.delay(),提交使命到 Broker。
- 使命存储:Broker(如 Redis)将使命存入队列。
- 使命分发:Celery Worker 进程从 Broker 拉取使命。
- 使命实验:Worker 实验使命(如发送验证码)。
- 结果存储(可选):使命结果可存入 Result Backend(如另一个 Redis 数据库)。
- [Django] --> [Broker(Redis)] --> [Celery Worker] --> [执行任务]
复制代码 4. 为什么需要 Broker?
- 解耦生产者和消耗者:Django(生产者)无需关心使命如何实验,Worker(消耗者)也无需知道使命是谁提交的。
- 缓冲使命:当使命瞬间激增时,Broker 作为缓冲区,避免使命丢失。
- 支持分布式:多个 Worker 可以同时从 Broker 拉取使命,实现负载平衡。
了解完毕,我这里使用的是Redis充当Broker
安装Redis
1.下载
- 访问 Redis 官方推荐的 Windows 移植版(微软维护):
https://github.com/tporadowski/redis/releases
- 下载最新 .msi 安装包(如 Redis-x64-5.0.14.1.msi)。
2. 运行安装程序
- 双击 .msi 文件启动安装向导。
- 选择安装路径:
- 在安装界面中,点击 "Browse...",选择 D:\Redis(或其他自定义路径)。
- 勾选 "Add Redis installation folder to PATH"(将 Redis 添加到情况变量)。
- 完成安装。
3.验证安装
- # 检查 Redis 是否安装成功
- redis-server --version
- # 输出示例:Redis server v=5.0.14.1
- # 启动 Redis 服务端
- redis-server
复制代码 出现下面的图案代表启动成功:
这里如果报错出现下图,不消担心
可参考另一篇博客~
安装完Celery和Redis之后,可开始下面的:
1.设置 Celery
book_deml/settings.py下添加:
- CACHES = {
- 'default': {
- 'BACKEND': 'django.core.cache.backends.redis.RedisCache',
- 'LOCATION': 'redis://127.0.0.1:6379/1', # Redis 地址和数据库编号
- }
- }
- CELERY_BROKER_URL = 'redis://localhost:6379/0'
- CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
复制代码 确保项目根目录的 celery.py 和 __init__.py 已正确设置:
- # book_demo/book_demo/celery.py
- import os
- from celery import Celery
- # 设置 Django 环境变量
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'book_demo.settings')
- app = Celery('book_demo')
- # 从 Django 配置中读取 Celery 设置
- app.config_from_object('django.conf:settings', namespace='CELERY')
- # 自动发现所有应用中的 tasks.py
- app.autodiscover_tasks()
复制代码- # book_demo/book_demo/__init__.py
- from .celery import app as celery_app
- __all__ = ['celery_app']
复制代码 2.运行 Celery Worker
启动 Celery Worker 处理异步使命,说人话就是打开终端输入命令启动celery:
- celery -A book_demo worker --loglevel=info -P eventlet
复制代码 这里关于这个命令另一篇处理启动Celery的博客中有具体介绍。eventlet记得也先pip install 安装
出现下图则代表安装成功:
测试缓存功能
在 Django Shell 中手动测试缓存功能:
- 1.输入:
- python manage.py shell
- 2.输入:
- from django.core.cache import cache
- # 设置缓存
- cache.set('test_key', 'test_value', timeout=300)
- # 获取缓存
- value = cache.get('test_key')
- print(value) # 输出:test_value
复制代码 成功就长如许:
3.创建 tasks.py
在 Login/tasks.py 中定义发送验证码的异步使命:
- # book_demo/Login/tasks.py
- from celery import shared_task
- from django.core.mail import send_mail
- from django.conf import settings
- import random
- @shared_task
- def send_email_captcha_task(email):
- # 生成 6 位随机验证码
- captcha = ''.join(random.choices('0123456789', k=6))
- # 存储到缓存(有效期 5 分钟)这里我用的Redis
- cache_key = f'verification_code_{email}'
- cache.set(cache_key, captcha, timeout=300)
- stored_captcha = cache.get(cache_key) # 从缓存中获取验证码
- # 打印缓存键和验证码
- logger.info(f"缓存键:{cache_key}, 缓存值:{stored_captcha}")
- try:
- send_mail(
- subject='您的验证码',
- message=f'您的验证码是:{captcha},有效期 5 分钟。',
- from_email=None, # 使用配置的默认发件人
- recipient_list=[email],
- fail_silently=False,
- )
- return True # 可选:返回任务执行状态
- except Exception as e:
- # 记录错误日志
- logger.error(f'邮件发送失败: {str(e)}')
- return False # 可选:返回任务执行状态
复制代码 这里需要注意几点:
- 在 tasks.py 文件中不推荐直接使用 print,而是使用日志记载器(logger)来记载信息。因为celery本质就是分布式使命队列,使命大概在不同的进程、机器以致服务器上运行。直接使用 print 输出的内容很难被集中管理和监控。还因为print 输出的内容通常不会被持久化到日志文件中,而是直接显示在终端上。一旦使命实验完成,这些信息就会丢失,不利于后续的排查和分析。
- 如果logger.info无法正常打印,看一下是不是启动celery时没有添加:--loglevel=info
4.在视图函数中调用异步使命
修改 book_demo/Login/views.py,触发异步使命:
- # book_demo/Login/views.py
- from django.shortcuts import render, HttpResponse
- from .tasks import send_verification_code_task # 从当前应用导入任务
- from rest_framework.views import APIView
- class send_email_captcha(APIView):
- def post(self, request):
- email = request.POST.get('email')
-
- ##这里一般校验邮箱格式、是否为空、是否已经注册等
- try:
- # 异步触发发送验证码任务
- send_email_captcha_task.delay(email) # 关键:使用 .delay()
- # 使用.delay()确保任务在CeleryWorker中执行,避免阻塞请求。
- return JsonResponse(
- {'success': True, 'message': '验证码已发至您的邮箱'},
- status=status.HTTP_200_OK
- )
- except Exception as e:
- return JsonResponse(
- {'success': False, 'message': f'服务器错误:{str(e)}'},
- status=status.HTTP_500_INTERNAL_SERVER_ERROR
- )
复制代码 从缓存中获取验证码是否正确:
- class verify_email_captcha(APIView):
- def post(self, request):
- try:
- email = request.data.get('email')
- user_input_captcha = request.data.get('emailCaptcha')
- cache_key = f'verification_code_{email}'
- stored_captcha = cache.get(cache_key) # 从缓存中获取验证码
- # 打印缓存键和验证码
- print(f"缓存键:{cache_key}, 缓存值:{stored_captcha}")
- if not stored_captcha:
- return Response({'success': False, 'message': '验证码已过期或无效'}, status=status.HTTP_400_BAD_REQUEST)
- # 比较用户输入的验证码和存储的验证码
- if user_input_captcha == stored_captcha:
- return Response({'success': True, 'message': '验证码验证成功'}, status=status.HTTP_200_OK)
- else:
- return Response({'success': False, 'message': '验证码错误'}, status=status.HTTP_400_BAD_REQUEST)
- except Exception as e:
- return Response({'success': False, 'message': f'服务器错误:{str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
复制代码 总结
好了到这正常异步处理发验证码的流程就搞好了 ~
回首一下Celery 的焦点作用:
- 异步实验:将耗时的操纵(如发邮件、短信、复杂计算)从 Django 的 HTTP 哀求-相应周期中剥离,交给 Celery Worker 在后台实验。
- 解耦:Django 负责接收哀求和快速返反相应,Celery 负责处理耗时使命,二者通过 消息队列(如 Redis) 通信。
Redis 的双重角色:
- 消息队列(Broker):
- Celery 使用 Redis 作为使命通报的中转站(Django 提交使命到 Redis,Celery Worker 从 Redis 拉取使命)。
- 这是通过 CELERY_BROKER_URL 设置的。
- 缓存(Cache):
- 你手动将验证码存储到 Redis(通过 cache.set()),用于后续验证。
- 这是通过 CACHES 设置的(Django 缓存框架)。
人话就是:
Celery就是单独把发验证码的这个方法摘出来,异步实验(就是把方法写在task.py下),结果呢(好比验证码)就存在Redis里,在用到的时候就再从Redis中获取。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |