利用 `Celery` 共同 `RabbitMQ` 作为消息代理,实现异步使命的调度、重试、 ...

  金牌会员 | 2025-3-2 07:44:53 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 987|帖子 987|积分 2961

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
python基础代码、优化、扩展和监控的完整示例。此示例利用 Celery 共同 RabbitMQ 作为消息代理,实现异步使命的调度、重试、定时使命以及错误监控等功能。

项目结构

我们将项目结构组织如下,以便代码逻辑清楚且易于扩展:
  1. project/
  2. ├── celery_app.py        # Celery应用的配置和初始化
  3. ├── tasks.py             # 异步任务的定义
  4. ├── monitor.py           # 异常监控和报警
  5. └── main.py              # 测试异步任务调用
复制代码
1. celery_app.py - 设置 Celery 应用

  1. # celery_app.py
  2. from celery import Celery
  3. from celery.schedules import crontab
  4. app = Celery('tasks', broker='amqp://localhost//', backend='redis://localhost')
  5. # 基础配置
  6. app.conf.update(
  7.     result_expires=3600,                     # 任务结果过期时间
  8.     task_acks_late=True,                     # 确保任务执行后才确认完成
  9.     worker_prefetch_multiplier=1,            # 单次预取任务数
  10.     task_serializer='json',                  # 任务数据序列化格式
  11.     result_serializer='json',                # 任务结果序列化格式
  12.     accept_content=['json'],                 # 仅接收json格式
  13.     task_soft_time_limit=300,                # 软超时时间
  14.     task_time_limit=600,                     # 硬超时时间
  15.     worker_hijack_root_logger=False,         # 不劫持主日志
  16.     worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
  17. )
  18. # 定时任务配置
  19. app.conf.beat_schedule = {
  20.     'scheduled_add': {
  21.         'task': 'tasks.add',
  22.         'schedule': crontab(hour=7, minute=30, day_of_week=1),
  23.         'args': (16, 16),
  24.     },
  25. }
  26. # 任务路由配置:不同的任务可以走不同的队列
  27. app.conf.task_routes = {
  28.     'tasks.add': {'queue': 'high_priority'},
  29. }
复制代码
2. tasks.py - 定义使命

  1. # tasks.py
  2. from celery_app import app
  3. from monitor import task_failure_handler
  4. import time
  5. # 定义基础任务
  6. @app.task(bind=True, max_retries=3)
  7. def add(self, x, y):
  8.     try:
  9.         time.sleep(5)  # 模拟耗时任务
  10.         return x + y
  11.     except Exception as exc:
  12.         raise self.retry(exc=exc, countdown=5)  # 5秒后重试
复制代码
3. monitor.py - 监控与报警

  1. # monitor.py
  2. from celery.signals import task_failure
  3. @task_failure.connect
  4. def task_failure_handler(sender=None, exception=None, **kwargs):
  5.     # 发送报警通知或记录错误日志
  6.     print(f"[ALERT] Task {sender.name} failed due to {exception}")
复制代码
4. main.py - 测试使命调用

  1. # main.py
  2. from tasks import add
  3. from celery_app import app
  4. if __name__ == "__main__":
  5.     # 启动异步任务
  6.     result = add.delay(4, 6)
  7.     print("Task state:", result.state)  # 打印任务状态
  8.     print("Result:", result.get())      # 获取任务结果(阻塞等待)
  9.    
  10.     # 组合任务示例:Group
  11.     from celery import group
  12.     group_tasks = group(add.s(i, i) for i in range(10))
  13.     group_result = group_tasks.apply_async()
  14.     print("Group Result:", group_result.get())
  15.    
  16.     # 链式任务示例:Chord
  17.     from celery import chord
  18.     callback = add.s(10, 20)
  19.     chord_tasks = chord((add.s(i, i) for i in range(10)), callback)
  20.     chord_result = chord_tasks.apply_async()
  21.     print("Chord Result:", chord_result.get())
复制代码

运行和监控


  • 启动 RabbitMQ 服务
    1. sudo service rabbitmq-server start
    复制代码
  • 启动 Celery Worker
    运行以下命令,指定 high_priority 队列处理高优先级使命。
    1. celery -A celery_app worker -Q high_priority,default -l info
    复制代码
  • 启动 Celery Beat(用于调度定时使命):
    1. celery -A celery_app beat -l info
    复制代码
  • 启动 Flower 及时监控(可选):
    1. celery -A celery_app flower --port=5555
    复制代码
    访问 http://localhost:5555 进行使命和 worker 状态的及时监控。
  • Prometheus 和 Grafana 监控(可选)
    设置 Celery 的自定义事件,并利用 Prometheus 采集数据,再通过 Grafana 可视化 Celery 的性能指标。

此示例项目具有以下特性:


  • 异步使命:通过 delay() 方法调用。
  • 重试机制:在使命非常时自动重试。
  • 使命调度:支持定时使命,利用 Celery Beat 实现周期性使命调度。
  • 报警机制:在使命失败时发送报警或日志记录。
  • 监控系统:利用 Flower 进行及时监控,支持 Prometheus 和 Grafana 扩展。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表