Celery+Redis:高效实现Python分布式任务队列与异步处理

打印 上一主题 下一主题

主题 1937|帖子 1937|积分 5811

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Celery+Redis:高效实现Python分布式任务队列与异步处理
   Celery 是一个简单、机动且可靠的分布式任务队列,专注于及时处理和任务调理。它可以或许高效处理大量消息,广泛应用于 Python 项目中,适用于发送电子邮件、数据清洗等耗时任务。
  Celery有广泛的用户与贡献者社区,Github地址https://github.com/celery/celery。
      第一部分 基础概述

一、Celery 核心概念


  • 分布式任务队列
    Celery 是一个基于 Python 的分布式任务队列系统,专注于及时任务处理和调理,通过异步实行任务提升系统性能。其核心计划支持多台盘算机协同工作,实现任务分发与并行处理。
  • 消息中间件(Broker)
    负责接收和传递任务消息,常用中间件包罗 Redis 和 RabbitMQ。它是 Celery 架构的核心组件,确保任务生产者与消耗者解耦。
  • 任务实行单元(Worker)
    Worker 是实际实行任务的进程,通过监听 Broker 队列获取任务并处理。支持动态扩展 Worker 数量以适应负载变化。
  • 结果存储(Backend)
    用于生存任务实行结果,可选组件如 Redis、数据库或内存。若不需结果存储,可设置为忽略。
  • 任务(Task)
    用户定义的具体操作逻辑,通过装饰器 @app.task 标记为异步任务。任务可以是函数或类方法,支持同步/异步调用。
二、安装

  1. pip install celery redis
复制代码
三、典型设置示例

  1. # celery_config.py
  2. broker_url = 'redis://localhost:6379/0'
  3. result_backend = 'redis://localhost:6379/1'
  4. task_serializer = 'json'
  5. result_serializer = 'json'
  6. accept_content = ['json']
  7. timezone = 'Asia/Shanghai'
  8. task_default_queue = 'default'
  9. task_routes = {
  10.     'tasks.add': {'queue': 'math'},
  11.     'tasks.send_email': {'queue': 'email'}
  12. }
  13. task_acks_late = True
  14. worker_prefetch_multiplier = 1  # 公平调度
复制代码
四、关键场景说明


  • 任务优先级
    Redis 传输支持优先级(Celery 4.0+),设置 broker_transport_options 中的 priority_steps 参数,定义优先级范围:
    1. app.conf.broker_transport_options = {
    2. 'priority_steps': [0, 1, 2]  # 支持0(低)、1(中)、2(高)三个优先级
    3. 'queue_order_strategy': 'priority',
    4. }
    复制代码
    路由设置,确保任务路由到同一队列,不同优先级通过参数区分:
    1. app.conf.task_routes = {
    2. 'high_priority_task': {'queue': 'default', 'priority': 2},
    3. 'medium_priority_task': {'queue': 'default', 'priority': 1},
    4. 'low_priority_task': {'queue': 'default', 'priority': 0}
    5. }
    复制代码
    指定任务优先级,发送任务时通过 priority 参数设置优先级:
    1. high_priority_task.apply_async(priority=2)  # 最高优先级
    复制代码
  • 防止内存泄漏
    启动 Worker 时限制子进程任务数:
    1. celery -A proj worker --max-tasks-per-child=100
    复制代码
  • 动态任务重试
    在任务内部利用 self.retry 控制重试逻辑:
    1. @app.task(bind=True, max_retries=5)
    2. def fetch_data(self, url):
    3.     try:
    4.         response = requests.get(url)
    5.         return response.json()
    6.     except requests.Timeout as e:
    7.         self.retry(exc=e, countdown=60)  # 60 秒后重试
    复制代码
第二部分 Celery Worker

一、基础下令布局

  1. celery -A <模块名> worker [参数]
复制代码
示例
  1. celery -A tasks worker --loglevel=info --concurrency=4
复制代码
二、核心参数说明

