FastAPI 结合 JWT
JWT(JSON Web Token)是一种基于 JSON 的开放标准(RFC 7519),用于在各方之间传递安全可靠的信息。JWT 可以签名(使用 HMAC 算法或 RSA 等),从而可以验证内容是否被窜改。JWT 通常用于认证和授权流程。
步调
在 FastAPI 中,JWT 主要用于保护 API 路由,使其只答应颠末身份验证的用户访问。认证流程大致如下:
- **用户登录:**用户通过提交用户名和密码获取 JWT。
- **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
- **访问受保护的路由:**用户在访问受保护的路由时,必要在请求头中携带该 JWT。
- **Token 验证:**服务器验证 JWT 的正当性和有效性,答应或拒绝访问受保护资源。
安装
- pip install fastapi uvicorn pyjwt
复制代码 步调
导入必要的模块
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from pydantic import BaseModel
- from datetime import datetime, timedelta, timezone
- import jwt
复制代码 设置配置和初始化应用
- SECRET_KEY = "your_secret_key" # 用于签名 JWT 的密钥
- ALGORITHM = "HS256" # 加密算法
- ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 过期时间
- app = FastAPI()
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
复制代码 创建数据模型
- class Token(BaseModel):
- access_token: str
- token_type: str
- class TokenData(BaseModel):
- username: str | None = None
- class User(BaseModel):
- username: str
- email: str | None = None
- full_name: str | None = None
- disabled: bool | None = None
- class UserInDB(User):
- hashed_password: str
复制代码 实现辅助函数
生成 JWT Token
- def create_access_token(data: dict, expires_delta: timedelta | None = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.now(timezone.utc) + expires_delta
- else:
- expire = datetime.now(timezone.utc) + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
复制代码 获取用户数据
- fake_users_db = {
- "testuser": {
- "username": "testuser",
- "full_name": "Test User",
- "email": "testuser@example.com",
- "hashed_password": "fakehashedpassword",
- "disabled": False,
- }
- }
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
复制代码 验证密码
- def verify_password(plain_password, hashed_password):
- return plain_password == hashed_password
复制代码 获取当前用户
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except jwt.PyJWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=token_data.username)
- if user is None:
- raise credentials_exception
- return user
- async def get_current_active_user(current_user: User = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
复制代码 用户登录获取 Token
- @app.post("/token", response_model=Token)
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- user = get_user(fake_users_db, form_data.username)
- if not user or not verify_password(form_data.password, user.hashed_password):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {"access_token": access_token, "token_type": "bearer"}
复制代码 受保护的路由示例
- @app.get("/users/me/", response_model=User)
- async def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
复制代码 全部代码
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from pydantic import BaseModel
- from datetime import datetime, timedelta, timezone
- import uvicorn
- import jwt
- import os
- # JWT 相关配置
- SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3" # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
- ALGORITHM = "HS256" # 使用的加密算法
- ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Token 的有效时间,以分钟为单位
- app = FastAPI() # 创建 FastAPI 应用实例
- # OAuth2PasswordBearer 实例,用于依赖项
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- # 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
- fake_users_db = {
- "testuser": {
- "username": "testuser",
- "full_name": "Test User",
- "email": "testuser@example.com",
- "hashed_password": "fakehashedpassword", # 在实际应用中,存储经过哈希处理的密码
- "disabled": False, # 用户是否被禁用
- }
- }
- # Pydantic 模型,用于定义请求和响应的数据结构
- class Token(BaseModel):
- access_token: str # Token 字符串
- token_type: str # Token 类型(一般为 "bearer")
- class TokenData(BaseModel):
- username: str | None = None # 从 Token 中提取的用户名
- class User(BaseModel):
- username: str # 用户名
- email: str | None = None # 邮箱地址,可选
- full_name: str | None = None # 用户全名,可选
- disabled: bool | None = None # 用户是否被禁用,可选
- class UserInDB(User):
- hashed_password: str # 存储在数据库中的哈希密码
- # 生成 JWT Token 的函数
- def create_access_token(data: dict, expires_delta: timedelta | None = None):
- """
- 生成 JWT Token。
- 参数:
- - data (dict): 要编码到 JWT 中的数据。
- - expires_delta (timedelta, 可选): Token 的过期时间。
- 返回:
- - str: 编码后的 JWT 字符串。
- """
- to_encode = data.copy() # 创建副本,以避免修改原始数据
- if expires_delta:
- expire = datetime.now(timezone.utc) + expires_delta
- else:
- expire = datetime.now(timezone.utc) + timedelta(minutes=15)
- to_encode.update({"exp": expire}) # 添加过期时间到 JWT 数据中
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 生成 JWT
- return encoded_jwt
- # 从假数据库获取用户信息
- def get_user(db, username: str):
- """
- 从数据库中获取用户信息。
- 参数:
- - db (dict): 用户数据库(在本例中为假数据)。
- - username (str): 用户名。
- 返回:
- - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
- """
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
- # 验证用户密码
- def verify_password(plain_password, hashed_password):
- """
- 验证用户密码。
- 参数:
- - plain_password (str): 用户输入的明文密码。
- - hashed_password (str): 存储在数据库中的哈希密码。
- 返回:
- - bool: 密码匹配返回 True,否则返回 False。
- """
- return plain_password == hashed_password # 在实际应用中,这里应该使用哈希函数进行比较
- # 验证 Token 并获取当前用户
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- """
- 从 JWT Token 中提取用户信息,并验证 Token 的合法性。
- 参数:
- - token (str): JWT Token。
- 返回:
- - User: 返回当前用户信息。
- 抛出:
- - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
- """
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 解码 JWT
- username: str = payload.get("sub") # 获取 JWT 中的用户名
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except jwt.PyJWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=token_data.username) # 获取用户信息
- if user is None:
- raise credentials_exception
- return user
- # 验证用户是否被禁用
- async def get_current_active_user(current_user: User = Depends(get_current_user)):
- """
- 验证当前用户是否被禁用。
- 参数:
- - current_user (User): 当前用户信息。
- 返回:
- - User: 如果用户未被禁用,返回用户信息。
- 抛出:
- - HTTPException: 当用户被禁用时抛出 400 错误。
- """
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
- # 用户登录获取 Token
- @app.post("/token", response_model=Token)
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- """
- 用户登录接口,用于获取 JWT Token。
- 参数:
- - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。
- 返回:
- - dict: 包含 access_token 和 token_type 的响应数据。
- 抛出:
- - HTTPException: 当用户名或密码错误时抛出 401 错误。
- """
- user = get_user(fake_users_db, form_data.username)
- if not user or not verify_password(form_data.password, user.hashed_password):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 设置 Token 过期时间
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {"access_token": access_token, "token_type": "bearer"}
- # 受保护的路由示例
- @app.get("/users/me/", response_model=User)
- async def read_users_me(current_user: User = Depends(get_current_active_user)):
- """
- 获取当前用户信息的受保护路由。
- 参数:
- - current_user (User): 当前登录的用户信息(通过 JWT 验证)。
- 返回:
- - User: 返回当前用户的信息。
- """
- return current_user
- # 运行应用
- if __name__ == "__main__":
- uvicorn.run(
- f"{os.path.basename(__file__).split('.')[0]}:app",
- host="127.0.0.1",
- port=8000,
- reload=True, # 启用自动重载
- )
复制代码 测试
获取 Token
访问受保护的路由
token正确
token错误
总结
JWT(JSON Web Token)是一种用于安全地在各方之间传递信息的开放标准,通常用于用户认证和授权。它将用户信息编码为一个签名的令牌,客户端可以使用该令牌访问受保护的资源。
在 FastAPI 中,JWT 用于保护 API 路由。用户通过提交用户名和密码获取 JWT,客户端在后续请求中使用该令牌进行身份验证。服务器验证令牌的正当性后,答应用户访问受保护的资源。
实现 JWT 认证的步调包括安装必要的依靠项、配置 JWT 设置(如密钥和算法)、定义数据模型、实现辅助函数(如生成和验证 JWT 的函数),并设置受保护的 API 路由。
- JWT 配置:包括密钥(SECRET_KEY)和加密算法(ALGORITHM),用于生成和验证令牌。
- 辅助函数:如生成 JWT、验证密码、获取用户信息等,用于处置处罚认证逻辑。
- API 路由:包含登录接口,用于生成令牌,以及受保护的路由,只答应携带有效 JWT 的请求访问。
使用 Postman 或其他工具可以测试 JWT 的生成和验证过程,确保只有颠末身份验证的用户才气访问受保护的 API 路由。
注意
示例中密码并没有颠末hash加密,现实应用中要加密的。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |