fastapi+vue实现按钮级别的权限控制

打印 上一主题 下一主题

主题 898|帖子 898|积分 2694

一、前端部分

1.1 自界说指令
  1. import store from '@/store'
  2. // 判断是否有权限
  3. const hasPermission = (value, el) => {
  4.   // 检查是否配置了权限参数
  5.   if (!Array.isArray(value) || value.length === 0) {
  6.     throw new Error(`v-permission 需要配置权限,例如 v-permission="['xxx']"`)
  7.   }
  8.   // 获取用户权限,登录后从store中获取到
  9.   const ruleNames = store.getters['permissions'] || []
  10.   if (!Array.isArray(ruleNames)) {
  11.     console.warn('权限数据 "menu/getRuleNames" 格式不正确,请检查 store 配置。')
  12.     return
  13.   }
  14.   // 判断是否有权限
  15.   const hasAuth = value.some((val) => ruleNames.includes(val))
  16.   if (!hasAuth) {
  17.     el.style.display = 'none'
  18.   }
  19.   return hasAuth
  20. }
  21. export default {
  22.   install(Vue) {
  23.     Vue.directive('permission', {
  24.       bind(el, binding) {
  25.         hasPermission(binding.value, el)
  26.       },
  27.       updated(el, binding) {
  28.         hasPermission(binding.value, el)
  29.       }
  30.     })
  31.   }
  32. }
复制代码
1.2 注册自界说指令
  1. import permission from '@/utils/utils'
  2. // 注册自定义指令
  3. Vue.use(permission)
复制代码
1.3 在组件中还用自界说指令
  1.     <el-button v-permission="['add_dept']" type="primary" size="medium" @click="addDeptBtn">新增部门</el-button>
复制代码
二、后端部分

整理思绪为:用户登录后生成token,然后根据fastapi的oauth2编写依靠项,并将其注入到所有的路由函数中表现需要token才能进行访问,然后再在每一个接口函数中,利用依靠性注入判断权限标识的方法,判断根据token中的用户id是否存在此接口的权限标识
2.1 生成token的方法
  1. def create_token(payload: dict, expires: timedelta = None):
  2.     """
  3.     根据用户的电话号码和密码生成token
  4.     :param payload: 载荷-用户的电话号码和密码
  5.     :param expires: 过期时间
  6.     :return: token
  7.     """
  8.     if expires:
  9.         expire = datetime.now() + expires
  10.     else:
  11.         expire = datetime.now() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
  12.     payload.update({"exp": expire})
  13.     token = jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  14.     return token
复制代码
2.2 登录接口需要的一些工具方法
  1. from datetime import datetime
  2. from sqlalchemy import select, update
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from starlette import status
  5. from starlette.exceptions import HTTPException
  6. from starlette.requests import Request
  7. from apps.vadmin.auth.models import VadminUser, VadminRole
  8. from apps.vadmin.auth.schemas.auth import LoginSchema
  9. from apps.vadmin.auth.schemas.role import RoleOutSchema
  10. from apps.vadmin.record.models import VadminLoginRecord, VadminRecordAction
  11. from apps.vadmin.record.schemas.login import LoginForm
  12. # 获取用户信息的函数
  13. async def get_user_by_telephone(username: str, db: AsyncSession):
  14.     stmt = select(VadminUser).where(VadminUser.telephone == username).filter(VadminUser.is_delete == False)
  15.     return await db.scalar(stmt)
  16. # 获取用户权限
  17. async def get_user_permissions(user: VadminUser, db: AsyncSession):
  18.     permissions = []
  19.     user_role_list = user.roles
  20.     for role in user_role_list:
  21.         role_stmt = select(VadminRole).where(VadminRole.id == role.id, VadminRole.is_delete == False)
  22.         role_model = await db.scalar(role_stmt)
  23.         if not role_model:
  24.             raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='当前用户的角色不存在!')
  25.         role_info = RoleOutSchema.from_orm(role_model).model_dump()
  26.         permissions.extend(menu.get('perms') for menu in role_info.get('menus', []))
  27.     return permissions
  28. # 记录登录操作
  29. async def create_login_record(data: LoginSchema, request: Request, response_data: dict, db: AsyncSession):
  30.     login_form = LoginForm(telephone=data.username, password=data.password, method="0", platform="0")
  31.     await VadminLoginRecord.create_login_record(db, login_form, True, request, response_data)
  32.     await VadminRecordAction.create_action_record(
  33.         db=db,
  34.         action_type='登录操作',
  35.         action_user=data.username,
  36.         action_tag='登录模块',
  37.         action_description='用户登录',
  38.         data={'telephone': data.username},
  39.         req=request,
  40.         resp=response_data,
  41.         method=request.method,
  42.         status=True
  43.     )
  44.     # 更新登录时间
  45.     login_stmt = update(VadminUser).where(VadminUser.telephone == data.username).values(last_login=datetime.now())
  46.     await db.execute(login_stmt)
