ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【Django技能深潜】揭秘Django定时任务利器:django_apscheduler全面解析与 [打印本页]

作者: 惊雷无声    时间: 2024-6-1 22:05
标题: 【Django技能深潜】揭秘Django定时任务利器:django_apscheduler全面解析与
在现代Web开辟中,定时任务是不可或缺的一部门,无论是定期数据分析、定时发送邮件、照旧系统维护脚本,都需要精准的定时调度。Django作为Python天下中强大的Web框架,其对定时任务的支持自然也是开辟者关注的重点。本文将深入探讨Django定时任务解决方案,特别是聚焦于django_apscheduler这一强大扩展库,带您领略其背后的运行原理与实战应用,助您在Django项目中高效驾驭定时任务。
一、Django定时任务组件概览

在Django天下中,实现定时任务重要有以下几种方式:
  1. import time
  2. # 定时任务1
  3. def task1():
  4.     print("hello,world")
  5. # 定时任务2
  6. def task2():
  7.     print("hello,world")
  8. if __name__ == '__main__':
  9.     while True:
  10.         task1()
  11.         task1()
  12.         time.sleep(7) # 调度频率:每7秒调度一次
复制代码
二、为何选择django_apscheduler库

相较于其他几种方案,django_apscheduler凭借其与Django的集成度、强大的功能、机动的配置等特点脱颖而出:
三、django_apscheduler运行原理探秘

django_apscheduler的核心在于将APScheduler与Django框架精密联合。其背后的工作机制重要包罗:
四、实战演练:用django_apscheduler构建定时任务

接下来,让我们通过一个简单示例,感受django_apscheduler的实战魅力:
  1. pip install django-apscheduler
复制代码
  1. INSTALLED_APPS = [
  2.     'django.contrib.admin',
  3.     'django.contrib.auth',
  4.     'django.contrib.contenttypes',
  5.     'django.contrib.sessions',
  6.     'django.contrib.messages',
  7.     'django.contrib.staticfiles',
  8.     # ....其他APP....
  9.     'django_apscheduler',
  10.     # ....其他APP....
  11. ]
  12. # 时间格式化
  13. APSCHEDULER_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
  14. # 任务超时时间(单位:秒)
  15. APSCHEDULER_RUN_NOW_TIMEOUT = 600
复制代码
  1. python manage.py migrate
复制代码

数据表的界说如下:
  1. -- SELECT sql FROM sqlite_master WHERE type='table' AND name='django_apscheduler_djangojob';
  2. CREATE TABLE "django_apscheduler_djangojob"
  3. (
  4.     "id"            varchar(255) NOT NULL PRIMARY KEY,
  5.     "next_run_time" datetime NULL,
  6.     "job_state"     BLOB         NOT NULL
  7. );
  8. -- SELECT sql FROM sqlite_master WHERE type='table' AND name='django_apscheduler_djangojobexecution';
  9. CREATE TABLE "django_apscheduler_djangojobexecution"
  10. (
  11.     "id"        integer      NOT NULL PRIMARY KEY AUTOINCREMENT,
  12.     "status"    varchar(50)  NOT NULL,
  13.     "run_time"  datetime     NOT NULL,
  14.     "duration"  decimal NULL,
  15.     "finished"  decimal NULL,
  16.     "exception" varchar(1000) NULL,
  17.     "traceback" text NULL,
  18.     "job_id"    varchar(255) NOT NULL REFERENCES "django_apscheduler_djangojob" ("id") DEFERRABLE INITIALLY DEFERRED,
  19.     CONSTRAINT "unique_job_executions" UNIQUE ("job_id", "run_time")
  20. );
复制代码
  1. # ./task/task_list.py
  2. from datetime import datetime
  3. def print_task():
  4.     print('Scheduler测试任务执行:{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
复制代码
在taskApp中,创建如下效果目次,配置Django自界说管理命令:
  1. $ tree management/
  2. management/
  3. ├── __init__.py
  4. └── commands
  5.     ├── __init__.py
  6.     └── start_tasks.py
复制代码
  1. # ./task/management/commands/start_tasks.py
  2. from datetime import datetime
  3. from apscheduler.executors.pool import ThreadPoolExecutor
  4. from apscheduler.schedulers.blocking import BlockingScheduler
  5. from apscheduler.triggers.interval import IntervalTrigger
  6. from django.conf import settings
  7. from django.core.management.base import BaseCommand
  8. from django_apscheduler import util
  9. from django_apscheduler.jobstores import DjangoJobStore
  10. from django_apscheduler.models import DjangoJobExecution
  11. from ...task_list import print_task
  12. #
  13. # Django manage.py命令:存储定时任务信息
  14. #
  15. class Command(BaseCommand):
  16.     help = '启动定时任务.'
  17.     def handle(self, *args, **options):
  18.         # 调度器
  19.         scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) # 研发阶段使用
  20.         # scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE) # 生产阶段使用
  21.         # 任务存储
  22.         scheduler.add_jobstore(DjangoJobStore(), 'SQLiteJobStore')
  23.         # 配置线程池执行器,限制最大并发数为1,防止并发
  24.         executor = ThreadPoolExecutor(max_workers=1)
  25.         scheduler.executor = executor
  26.         # 注册定义任务
  27.         id_print_task =  'print_task__job'
  28.         print('开始-增加任务({})'.format(id_print_task))
  29.         scheduler.add_job(
  30.             print_task,
  31.             id=id_print_task,
  32.             name=id_print_task,
  33.             max_instances=1,
  34.             replace_existing=True,
  35.             trigger=IntervalTrigger(seconds=15, start_date=datetime.now(), ), # 从当前时间开始,每15秒钟调度一次
  36.         )
  37.         print('完成-增加任务({})'.format(id_print_task))
  38.         # 启动定时任务
  39.         try:
  40.             scheduler.start()
  41.         except KeyboardInterrupt:
  42.             scheduler.shutdown()
复制代码
至此,任务配置完成:每15秒钟调度一次我们自界说任务。
  1. python manage.py start_tasks
复制代码
由于我们用的调度器是BlockingScheduler,启动之后命令行不退出,在DEV研发阶段,建议采用这个调度器;生成环境,建议采用BackgroundScheduler调度器,通过配景守护进程执行定时任务。
五、总结

django_apscheduler以其高度集成、机动配置和强大的功能,成为Django项目中定时任务解决方案的优选。它不仅简化了定时任务的实现,还提升了任务管理的便捷性和系统的稳定性。无论你是初次接触定时任务的新手,照旧寻求高效解决方案的老手,django_apscheduler都是值得深入了解和掌握的工具。希望本文能为你在Django定时任务的探索之路上点亮一盏明灯。

关注本公众号,我们共同学习进步
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4