在flask微服务中使用调度器设置定时任务:BackgroundScheduler
- #!/usr/bin/env python3
- # -*- coding: UTF-8 -*-
- # filename: main.py
- import os
- import unittest
- from app import create_app, db
- from app.route import route
- from app.service.score_service import send_score_mail
- from apscheduler.schedulers.background import BackgroundScheduler
- # 环境变量
- env = os.getenv('RUNTIME_ENV') or 'dev'
- # 创建应用
- app = create_app(env)
- # 服务路由
- route(app)
- def start_scheduler():
- # 创建调度器
- scheduler = BackgroundScheduler(timezone='Asia/Shanghai')
- # 添加定时任务
- scheduler.add_job(send_score_mail, 'cron', hour=7, minute=0, day_of_week='mon')
- # 启动调度器
- scheduler.start()
- return scheduler
- # 启动调度器(放在一个函数中,但在这里仍然调用它以保持现有行为)
- _scheduler = start_scheduler()
- @app.cli.command()
- def create():
- """ Create all database. """
- db.drop_all()
- db.create_all()
- db.session.commit()
- @app.cli.command()
- def test():
- """ Run service unittest. """
- ut = unittest.TextTestRunner()
- ut.run(unittest.TestLoader().discover('app/test'))
- if __name__ == '__main__':
- app.run(debug=False, host='0.0.0.0', port=5003) # 此处debug为False时定时任务只会执行一次,debug为True时会执行2次
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |