马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
python基础代码、优化、扩展和监控的完整示例。此示例利用 Celery 共同 RabbitMQ 作为消息代理,实现异步使命的调度、重试、定时使命以及错误监控等功能。
项目结构
我们将项目结构组织如下,以便代码逻辑清楚且易于扩展:
- project/
- │
- ├── celery_app.py # Celery应用的配置和初始化
- ├── tasks.py # 异步任务的定义
- ├── monitor.py # 异常监控和报警
- └── main.py # 测试异步任务调用
复制代码 1. celery_app.py - 设置 Celery 应用
- # celery_app.py
- from celery import Celery
- from celery.schedules import crontab
- app = Celery('tasks', broker='amqp://localhost//', backend='redis://localhost')
- # 基础配置
- app.conf.update(
- result_expires=3600, # 任务结果过期时间
- task_acks_late=True, # 确保任务执行后才确认完成
- worker_prefetch_multiplier=1, # 单次预取任务数
- task_serializer='json', # 任务数据序列化格式
- result_serializer='json', # 任务结果序列化格式
- accept_content=['json'], # 仅接收json格式
- task_soft_time_limit=300, # 软超时时间
- task_time_limit=600, # 硬超时时间
- worker_hijack_root_logger=False, # 不劫持主日志
- worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
- )
- # 定时任务配置
- app.conf.beat_schedule = {
- 'scheduled_add': {
- 'task': 'tasks.add',
- 'schedule': crontab(hour=7, minute=30, day_of_week=1),
- 'args': (16, 16),
- },
- }
- # 任务路由配置:不同的任务可以走不同的队列
- app.conf.task_routes = {
- 'tasks.add': {'queue': 'high_priority'},
- }
复制代码 2. tasks.py - 定义使命
- # tasks.py
- from celery_app import app
- from monitor import task_failure_handler
- import time
- # 定义基础任务
- @app.task(bind=True, max_retries=3)
- def add(self, x, y):
- try:
- time.sleep(5) # 模拟耗时任务
- return x + y
- except Exception as exc:
- raise self.retry(exc=exc, countdown=5) # 5秒后重试
复制代码 3. monitor.py - 监控与报警
- # monitor.py
- from celery.signals import task_failure
- @task_failure.connect
- def task_failure_handler(sender=None, exception=None, **kwargs):
- # 发送报警通知或记录错误日志
- print(f"[ALERT] Task {sender.name} failed due to {exception}")
复制代码 4. main.py - 测试使命调用
- # main.py
- from tasks import add
- from celery_app import app
- if __name__ == "__main__":
- # 启动异步任务
- result = add.delay(4, 6)
- print("Task state:", result.state) # 打印任务状态
- print("Result:", result.get()) # 获取任务结果(阻塞等待)
-
- # 组合任务示例:Group
- from celery import group
- group_tasks = group(add.s(i, i) for i in range(10))
- group_result = group_tasks.apply_async()
- print("Group Result:", group_result.get())
-
- # 链式任务示例:Chord
- from celery import chord
- callback = add.s(10, 20)
- chord_tasks = chord((add.s(i, i) for i in range(10)), callback)
- chord_result = chord_tasks.apply_async()
- print("Chord Result:", chord_result.get())
复制代码 运行和监控
- 启动 RabbitMQ 服务:
- sudo service rabbitmq-server start
复制代码 - 启动 Celery Worker:
运行以下命令,指定 high_priority 队列处理高优先级使命。
- celery -A celery_app worker -Q high_priority,default -l info
复制代码 - 启动 Celery Beat(用于调度定时使命):
- celery -A celery_app beat -l info
复制代码 - 启动 Flower 及时监控(可选):
- celery -A celery_app flower --port=5555
复制代码 访问 http://localhost:5555 进行使命和 worker 状态的及时监控。
- Prometheus 和 Grafana 监控(可选):
设置 Celery 的自定义事件,并利用 Prometheus 采集数据,再通过 Grafana 可视化 Celery 的性能指标。
此示例项目具有以下特性:
- 异步使命:通过 delay() 方法调用。
- 重试机制:在使命非常时自动重试。
- 使命调度:支持定时使命,利用 Celery Beat 实现周期性使命调度。
- 报警机制:在使命失败时发送报警或日志记录。
- 监控系统:利用 Flower 进行及时监控,支持 Prometheus 和 Grafana 扩展。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |