一文把握异步web框架FastAPI(六)-- 安全(HTTP验证、Bearer Token、Sessi ...

打印 上一主题 下一主题

主题 967|帖子 967|积分 2901

目录

  
九、安全
1、HTTP 基自己份验证(Basic Authentication)
2、Bearer Token
3、Session 管理
1)简单实现
2)当需要session验证的接口会有多个时,使用中间件来做验证。
3)当有大量接口,此中一些需要登录而另一些不需要时,可以使用FastAPI的依赖注入系统来管理这些接口的访问控制。
4、OAuth2 和 OpenID Connect在fastapi中的应用
1)OAuth2
2)OpenID Connect
5、HTTPS 和 TLS 加密
1)获取 SSL/TLS 证书
自署名证书生成(仅用于开辟测试)
2)设置 FastAPI 使用 HTTPS
3)设置 TLS 加密
4)生产环境摆设发起
6、Rate Limiting (速率限定)


九、安全

1、HTTP 基自己份验证(Basic Authentication)

这是最简单的身份验证形式之一,它要求客户端发送带有用户名和暗码的 HTTP 头部。FastAPI 可以很容易地集成这种身份验证方式。使用 hashlib 对暗码进行哈希处理处罚以提高安全性。
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import HTTPBasic, HTTPBasicCredentials
  3. from hashlib import sha256
  4. app = FastAPI()
  5. security = HTTPBasic()
  6. # 假设我们存储的是密码的哈希值
  7. STORED_USERNAME = "john"
  8. STORED_PASSWORD_HASH = sha256("secret".encode()).hexdigest()
  9. def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
  10.     """获取当前用户名,并验证提供的凭据是否正确。"""
  11.     correct_username = credentials.username == STORED_USERNAME
  12.     password_hash = sha256(credentials.password.encode()).hexdigest()
  13.     correct_password = password_hash == STORED_PASSWORD_HASH
  14.     if not (correct_username and correct_password):
  15.         raise HTTPException(
  16.             status_code=status.HTTP_401_UNAUTHORIZED,
  17.             detail="Incorrect user or password",
  18.             headers={"WWW-Authenticate": "Basic"},
  19.         )
  20.     return credentials.username
  21. @app.get("/users/me")
  22. def read_user_me(current_username: str = Depends(get_current_username)):
  23.     """获取当前用户的用户名。"""
  24.     return {"username": current_username}
复制代码
请求:
  1. import requests
  2. from requests.auth import HTTPBasicAuth
  3. # 定义服务器地址
  4. BASE_URL = "http://127.0.0.1:8000"
  5. def test_get_user_me():
  6.     # 正确的凭据
  7.     correct_credentials = HTTPBasicAuth('john', 'secret')
  8.     response = requests.get(f"{BASE_URL}/users/me", auth=correct_credentials)
  9.     print("Response with correct credentials:")
  10.     print(response.status_code, response.json())
  11.     # 错误的凭据
  12.     incorrect_credentials = HTTPBasicAuth('john', 'wrongpassword')
  13.     response = requests.get(f"{BASE_URL}/users/me", auth=incorrect_credentials)
  14.     print("Response with incorrect credentials:")
  15.     print(response.status_code, response.text)
  16. if __name__ == "__main__":
  17.     test_get_user_me()
复制代码

2、Bearer Token

Bearer token 身份验证是另一种常见的身份验证方法,通常与 OAuth 2.0 团结使用。FastAPI 提供了内置的支持来处理处罚 bearer tokens。
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer
  3. app = FastAPI()
  4. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  5. # 假设我们有一个函数来验证 token 是否有效
  6. def fake_decode_token(token):
  7.     """模拟解码令牌的函数,检查令牌是否有效并返回用户信息"""
  8.     if token == "validtoken":
  9.         return {"username": "john", "scope": "admin"}
  10.     else:
  11.         return None
  12. async def get_current_user(token: str = Depends(oauth2_scheme)):
  13.     """获取当前用户,如果令牌无效则抛出未授权异常"""
  14.     user = fake_decode_token(token)
  15.     if user is None:
  16.         raise HTTPException(
  17.             status_code=status.HTTP_401_UNAUTHORIZED,
  18.             detail="Invalid token",
  19.             headers={"WWW-Authenticate": "Bearer"},
  20.         )
  21.     return user
  22. @app.get("/users/me")
  23. def read_user_me(current_user=Depends(get_current_user)):
  24.     """读取当前用户的信息"""
  25.     return current_user
复制代码
请求:
  1. import requests
  2. # 定义服务器地址
  3. BASE_URL = "http://127.0.0.1:8000"
  4. def test_get_user_me():
  5.     # 正确的令牌
  6.     valid_token = "validtoken"
  7.     headers = {"Authorization": f"Bearer {valid_token}"}
  8.     response = requests.get(f"{BASE_URL}/users/me", headers=headers)
  9.     print("Response with valid token:")
  10.     print(response.status_code, response.json())
  11.     # 错误的令牌
  12.     invalid_token = "invalidtoken"
  13.     headers = {"Authorization": f"Bearer {invalid_token}"}
  14.     response = requests.get(f"{BASE_URL}/users/me", headers=headers)
  15.     print("Response with invalid token:")
  16.     print(response.status_code, response.text)
  17. if __name__ == "__main__":
  18.     test_get_user_me()
复制代码

3、Session 管理

1)简单实现

  1. from fastapi import FastAPI, Request, HTTPException, status
  2. from fastapi.responses import JSONResponse
  3. from itsdangerous import URLSafeTimedSerializer
  4. import hashlib
  5. import os
  6. def hash_password(password: str) -> str:
  7.     # 生成随机盐
  8.     salt = os.urandom(16)
  9.     # 创建新的哈希对象
  10.     hasher = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
  11.     # 返回盐和哈希值的组合
  12.     return salt.hex() + hasher.hex()
  13. def verify_password(stored_password: str, provided_password: str) -> bool:
  14.     # 从存储的密码中提取盐
  15.     salt = bytes.fromhex(stored_password[:32])
  16.     # 提取哈希值
  17.     stored_hash = stored_password[32:]
  18.     # 重新计算哈希值
  19.     hasher = hashlib.pbkdf2_hmac('sha256', provided_password.encode('utf-8'), salt, 100000)
  20.     # 比较哈希值
  21.     return hasher.hex() == stored_hash
  22. app = FastAPI()
  23. serializer = URLSafeTimedSerializer("SECRET_KEY")
  24. # 假设的用户数据库
  25. fake_users_db = {
  26.     "alice": {
  27.         "username": "alice",
  28.         "hashed_password": hash_password("secret_password")
  29.     }
  30. }
  31. def authenticate_user(fake_db, username: str, password: str):
  32.     user = fake_db.get(username)
  33.     if not user or not verify_password(user["hashed_password"], password):
  34.         return False
  35.     return user
  36. @app.post("/login")
  37. async def login(request: Request):
  38.     data = await request.json()
  39.     username = data.get('username')
  40.     password = data.get('password')
  41.     user = authenticate_user(fake_users_db, username, password)
  42.     if not user:
  43.         raise HTTPException(
  44.             status_code=status.HTTP_401_UNAUTHORIZED,
  45.             detail="Incorrect username or password",
  46.             headers={"WWW-Authenticate": "Bearer"},
  47.         )
  48.     session_token = serializer.dumps(username)
  49.     response = JSONResponse({"message": "Login successful"})
  50.     response.set_cookie(key="session_token", value=sessi
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

笑看天下无敌手

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表