FastAPI 结合 JWT

打印 上一主题 下一主题

主题 974|帖子 974|积分 2932

FastAPI 结合 JWT

   JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被窜改。JWT 通常用于认证和授权流程。
  步调

   在 FastAPI 中,JWT 主要用于保护 API 路由,使其只答应颠末身份验证的用户访问。认证流程大致如下:
  

  • **用户登录:**用户通过提交用户名和密码获取 JWT。
  • **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  • **访问受保护的路由:**用户在访问受保护的路由时,必要在请求头中携带该 JWT。
  • **Token 验证:**服务器验证 JWT 的正当性和有效性,答应或拒绝访问受保护资源。
安装

  1. pip install fastapi uvicorn pyjwt
复制代码
步调

导入必要的模块

  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  3. from pydantic import BaseModel
  4. from datetime import datetime, timedelta, timezone
  5. import jwt
复制代码
设置配置和初始化应用

  1. SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥
  2. ALGORITHM = "HS256"             # 加密算法
  3. ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间
  4. app = FastAPI()
  5. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
复制代码
创建数据模型

  1. class Token(BaseModel):
  2.     access_token: str
  3.     token_type: str
  4. class TokenData(BaseModel):
  5.     username: str | None = None
  6. class User(BaseModel):
  7.     username: str
  8.     email: str | None = None
  9.     full_name: str | None = None
  10.     disabled: bool | None = None
  11. class UserInDB(User):
  12.     hashed_password: str
复制代码
实现辅助函数

生成 JWT Token

  1. def create_access_token(data: dict, expires_delta: timedelta | None = None):
  2.     to_encode = data.copy()
  3.     if expires_delta:
  4.         expire = datetime.now(timezone.utc) + expires_delta
  5.     else:
  6.         expire = datetime.now(timezone.utc) + timedelta(minutes=15)
  7.     to_encode.update({"exp": expire})
  8.     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  9.     return encoded_jwt
复制代码
获取用户数据

  1. fake_users_db = {
  2.     "testuser": {
  3.         "username": "testuser",
  4.         "full_name": "Test User",
  5.         "email": "testuser@example.com",
  6.         "hashed_password": "fakehashedpassword",
  7.         "disabled": False,
  8.     }
  9. }
  10. def get_user(db, username: str):
  11.     if username in db:
  12.         user_dict = db[username]
  13.         return UserInDB(**user_dict)
复制代码
验证密码

  1. def verify_password(plain_password, hashed_password):
  2.     return plain_password == hashed_password
复制代码
获取当前用户

  1. async def get_current_user(token: str = Depends(oauth2_scheme)):
  2.     credentials_exception = HTTPException(
  3.         status_code=status.HTTP_401_UNAUTHORIZED,
  4.         detail="Could not validate credentials",
  5.         headers={"WWW-Authenticate": "Bearer"},
  6.     )
  7.     try:
  8.         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  9.         username: str = payload.get("sub")
  10.         if username is None:
  11.             raise credentials_exception
  12.         token_data = TokenData(username=username)
  13.     except jwt.PyJWTError:
  14.         raise credentials_exception
  15.     user = get_user(fake_users_db, username=token_data.username)
  16.     if user is None:
  17.         raise credentials_exception
  18.     return user
  19. async def get_current_active_user(current_user: User = Depends(get_current_user)):
  20.     if current_user.disabled:
  21.         raise HTTPException(status_code=400, detail="Inactive user")
  22.     return current_user
复制代码
用户登录获取 Token

  1. @app.post("/token", response_model=Token)
  2. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
  3.     user = get_user(fake_users_db, form_data.username)
  4.     if not user or not verify_password(form_data.password, user.hashed_password):
  5.         raise HTTPException(
  6.             status_code=status.HTTP_401_UNAUTHORIZED,
  7.             detail="Incorrect username or password",
  8.             headers={"WWW-Authenticate": "Bearer"},
  9.         )
  10.     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  11.     access_token = create_access_token(
  12.         data={"sub": user.username}, expires_delta=access_token_expires
  13.     )
  14.     return {"access_token": access_token, "token_type": "bearer"}
复制代码
受保护的路由示例

  1. @app.get("/users/me/", response_model=User)
  2. async def read_users_me(current_user: User = Depends(get_current_active_user)):
  3.     return current_user