参数说明典型值--concurrency / -c工作进程数(默认=CPU核心数)。I/O密集型任务可增加,CPU密集型任务发起等于CPU数4, 10--queues / -Q监听的队列(默认监听全队伍列)。用于路由任务优先级high_priority,low_priority--loglevel / -l日志级别info, debug, warning--hostname / -nWorker名称(默认自动生成)。用于监控辨认worker1@%h--autoscale动态伸缩进程数。格式:最大进程数,最小进程数10,2--max-tasks-per-child子进程处理任务数上限。防内存泄漏100--time-limit任务硬超时时间(秒)。超时强制终止任务300--soft-time-limit任务软超时时间(秒)。超时抛出异常,任务可捕获处理240--without-gossip禁用节点间通讯。淘汰网络开销无值--without-mingle禁用启动时节点同步。加快启动无值--without-heartbeat禁专心跳检测。可能影响监控无值--prefetch-multiplier预取任务倍数(默认=4)。控制并发消耗速率1(公平调理)--pool / -P实行池类型。prefork(默认), eventlet, gevent, sologevent--detach后台运行(守护进程模式)无值 三、高级参数说明

参数说明默认值--statedb持久化 Worker 状态到文件(用于恢复)None--scheduler自定义调理器(如共同 redbeat 利用)None(利用默认调理逻辑)--exclude-queues排除特定队列None(不排除任何队列)--pidfile指定 PID 文件路径None(不生成 PID 文件)--heartbeat-interval心跳检测间隔(秒)30(秒) 四、场景化设置示例

1. 高并发处理(I/O密集型)

  1. celery -A tasks worker -P gevent -c 100 --prefetch-multiplier=1
复制代码


  • 利用 gevent 协程池,启动 100 个协程
  • prefetch-multiplier=1 确保公平调理
2. CPU密集型任务

  1. celery -A tasks worker -c 4 --max-tasks-per-child=100
复制代码


  • 进程数等于 CPU 核心数
  • 每进程处理 100 个任务后重启,克制内存泄漏
3. 生产情况尺度设置

  1. celery -A tasks worker -c 8 -Q pay,email --autoscale=16,4 --time-limit=600
复制代码


  • 监听 order_pay 和 email 队列
  • 进程数动态调解(4~16)
  • 任务超时 10 分钟强制终止
4. 调试模式

  1. celery -A tasks worker -l debug -P solo --without-gossip --without-mingle
复制代码


  • 日志级别 debug
  • 单进程运行(solo 池),克制多进程干扰调试
  • 禁用节点通讯加快启动
五、关键操作下令


  • 优雅关闭 Worker
    1. celery -A tasks control shutdown
    复制代码
  • 检察活泼 Worker
    1. # 查看活跃的工作进程
    2. celery -A tasks inspect active
    3. # 查看工作进程的状态
    4. celery -A tasks status
    5. # 查看当前正在执行的任务
    6. celery inspect active
    7. # 查看已注册的任务
    8. celery inspect registered
    复制代码
  • 长途清除队列
    1. celery -A tasks purge -Q queue_name
    复制代码
六、设置文件集成

在 celeryconfig.py 中定义参数:
  1. # celeryconfig.py
  2. worker_concurrency = 8
  3. worker_prefetch_multiplier = 2
  4. worker_max_tasks_per_child = 100
复制代码
启动时自动加载设置:
  1. celery -A tasks worker --config=celeryconfig
复制代码
七、官方文档参考



  • Celery Worker 官方文档
  • Celery Worker 参数列表
第三部份 Celery Beat

一、基础下令布局

  1. celery -A <模块名> beat [参数]
复制代码
示例
  1. celery -A tasks beat --loglevel=info --scheduler=redbeat.RedBeatScheduler
复制代码
二、核心参数说明

