FastAPI集成APsecheduler的BackgroundScheduler+mongodb(精简)

[复制链接]
发表于 2025-7-9 02:25:31 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
项目架构:
    FastAPI(folder)
                 >app(folder)
                 >core(folder)
                 >models(folder)
                 >routers(folder)
                 >utils(folder)
                   main.py(file)
1 utils文件夹下新建schedulers.py   
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
#特别阐明:此处显示指定DB,会在DB里创建数据库apscheduler_db
jobstores={
        'default':MongoDBJobStore(
              database='apscheduler_db',
              collection='custom_jobs',  
              host='localhost',
              port=27017
        )
}
#特别阐明:replace_existing=True会覆盖同名的JOB,但不影响数据库中的,仅处理job_id相同的冲突
scheduler=BackgroundScheduler(jobstores=jobstores,replace_existing=True)
2  main.py中在lifespan上下文初始化和关闭scheduler
import uvicorn
from contextlib import asynccontextmanager
from app.utils.schedulers import jobstores

scheduler=None
#特别阐明:yield中可以监控监控到正常竣事好比ctrl+c,非常竣事不能执行yield后代码
@asynccontextmanager
async def lifespan(app:FastAPI):
    jobstores['default'].remove_all_jobs()
    from app.utils.schedulers import scheduler
    yield
    scheduler.remove_all_jobs()
    scheduler.shutdown(wait=False)

app=FastAPI(lifespan=lifespan)
 3 models文件夹新建scheduler.py文件配置基础参数类和默认值
from pydantic import BaseModel

class job_config(BaseModel):
        job_id:str="default"
        job_name:str="default"
        trigger_type:str="interval"
        trigger_kwargs:dict={}
        seconds:int=30
        pass
4 api文件夹下添加sechedulers.py配置添加创建job方法
from app.utils.schedulers import scheduler
from app.models.schedulers import job_config
from fastapi import APIRouter
from typing import Coroutine,Callable
from datetime import datetime

@router.get("create_job")
def create_job(jobconfig,func):
    try:
        if jobconfig is None:
           jobconfig=job_config()

        scheduler.add_job(
            func,
            trigger=jobconfig.trigger_type,
            kwargs=jobconfig.trigger_kwargs,#特别阐明:这里可以添加自定义参数
            id=jobconfig.job_id,
            name=jobconfig.job_name,
            seconds=jobconfig.seconds
        )
       
        scheduler.start()
    except Exception as e:
        raise e
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
 5 测试,调用test方法
@router.get("/function")
def function1():
    try:
        with open("D:\\demo.txt", "a") as file:
             print("写入文件"+ str(datetime.now()), file=file)
    except:
        pass

@router.get("/test")
def test():
    try:
        create_job(None,function1)
    except:
        pass


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告
回复

使用道具 举报

© 2001-2025 Discuz! Team. Powered by Discuz! X3.5

GMT+8, 2025-7-25 08:23 , Processed in 0.076558 second(s), 30 queries 手机版|qidao123.com技术社区-IT企服评测▪应用市场 ( 浙ICP备20004199 )|网站地图

快速回复 返回顶部 返回列表