复制代码
全部代码

  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  3. from pydantic import BaseModel
  4. from datetime import datetime, timedelta, timezone
  5. import uvicorn
  6. import jwt
  7. import os
  8. # JWT 相关配置
  9. SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
  10. ALGORITHM = "HS256"  # 使用的加密算法
  11. ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位
  12. app = FastAPI()  # 创建 FastAPI 应用实例
  13. # OAuth2PasswordBearer 实例,用于依赖项
  14. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  15. # 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
  16. fake_users_db = {
  17.     "testuser": {
  18.         "username": "testuser",
  19.         "full_name": "Test User",
  20.         "email": "testuser@example.com",
  21.         "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码
  22.         "disabled": False,  # 用户是否被禁用
  23.     }
  24. }
  25. # Pydantic 模型,用于定义请求和响应的数据结构
  26. class Token(BaseModel):
  27.     access_token: str  # Token 字符串
  28.     token_type: str  # Token 类型(一般为 "bearer")
  29. class TokenData(BaseModel):
  30.     username: str | None = None  # 从 Token 中提取的用户名
  31. class User(BaseModel):
  32.     username: str  # 用户名
  33.     email: str | None = None  # 邮箱地址,可选
  34.     full_name: str | None = None  # 用户全名,可选
  35.     disabled: bool | None = None  # 用户是否被禁用,可选
  36. class UserInDB(User):
  37.     hashed_password: str  # 存储在数据库中的哈希密码
  38. # 生成 JWT Token 的函数
  39. def create_access_token(data: dict, expires_delta: timedelta | None = None):
  40.     """
  41.     生成 JWT Token。
  42.     参数:
  43.     - data (dict): 要编码到 JWT 中的数据。
  44.     - expires_delta (timedelta, 可选): Token 的过期时间。
  45.     返回:
  46.     - str: 编码后的 JWT 字符串。
  47.     """
  48.     to_encode = data.copy()  # 创建副本,以避免修改原始数据
  49.     if expires_delta:
  50.         expire = datetime.now(timezone.utc) + expires_delta
  51.     else:
  52.         expire = datetime.now(timezone.utc) + timedelta(minutes=15)
  53.     to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中
  54.     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT
  55.     return encoded_jwt
  56. # 从假数据库获取用户信息
  57. def get_user(db, username: str):
  58.     """
  59.     从数据库中获取用户信息。
  60.     参数:
  61.     - db (dict): 用户数据库(在本例中为假数据)。
  62.     - username (str): 用户名。
  63.     返回:
  64.     - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
  65.     """
  66.     if username in db:
  67.         user_dict = db[username]
  68.         return UserInDB(**user_dict)
  69. # 验证用户密码
  70. def verify_password(plain_password, hashed_password):
  71.     """
  72.     验证用户密码。
  73.     参数:
  74.     - plain_password (str): 用户输入的明文密码。
  75.     - hashed_password (str): 存储在数据库中的哈希密码。
  76.     返回:
  77.     - bool: 密码匹配返回 True,否则返回 False。
  78.     """
  79.     return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较
  80. # 验证 Token 并获取当前用户
  81. async def get_current_user(token: str = Depends(oauth2_scheme)):
  82.     """
  83.     从 JWT Token 中提取用户信息,并验证 Token 的合法性。
  84.     参数:
  85.     - token (str): JWT Token。
  86.     返回:
  87.     - User: 返回当前用户信息。
  88.     抛出:
  89.     - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
  90.     """
  91.     credentials_exception = HTTPException(
  92.         status_code=status.HTTP_401_UNAUTHORIZED,
  93.         detail="Could not validate credentials",
  94.         headers={"WWW-Authenticate": "Bearer"},
  95.     )
  96.     try:
  97.         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT
  98.         username: str = payload.get("sub")  # 获取 JWT 中的用户名
  99.         if username is None:
  100.             raise credentials_exception
  101.         token_data = TokenData(username=username)
  102.     except jwt.PyJWTError:
  103.         raise credentials_exception
  104.     user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息
  105.     if user is None:
  106.         raise credentials_exception
  107.     return user
  108. # 验证用户是否被禁用
  109. async def get_current_active_user(current_user: User = Depends(get_current_user)):
  110.     """
  111.     验证当前用户是否被禁用。
  112.     参数:
  113.     - current_user (User): 当前用户信息。
  114.     返回:
  115.     - User: 如果用户未被禁用,返回用户信息。
  116.     抛出:
  117.     - HTTPException: 当用户被禁用时抛出 400 错误。
  118.     """
  119.     if current_user.disabled:
  120.         raise HTTPException(status_code=400, detail="Inactive user")
  121.     return current_user
  122. # 用户登录获取 Token
  123. @app.post("/token", response_model=Token)
  124. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
  125.     """
  126.     用户登录接口,用于获取 JWT Token。
  127.     参数:
  128.     - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。
  129.     返回:
  130.     - dict: 包含 access_token 和 token_type 的响应数据。
  131.     抛出:
  132.     - HTTPException: 当用户名或密码错误时抛出 401 错误。
  133.     """
  134.     user = get_user(fake_users_db, form_data.username)
  135.     if not user or not verify_password(form_data.password, user.hashed_password):
  136.         raise HTTPException(
  137.             status_code=status.HTTP_401_UNAUTHORIZED,
  138.             detail="Incorrect username or password",
  139.             headers={"WWW-Authenticate": "Bearer"},
  140.         )
  141.     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间
  142.     access_token = create_access_token(
  143.         data={"sub": user.username}, expires_delta=access_token_expires
  144.     )
  145.     return {"access_token": access_token, "token_type": "bearer"}
  146. # 受保护的路由示例
  147. @app.get("/users/me/", response_model=User)
  148. async def read_users_me(current_user: User = Depends(get_current_active_user)):
  149.     """
  150.     获取当前用户信息的受保护路由。
  151.     参数:
  152.     - current_user (User): 当前登录的用户信息(通过 JWT 验证)。
  153.     返回:
  154.     - User: 返回当前用户的信息。
  155.     """
  156.     return current_user
  157. # 运行应用
  158. if __name__ == "__main__":
  159.     uvicorn.run(
  160.         f"{os.path.basename(__file__).split('.')[0]}:app",
  161.         host="127.0.0.1",
  162.         port=8000,
  163.         reload=True,  # 启用自动重载
  164.     )
复制代码
测试

获取 Token


访问受保护的路由

token正确


token错误


总结

   JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。
    在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的正当性后,答应用户访问受保护的资源。
    实现 JWT 认证的步调包括安装必要的依靠项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。
   

  • JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
  • 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处置处罚认证逻辑。
  • API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只答应携带有效 JWT 的请求访问。
    使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有颠末身份验证的用户才气访问受保护的 API 路由。
  注意

   示例中密码并没有颠末hash加密,现实应用中要加密的。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

来自云龙湖轮廓分明的月亮

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表