参数说明典型值--scheduler / -S指定调理器。默认 celery.beat.PersistentScheduler(本地文件存储)redbeat.RedBeatScheduler(Redis)
django_celery_beat.schedulersatabaseScheduler(Django)--schedule / -s调理文件路径(仅限默认调理器)。存储任务元数据 .db/var/run/celery/beat-schedule--max-interval最大调理循环间隔(秒)。默认 0(尽可能快)30(低落 CPU 占用)--loglevel / -l日志级别DEBUG INFO WARNING ERROR CRITICAL FATAL--logfile / -f日志 文件路径。默认 stderr/var/run/celery/beat.pid--pidfilePID 文件路径。防止多实例冲突/var/run/celery/beat.pid--detach后台运行(守护进程模式)无值 三、高级参数说明

参数说明默认值--timezone设置时区(需 celery 启用时区支持)UTC--enable-utc强制利用 UTC 时区True--sslTrue SSL 连接 BrokerFalse--workdir指定工作目录None(当前目录) 四、场景化设置示例


  • 利用默认调理器(本地文件存储)
    1. celery -A tasks beat --schedule=/data/celerybeat-schedule --loglevel=info
    复制代码

    • 调理记载存储在文件中
    • 适合单机部署

  • 利用 RedBeat(Redis 存储)
    1. celery -A tasks beat \
    2. --scheduler=redbeat.RedBeatScheduler \
    3. --redbeat_redis_url=redis://localhost:6379/2 \
    4. --max-interval=10
    复制代码

    • 任务设置持久化到 Redis
    • 支持分布式部署

  • 生产情况高可用
    1. celery -A tasks beat \
    2. --scheduler=redbeat.RedBeatScheduler \
    3. --redbeat_redis_url=redis://localhost:6379/2 \
    4. --pidfile=/var/run/celery/beat.pid \
    5. --max-interval=5 \
    6. --loglevel=warning
    复制代码

    • 利用 Redis 集群
    • 限制调理间隔为 5 秒
    • 日志级别为告诫

  • 调试模式
    1. celery -A tasks beat --loglevel=debug --max-interval=1
    复制代码

    • 及时打印调试日志
    • 每秒查抄一次任务

五、设置文件集成

在 celeryconfig.py 中定义参数:
  1. # celeryconfig.py
  2. beat_scheduler = "redbeat.RedBeatScheduler"
  3. redbeat_redis_url = "redis://localhost:6379/2"
  4. max_interval = 10
  5. timezone = "Asia/Shanghai"
复制代码
启动时自动加载:
  1. celery -A tasks beat --config=celeryconfig
复制代码

六、关键操作


  • 防止多实例冲突
    1. celery -A tasks beat --pidfile=/tmp/celerybeat.pid
    复制代码
  • 动态添加任务(RedBeat)
    1. from redbeat import RedBeatSchedulerEntry
    2. from celery.schedules import crontab
    3. entry = RedBeatSchedulerEntry(
    4.     name='daily-report',
    5.     task='tasks.generate_report',
    6.     schedule=crontab(hour=8, minute=0),
    7.     args=['sales']
    8. )
    9. entry.save()
    复制代码
  • 监控 Beat 状态
    1. celery -A tasks inspect scheduled
    复制代码
七、留意事项


  • 时区一致性
    Broker、Worker、Beat 需设置类似时区。
  • Redis 高可用
    利用 RedBeat 时发起设置 Redis 哨兵或集群。
  • 文件权限
    利用 --schedule 时确保进程有写入权限。
八、官方文档参考



  • Celery Beat 官方文档
  • Celery Beat 参数列表
第四部分 Celery RedBeat

一、RedBeat 核心特性



  • 分布式调理:基于 Redis 实现多 Beat 实例高可用
  • 动态任务管理:支持运行时增删改查定时任务
  • 持久化存储:任务设置存储在 Redis 中,重启不丢失
  • 精确调理:毫秒级任务触发精度
二、核心参数详解

