python中使用高并发分布式队列库celery的那些坑 [复制链接]
发表于 2025-6-29 16:59:00 来自手机 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
Celery 是一个用于 分布式任务队列 的 Python 库,常用于处理异步任务(即任务不需要立刻实验,后台慢慢做),尤其适合实验定时任务或耗时利用。

🌟 简单明白

   Celery 就是让你把“任务”扔到后台实验,而不是阻塞当前程序。
  
🛠️ 核心功能

功能说明异步任务实验比如发邮件、处理图片、生成报告等不需要立刻完成的利用。分布式任务调理可以运行在多台服务器上,实现任务负载均衡。定时任务(周期任务)雷同 crontab,可设置任务定时实验(如天天 8 点发日报)。任务重试机制失败任务可以自动重试,实用于网络波动等场景。与Django/Flask集成非常适合与这些 Web 框架共同使用,将长耗时任务下放到 Celery。
🚀 工作机制

Celery 一样平常由以下几部分构成:

  • Producer(生产者):你写的代码,会将任务“发”出去。
  • Broker(中心人):任务先存放在消息队列(如 Redis、RabbitMQ)中。
  • Worker(工人):后台运行的历程,专门“接收”和“实验”这些任务。
  • Result Backend(结果后端):可选,纪录任务结果,如实验成功或失败。

📦 示例代码(使用 Redis 作为 broker)

  1. # tasks.py
  2. from celery import Celery
  3. app = Celery('mytasks', broker='redis://localhost:6379/0')
  4. @app.task
  5. def add(x, y):
  6.     return x + y
复制代码
运行方式:
  1. celery -A tasks worker --loglevel=info
复制代码
调用方式(异步实验):
  1. add.delay(3, 5)  # 返回一个异步结果对象
复制代码

🔗 常见搭配



  • 消息中心件:Redis、RabbitMQ(推荐 Redis 简单易用)
  • Web框架集成:Django、Flask
  • 共同 Flower、Prometheus、Grafana 等工具可实现任务监控监控

如果你正在开辟一个 需要做“异步处理”或“后台任务”的系统,Celery 是 Python 中的主流选择之一。但是该库看似简单,却隐蔽着无数坑,本文就带大家了解一下我在使用过程中遇到的那些坑。
📦 我的环境



  • windows 10
  • python 3.12
  • celery 5.5.2
📦第一个标题

实验命令:
  1. celery -A main_async:celery_app worker --loglevel=info
复制代码
报错:
  1. [2025-05-29 19:40:22,107: INFO/MainProcess] Task main_async.background_content_similarity[4c84e1c8-6a13-4241-8e62-04e17b3884cb] received
  2. [2025-05-29 19:40:22,142: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
  3. billiard.einfo.RemoteTraceback:
  4. """
  5. Traceback (most recent call last):
  6.   File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloop
  7.     result = (True, prepare_result(fun(*args, **kwargs)))
  8.                                    ^^^^^^^^^^^^^^^^^^^^
  9.   File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_task
  10.     tasks, accept, hostname = _loc
  11.     ^^^^^^^^^^^^^^^^^^^^^^^
  12. ValueError: not enough values to unpack (expected 3, got 0)
  13. """
  14. The above exception was the direct cause of the following exception:
  15. Traceback (most recent call last):
  16.   File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloop
  17.     result = (True, prepare_result(fun(*args, **kwargs)))
  18.                                    ^^^^^^^^^^^^^^^^^^^^
  19.   File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_task
  20.     tasks, accept, hostname = _loc
  21.     ^^^^^^^^^^^^^^^^^^^^^^^
  22. ValueError: not enough values to unpack (expected 3, got 0)
复制代码
该标题是由于celery的默认并发网络编程线程库引起的,换成eventlet可以办理标题,只需修改启动命令即可,如下:
  1. celery -A main_async:celery_app worker --loglevel=info
  2. -P eventlet
复制代码
📦第二个标题

第二个标题是日记标题,报错雷同如下所示:
  1. 'LoggingProxy' object has no attribute 'encoding'"
复制代码
原因分析

Celery 在启动 worker 时,默认会将尺度输出和尺度错误重定向到其日记系统中。这意味着 sys.stdout 和 sys.stderr 被替换为 LoggingProxy 对象。然而,某些库或代码可能期望这些对象具有尺度文件对象的属性,如 encoding 或 fileno,从而导致 AttributeError。
此时只需要将worker_redirect_stdouts参数设置为False即可办理标题,代码如下:
  1. # Celery 配置
  2. celery_app.conf.update(
  3.     task_serializer="json",
  4.     accept_content=["json"],
  5.     result_serializer="json",
  6.     timezone="Asia/Shanghai",
  7.     enable_utc=True,
  8.     include=["main_async"],  # 显式指定任务模块
  9.     task_track_started=True,  # 跟踪任务开始状态
  10.     task_ignore_result=False,  # 保存任务结果
  11.     task_store_errors_even_if_ignored=True,  # 存储错误
  12.     worker_redirect_stdouts = False        # 禁止将stdout和stderr重定向到当前记录器。
  13. )
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表