复制代码
2.2 校验token并获取token中的用户信息
  1. from fastapi import APIRouter, Depends
  2. from sqlalchemy.ext.asyncio import AsyncSession
  3. from starlette import status
  4. from starlette.requests import Request
  5. from apps.vadmin.auth.models import VadminUser
  6. from apps.vadmin.auth.schemas.auth import LoginSchema
  7. from apps.vadmin.auth.schemas.user import OutUsrSchema
  8. from apps.vadmin.auth.validate.auth import get_user_by_telephone, get_user_permissions, create_login_record
  9. from core.database import db_getter
  10. from core.utils import create_token
  11. from utils.response import ErrorResponse
  12. app = APIRouter()
  13. @app.post('/login', summary="登录")
  14. async def login(request: Request, data: LoginSchema, db: AsyncSession = Depends(db_getter)):
  15.     # 1. 校验用户是否存在
  16.     user = await get_user_by_telephone(data.username, db)
  17.     if not user or not VadminUser.verify_password(data.password, user.password):
  18.         return ErrorResponse(status_code=status.HTTP_401_UNAUTHORIZED, msg="手机号或密码错误!!!")
  19.     # 2. 校验用户状态
  20.     if not data.is_active:
  21.         return ErrorResponse(status_code=status.HTTP_403_FORBIDDEN, msg="该用户已被禁用,请联系管理员!!!")
  22.     if not data.is_staff:
  23.         return ErrorResponse(status_code=status.HTTP_403_FORBIDDEN, msg="该用户无权限,请联系管理员!!!")
  24.     # 3. 整理用户信息并返回
  25.     permissions = await get_user_permissions(user, db)
  26.     # 4. 生成token
  27.     token = create_token({'telephone': data.username, 'user_id': user.id})
  28.     # 5. 返回用户信息和 token
  29.     user_info = OutUsrSchema.from_orm(user).model_dump()
  30.     user_info.update({'permissions': permissions})
  31.     response_data = {'user': user_info, 'token': token}
  32.     response = {"code": 200, "data": response_data, "message": "登录成功!!!"}
  33.     await create_login_record(data, request, response_data, db)
  34.     return response
复制代码
2.3 装饰器校验权限
  1. # 权限依赖项
  2. def check_permissions(required_roles: List):
  3.     def permission_dependency(user: Dict = Depends(get_current_user)):
  4.         user_permissions_list = user.get('permissions', [])
  5.         for required_role in required_roles:
  6.             if required_role not in user_permissions_list:
  7.                 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='您无权限操作此权限!')
  8.         return user
  9.     return permission_dependency
复制代码
2.4 利用依靠注入校验接口是否有权限
  1. @app.get('/dept', summary='获取部门列表', response_model=DeptSimpleResponse,
  2.          dependencies=[Depends(check_permissions(['get_dept']))])
  3. async def get_dept_list(params: DeptQuerySchema = Depends(), db: AsyncSession = Depends(db_getter)):
  4.     dept_data_list = await get_dept_tree_or_list_curd(params, db)
  5.     return SuccessResponse(data=[dept_data.model_dump() for dept_data in dept_data_list], msg='获取部门列表成功!')
  6. @app.post('/dept', summary='创建部门', response_model=DeptSimpleResponse,
  7.           dependencies=[Depends(check_permissions(['add_dept']))])
  8. async def create_dept(data: DeptCreateSchema, db: AsyncSession = Depends(db_getter)):
  9.     new_dept = await create_dept_curd(data, db)
  10.     return SuccessResponse(data=new_dept.model_dump(), msg='创建部门成功!')
  11. @app.put('/dept/{dept_id}', summary='更新部门信息', response_model=DeptSimpleResponse,
  12.          dependencies=[Depends(check_permissions(['update_dept']))])
  13. async def update_dept(dept_id: int, data: DeptUpdateSchema, db: AsyncSession = Depends(db_getter)):
  14.     updated_dept = await update_dept_curd(dept_id, data, db)
  15.     return SuccessResponse(data=updated_dept.model_dump(), msg='更新部门成功!')
  16. @app.delete('/dept/{dept_id}', summary='删除部门', response_model=DeptSimpleResponse,
  17.             dependencies=[Depends(check_permissions(['delete_dept']))])
  18. async def delete_dept(dept_id: int, db: AsyncSession = Depends(db_getter)):
  19.     await delete_dept_curd(dept_id, db)
  20.     return SuccessResponse(msg='删除部门成功!')
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表