参数默认值说明--schedulercelery.beat.PersistentScheduler必须显式指定为 redbeat.RedBeatScheduler--redbeat_redis_urlredis://localhost:6379/0Redis 连接字符串(支持哨兵/集群)--redbeat_key_prefixredbeat:Redis 键名前缀(发起按项目区分)--redbeat_lock_keyredbeat::lock分布式锁键名(多实例需类似)--redbeat_lock_timeout30 (秒)锁自动开释时间(需 > 调理间隔)--redbeat_schedule_interval30 (秒)任务查抄频率(影响 CPU 利用率)--redbeat_expire_seconds604800 (7天)任务元数据过期时间--redbeat_initial_task_loadTrue启动时预加载全部任务--redbeat_max_loop_interval300 (秒)最大休眠间隔(防止时钟漂移) 三、设置方法

安装
  1. pip install celery-redbeat
复制代码
方式一:下令行启动
  1. celery beat -A proj \
  2.   --scheduler=redbeat.RedBeatScheduler \
  3.   --redbeat_redis_url=redis://:密码@集群节点:6379/2 \
  4.   --redbeat_key_prefix=prod:celery:beat \
  5.   --redbeat_lock_timeout=60 \
  6.   --redbeat_schedule_interval=10
复制代码
方式二:设置文件(celery.py)
  1. app.conf.update(
  2.     # 基础配置
  3.     broker_url='redis://localhost:6379/0',
  4.     result_backend='redis://localhost:6379/1',
  5.    
  6.     # RedBeat 专用配置
  7.     redbeat_redis_url='redis://:密码@集群节点:6379/2',
  8.     redbeat_key_prefix='prod:celery:beat',
  9.     redbeat_lock_key='prod:celery:cluster_lock',
  10.     redbeat_lock_timeout=60,
  11.     redbeat_schedule_interval=10
  12. )
复制代码
四、核心操作示例

1. 动态任务管理

  1. from redbeat import RedBeatSchedulerEntry
  2. from celery.schedules import crontab
  3. # 创建每小时执行的任务
  4. entry = RedBeatSchedulerEntry(
  5.     name='hourly_cleanup',  # 全局唯一标识
  6.     task='tasks.cleanup',
  7.     schedule=crontab(minute=0),
  8.     args=('logs',),
  9.     kwargs={'retry': True},
  10.     options={'queue': 'system'}
  11. )
  12. entry.save()  # 持久化到Redis
  13. # 修改现有任务
  14. entry = RedBeatSchedulerEntry.from_key('redbeat:hourly_cleanup')
  15. entry.schedule = crontab(minute=0, hour='*/2')  # 改为每2小时
  16. entry.save()
  17. # 删除任务
  18. entry.delete()
复制代码
2. 多实例高可用部署

  1. # 实例1
  2. celery beat -A proj --scheduler=redbeat.RedBeatScheduler \
  3.   --redbeat_lock_key=cluster1 --redbeat_lock_timeout=60
  4. # 实例2(自动故障转移)
  5. celery beat -A proj --scheduler=redbeat.RedBeatScheduler \
  6.   --redbeat_lock_key=cluster1 --redbeat_lock_timeout=60
复制代码
五、监控与维护

1. Redis 数据检察

  1. # 列出所有任务
  2. redis-cli --scan --pattern 'redbeat:*'
  3. # 查看任务详情
  4. redis-cli HGETALL redbeat:my_task_name
  5. # 强制清除旧数据
  6. redis-cli --scan --pattern 'redbeat:*' | xargs redis-cli del
复制代码
2. 日志监控要点

  1. [INFO] redbeat: Starting redbeat scheduler.
  2. [DEBUG] redbeat: Acquired lock
  3. [WARNING] redbeat: Lock timeout reached, releasing lock.
  4. [ERROR] redbeat: Redis connection error: Error 111 connecting to...
复制代码
六、最佳实践


  • 容量规划
    Redis 内存发起预留:任务数 * 1KB,例如 1000 个任务约需 1MB
  • 安全设置
    1. # 禁用危险操作
    2. app.conf.redbeat_allow_unsafe_write = False
    复制代码
  • 监控指标
    Prometheus 监控发起采集:

    • redbeat_tasks_total (Counter)
    • redbeat_lock_acquired (Gauge)
    • redbeat_errors_total (Counter)

  • 灾备方案
    定期导出任务设置:
    1. import json
    2. from redbeat import RedBeatSchedulerEntry
    3. tasks = RedBeatSchedulerEntry.all()
    4. with open('backup.json', 'w') as f:
    5.     json.dump([task.to_dict() for task in tasks], f)
    复制代码
七、故障排查指南

现象可能原因解决方案任务不实行Redis 连接失败查抄 redbeat_redis_url 网络连通性多实例同时运行锁竞争失败增加 redbeat_lock_timeout任务重复实行系统时钟不同步部署 NTP 时间同步服务CPU 利用率高schedule_interval 过小适当增大查抄间隔 八、官方文档参考



  • RedBeat 官方文档
  • RedBeat GitHub 堆栈
  • Celery 分布式任务调理
第五部分 Celery Flower

一、Flower 核心特性



  • 及时监控:动态展示任务状态、Worker 心跳及队列详情
  • 任务追踪:记载任务历史,支持按结果、参数、状态筛选
  • 长途控制:支持终止任务、重启 Worker 等管理操作
  • Web API:提供 RESTful 接口,便于集成第三方监控系统
  • 可视化统计:生成任务实行时间分布图、成功率统计等图表
  • 安全机制:支持 Basic Auth、OAuth 等认证方式
  • 告警通知:可设置任务失败时触发邮件或 Webhook

二、核心参数详解

参数默认值说明--port5555监听端口--address0.0.0.0绑定 IP(0.0.0.0 表现全部网络接口)--authNone认证方式(格式:username:password)--basic_authNone多用户认证(格式:user1:pass1,user2:pass2)--persistentFalse启用持久化(生存状态至 SQLite)--dbflower持久化数据库文件名--max_tasks10000内存中生存的最大任务记载数--enable_eventsTrue及时接收 Celery 变乱--xheadersFalse支持署理服务器(如 Nginx)的 X-Forwarded 头--url_prefixNoneURL 前缀(用于反向署理场景,如 /flower)
三、设置方法

安装
  1. pip install flower
复制代码
下令行启动
  1. celery flower -A proj \
  2.   --port=5555 \
  3.   --basic_auth=admin:admin123,user:userpass \
  4.   --persistent=True \
  5.   --db=/data/flower.db
复制代码
四、核心操作示例

1. 任务管理

  1. # 通过 API 终止任务
  2. import requests
  3. resp = requests.post(
  4.     'http://localhost:5555/api/task/revoke/任务ID',
  5.     auth=('admin', 'admin123')
  6. )
  7. print(resp.json())
复制代码
2. 数据导出

  1. # 导出所有任务数据
  2. curl -u admin:admin123 http://localhost:5555/api/tasks > tasks.json
  3. # 查询指定 Worker 状态
  4. curl -u admin:admin123 http://localhost:5555/api/worker/worker1@host
复制代码
五、监控与告警

1. Prometheus 集成

  1. # Prometheus 配置
  2. scrape_configs:
  3.   - job_name: 'flower'
  4.     metrics_path: '/metrics'
  5.     static_configs:
  6.       - targets: ['flower-host:5555']
  7. # 关键指标:
  8. - `flower_workers_total` 存活 Worker 数
  9. - `flower_tasks_total` 历史任务总数
  10. - `flower_task_events` 任务状态变化计数器
复制代码
2. 告警规则

  1. # Alertmanager 配置示例
  2. - alert: CeleryWorkerDown
  3.   expr: flower_workers_total < 3
  4.   for: 5m
  5.   labels:
  6.     severity: critical
  7.   annotations:
  8.     summary: "Celery 节点宕机 (实例: {{ $labels.instance }})"
复制代码

六、最佳实践

1. 安全加固

  1. # 强制 HTTPS(需配合反向代理)
  2. flower --xheaders=true --url_prefix=flower
  3. # 使用 JWT 认证
  4. flower --auth_provider=flower.jwt.JWTAuth --auth=JWT_SECRET_KEY
复制代码
2. 高可用部署

  1. # 多实例负载均衡(需共享持久化存储)
  2. # 节点1
  3. flower --persistent --db=/shared-storage/flower.db --port=5555
  4. # 节点2
  5. flower --persistent --db=/shared-storage/flower.db --port=5555
复制代码
3. 数据清理策略

  1. # 自动清理旧数据(需自定义插件)
  2. from flower.models import Task
  3. from datetime import datetime, timedelta
  4. def cleanup_old_tasks():
  5.     cutoff = datetime.now() - timedelta(days=7)
  6.     Task.objects.filter(received__lt=cutoff).delete()
复制代码
七、故障排查指南

现象可能原因解决方案无法访问 UI防火墙限制查抄 --address 和 --port 设置图表数据不更新未启用变乱模式启动时添加 --enable_events=true认证失败特殊字符未转义利用 URL 编码处理密码中的 @ 和 :持久化数据丢失文件权限问题确保 --db 路径可写Prometheus 指标为空未袒露 metrics 端点确认启动参数无 --disable_metrics 八、官方文档参考



  • Flower 官方文档
  • Flower REST API 示例
  • Flower GitHub 堆栈
附录 Celery 详细设置参数说明

1. General settings(通用设置)

参数名默认值说明accept_content{'json'}答应的内容类型/序列化器列表result_accept_contentNone结果后端答应的内容类型/序列化器列表 2. Time and date settings(时间和日期设置)

参数名默认值说明enable_utcTrue是否利用 UTC 时间timezone"UTC"设置 Celery 利用的时区 3. Task settings(任务设置)

参数名默认值说明task_annotationsNone用于重写任务属性的注解task_compressionNone任务消息的默认压缩方式task_protocol2发送任务时利用的默认任务消息协议版本task_serializer"json"默认序列化方法task_publish_retryTrue发布任务消息时是否重试task_publish_retry_policy见文档重试发布任务消息的策略 4. Task execution settings(任务实行设置)

参数名默认值说明task_always_eagerFalse是否总是本地实行任务task_eager_propagatesFalse是否流传本地实行任务的异常task_store_eager_resultFalse是否生存本地实行任务的结果task_remote_tracebacksFalse是否在任务结果中包罗长途追踪信息task_ignore_resultFalse是否忽略任务的返回值task_store_errors_even_if_ignoredFalse是否即使忽略任务结果也存储错误task_track_startedFalse是否陈诉任务的 ‘started’ 状态task_time_limit无任务的硬时限task_soft_time_limit无任务的软时限task_acks_lateFalse是否在任务实行后确认消息task_acks_on_failure_or_timeoutTrue是否在任务失败或超时时确认消息task_reject_on_worker_lostFalse是否在工作进程丢失时拒绝任务task_default_rate_limit无全局默认任务速率限制 5. Task result backend settings(任务结果后端设置)

参数名默认值说明result_backend无存储任务结果的后端result_backend_always_retryFalse是否总是重试可恢复的异常result_backend_max_sleep_between_retries_ms10000重试之间的最大睡眠时间result_backend_base_sleep_between_retries_ms10重试之间的基础睡眠时间result_backend_max_retries无穷最大重试次数result_backend_thread_safeFalse后端对象是否线程安全result_backend_transport_options{}传递给底层传输的额外选项result_serializer"json"结果序列化格式result_compression无结果的可选压缩方式result_extendedFalse是否启用扩展任务结果属性result_expires86400存储的任务结果的过期时间result_cache_maxFalse客户端缓存结果的总数result_chord_join_timeout3.0在 chord 中连接结果的超时时间result_chord_retry_interval1.0chord 任务的默认重试间隔 6. Database backend settings(数据库后端设置)

参数名默认值说明database_engine_options{}SQLAlchemy 数据库引擎的额外选项database_short_lived_sessionsFalse是否利用短生命周期的会话database_table_schemas{}数据库结果后端的表架构自定义database_table_names{}数据库结果后端的表名自定义 7. RPC backend settings(RPC 后端设置)

参数名默认值说明result_persistentFalse是否持久化结果消息 8. Cache backend settings(缓存后端设置)

参数名默认值说明cache_backend_options{}传递给缓存后端的额外选项 9. MongoDB backend settings(MongoDB 后端设置)

mongodb_backend_settings = {设置字典}
参数名默认值说明database"celery"连接的数据库名称taskmeta_collection"celery_taskmeta"存储任务元数据的聚集名称max_pool_size10PyMongo 连接的最大池大小options{}传递给 MongoDB 连接构造函数的其他关键字参数 10. Redis backend settings(Redis 后端设置)

参数名默认值说明result_backend未设置Redis 后端连接 'redis://username:password@host:port/db'redis_backend_health_check_interval未设置Redis 后端支持健康查抄,设置为两次健康查抄之间的秒数redis_backend_use_sslFalseRedis 后端是否利用 SSL 连接redis_max_connections无穷制Redis 连接池的最大连接数redis_socket_connect_timeoutNoneRedis 套接字连接超时时间(秒)redis_socket_timeout120.0 秒Redis 套接字读写操作的超时时间redis_retry_on_timeoutFalse在 Redis 套接字超时时是否重试读写操作redis_socket_keepaliveFalse是否保持 Redis 套接字连接的活动状态 10. File-system backend settings(文件系统 后端设置)

参数名默认值说明result_backend未设置文件系统后端连接 'file:///var/celery/results' 11. Message Routing(消息路由)

参数名默认值说明task_queuesNone工作进程将从中消耗的任务队列列表task_routesNone用于将任务路由到队列的路由表task_queue_max_priorityNone队列的最大优先级task_default_priorityNone任务的默认优先级task_inherit_parent_priorityFalse子任务是否继承父任务的优先级worker_directFalse是否启用直接队列task_create_missing_queuesTrue是否自动创建缺失的队列task_default_queue"celery"默认队列名称task_default_exchange利用 task_default_queue 的值默认交换机名称task_default_exchange_type"direct"默认交换机类型task_default_routing_key利用 task_default_queue 的值默认路由键task_default_delivery_mode"persistent"默认投递模式 12. Broker settings(消息中间件设置)

参数名默认值说明broker_url"amqp://localhost"消息中间件地址broker_read_url从 broker_url 继承用于读取操作的 broker 连接地址broker_write_url从 broker_url 继承用于写入操作的 broker 连接地址broker_failover_strategy"round-robin"broker 连接失败时的备用策略broker_heartbeat120.0broker 连接的心跳间隔broker_heartbeat_checkrate2.0broker 心跳查抄的频率broker_use_sslFalse是否利用 SSL 连接 brokerbroker_pool_limit10broker 连接池的最大连接数broker_connection_timeout4.0broker 连接超时时间broker_connection_retryTrue是否在连接失败时重试连接broker_connection_retry_on_startupTrue在启动时是否重试连接broker_connection_max_retries100连接失败时的最大重试次数broker_channel_error_retryFalse是否在通道错误时重试连接broker_login_method"AMQPLAIN"broker 登录方法broker_transport_options{}传递给底层传输的额外选项 13. Worker(工作进程)

参数名默认值说明imports[]工作进程启动时导入的模块列表include[]与 imports 类似的模块导入列表worker_deduplicate_successful_tasksFalse是否查抄重复的任务worker_concurrencyCPU 核心数并发工作进程/线程数worker_prefetch_multiplier4每个工作进程预取的消息数worker_enable_prefetch_count_reductionTrue是否在连接丢失后淘汰预取数worker_lost_wait10.0等待丢失工作进程的时间worker_max_tasks_per_child无穷制每个工作进程的最大任务数worker_max_memory_per_child无穷制每个工作进程的最大内存worker_disable_rate_limitsFalse是否禁用全部任务的速率限制worker_state_dbNone存储持久工作进程状态的文件名worker_timer_precision1.0定时器的精度worker_enable_remote_controlTrue是否启用长途控制worker_proc_alive_timeout4.0等待新工作进程启动的时间worker_cancel_long_running_tasks_on_connection_lossFalse是否在连接丢失时取消长运行任务 14. Events(变乱)

参数名默认值说明worker_send_task_eventsFalse是否发送任务干系变乱task_send_sent_eventFalse是否发送任务发送变乱event_queue_ttl5.0变乱队列中消息的过期时间event_queue_expires60.0变乱队列的过期时间event_queue_prefix"celeryev"变乱接收队列名称的前缀event_exchange"celeryev"变乱交换机名称event_serializer"json"变乱消息的序列化格式events_logfileNone变乱日志文件路径events_pidfileNone变乱进程 ID 文件路径events_uidNone变乱进程的用户 IDevents_gidNone变乱进程的组 IDevents_umaskNone变乱进程创建文件时的 umaskevents_executableNone变乱进程利用的 Python 可实行路径 15. Remote Control Commands(长途控制下令)

参数名默认值说明control_queue_ttl300.0长途控制下令队列中消息的过期时间control_queue_expires10.0长途控制下令队列的过期时间control_exchange"celery"长途控制下令交换机名称 16. Logging(日志)

| worker_hijack_root_logger | True | 是否劫持根日志记载器 |
| worker_log_color | True | 是否在日志输出中利用颜色 |
| worker_log_format | "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s" | 工作进程日志格式 |
| worker_task_log_format | "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s" | 任务日志格式 |
| worker_redirect_stdouts | True | 是否将尺度输出和尺度错误重定向到日志 |
| worker_redirect_stdouts_level | WARNING | 重定向的尺度输出和尺度错误的日志级别 |
17. Security(安全)

参数名默认值说明security_keyNone用于消息署名的私钥文件路径security_key_passwordNone私钥文件的密码security_certificateNone用于消息署名的证书文件路径security_cert_storeNone存储 X.509 证书的目录路径security_digestsha256消息署名利用的加密择要算法 18. Custom Component Classes(自定义组件类)

参数名默认值说明worker_pool"prefork"工作进程池的类名worker_pool_restartsFalse是否答应重启工作进程池worker_autoscaler"celery.worker.autoscale:Autoscaler"自动缩放器类名worker_consumer"celery.worker.consumer:Consumer"消耗者类名worker_timer"kombu.asynchronous.hub.timer:Timer"定时器类名worker_logfileNone工作进程日志文件路径worker_pidfileNone工作进程 ID 文件路径worker_uidNone工作进程的用户 IDworker_gidNone工作进程的组 IDworker_umaskNone工作进程创建文件时的 umaskworker_executableNone工作进程利用的 Python 可实行路径 19. Beat settings(定时任务设置)

参数名默认值说明beat_schedule{}定时任务调理表beat_scheduler"celery.beatersistentScheduler"定时任务调理器类名beat_schedule_filename"celerybeat-schedule"定时任务调理文件名beat_sync_every0定时任务同步间隔beat_max_loop_interval0定时任务循环的最大间隔beat_cron_starting_deadlineNone定时任务的启动克制时间beat_logfileNone定时任务日志文件路径beat_pidfileNone定时任务进程 ID 文件路径beat_uidNone定时任务进程的用户 IDbeat_gidNone定时任务进程的组 IDbeat_umaskNone定时任务进程创建文件时的 umaskbeat_executableNone定时任务进程利用的 Python 可实行路径 官方文档参考



  • Celery 参数列表

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

傲渊山岳

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表