马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
介绍
推荐:一个实现各个平台OAuth2的GitHub开源项目
实现第三方登录功能(例如 Google、GitHub、WeChat 等)通常涉及前后端的协同工作。
以下是一个基本的实现方案,使用 FastAPI 作为后端,React 作为前端。
后端(FastAPI)
用 PostgreSQL 作为数据库,并且登录 URL 将从后端动态获取。以下是详细的实现步骤:
- 安装依赖
- pip install fastapi uvicorn httpx python-jose passlib bcrypt sqlalchemy psycopg2 python-multipart
- 设置 PostgreSQL 数据库
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
import os

# Connection string for the PostgreSQL database. It is read from the
# DATABASE_URL environment variable so real credentials do not have to be
# committed; the literal is only a local-development placeholder.
SQLALCHEMY_DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://user:password@localhost/dbname"
)

# Engine bound to the URL above.
engine = create_engine(SQLALCHEMY_DATABASE_URL)

# Session factory bound to the engine; autocommit and autoflush are disabled,
# so callers commit explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
- 创建 FastAPI 应用
import os
import secrets
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

import httpx
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
app = FastAPI()

# Database configuration. The URL is taken from the DATABASE_URL environment
# variable when set; the literal is a local-development placeholder.
SQLALCHEMY_DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://user:password@localhost/dbname"
)
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Password hashing (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT configuration. The signing key is taken from the SECRET_KEY environment
# variable when set; the placeholder default must not be used in production
# because anyone who knows it can forge tokens.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"

# OAuth2 configuration: bearer tokens are issued by the "token" endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(Base):
    """Account row stored in the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # E-mail address; unique across all rows and indexed for lookups.
    email = Column(String, index=True, unique=True)
    # Stored password hash (never the plain-text password).
    hashed_password = Column(String)

    # OAuthAccount rows linked to this user; mirrors ``OAuthAccount.user``.
    oauth_accounts = relationship("OAuthAccount", back_populates="user")
class OAuthAccount(Base):
    """Link between a local user and one identity at a third-party provider.

    Stored in the ``oauth_accounts`` table. A given (provider, provider_id)
    pair may appear only once, so one external identity can never be attached
    to two different local users.
    """

    __tablename__ = "oauth_accounts"
    __table_args__ = (
        UniqueConstraint(
            "provider",
            "provider_id",
            name="uq_oauth_accounts_provider_provider_id",
        ),
    )

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Owning user; required so a link row can never be orphaned.
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    # Provider name as used in the URL path (e.g. "google", "github").
    provider = Column(String, nullable=False, index=True)
    # The user's identifier at that provider, stored as text.
    provider_id = Column(String, nullable=False, index=True)

    # Owning User object; mirrors ``User.oauth_accounts``.
    user = relationship("User", back_populates="oauth_accounts")
# Emit CREATE TABLE for the models declared on Base; this runs as soon as the
# module is imported.
Base.metadata.create_all(engine)
def get_db():
    """Yield a fresh database session and close it when the consumer is done.

    Written as a generator so the ``finally`` clause closes the session even
    if the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
async def get_current_user(token: str = Depends(oauth2_scheme), db=Depends(get_db)):
    """Resolve the bearer token of the current request to a ``User`` row.

    Args:
        token: JWT extracted from the ``Authorization`` header by
            ``oauth2_scheme``.
        db: Database session injected by FastAPI through ``get_db``, so the
            session is closed when the request finishes.

    Returns:
        The ``User`` whose e-mail matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or refers to a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Do not leak decoding details to the client.
        raise credentials_exception from None

    email = payload.get("sub")
    if email is None:
        raise credentials_exception

    user = db.query(User).filter(User.email == email).first()
    if user is None:
        raise credentials_exception
    return user
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db=Depends(get_db),
):
    """Exchange an e-mail/password form submission for a bearer token.

    Args:
        form_data: OAuth2 password form; ``username`` carries the e-mail.
        db: Database session injected by FastAPI through ``get_db``, so the
            session is closed when the request finishes.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 400 when the credentials do not match a user.
    """
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    access_token = create_access_token(data={"sub": user.email})
    return {"access_token": access_token, "token_type": "bearer"}
def authenticate_user(db, email: str, password: str):
    """Return the user matching ``email`` and ``password``, or ``False``.

    ``False`` is returned both when no user has that e-mail and when the
    password does not verify against the stored hash.
    """
    candidate = db.query(User).filter(User.email == email).first()
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` via ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(data: dict, expires_minutes: int = 30):
    """Encode ``data`` as a signed JWT that expires.

    Args:
        data: Claims to embed in the token; the dict is copied, not mutated.
        expires_minutes: Lifetime of the token in minutes. Ignored when
            ``data`` already contains its own ``exp`` claim.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Without an "exp" claim a leaked token would stay valid forever.
    to_encode.setdefault(
        "exp", datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    )
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Return the public profile of the authenticated user.

    Only ``id`` and ``email`` are exposed. Returning the ORM object itself
    would also serialize ``hashed_password`` to the client.
    """
    return {"id": current_user.id, "email": current_user.email}
@app.get("/oauth/{provider}/login-url")
async def get_oauth_login_url(provider: str):
    """Build the authorization URL the browser is sent to for ``provider``.

    Args:
        provider: One of ``"google"``, ``"github"`` or ``"wechat"``.

    Returns:
        ``{"url": <authorization URL>}`` with all query values URL-encoded.

    Raises:
        HTTPException: 404 when ``provider`` is not supported.
    """
    # f-string so the provider name is actually substituted into the path.
    redirect_uri = f"http://localhost:3000/oauth/{provider}/callback"

    # provider -> (authorization endpoint, query parameters).
    # NOTE(review): no "state" parameter is sent, so the flow has no CSRF
    # protection yet; adding one needs server-side storage.
    providers = {
        "google": (
            "https://accounts.google.com/o/oauth2/v2/auth",
            {
                "client_id": "your-google-client-id",
                "redirect_uri": redirect_uri,
                "response_type": "code",
                "scope": "email profile",
            },
        ),
        "github": (
            "https://github.com/login/oauth/authorize",
            {
                "client_id": "your-github-client-id",
                "redirect_uri": redirect_uri,
                "scope": "user:email",
            },
        ),
        "wechat": (
            "https://open.weixin.qq.com/connect/qrconnect",
            {
                "appid": "your-wechat-app-id",
                "redirect_uri": redirect_uri,
                "response_type": "code",
                "scope": "snsapi_login",
            },
        ),
    }

    try:
        base_url, params = providers[provider]
    except KeyError:
        raise HTTPException(status_code=404, detail="Provider not found") from None

    # urlencode escapes the redirect URI and the space inside the scope.
    return {"url": f"{base_url}?{urlencode(params)}"}
@app.get("/oauth/{provider}/callback")
async def oauth_callback(provider: str, request: Request, db=Depends(get_db)):
    """Complete the OAuth2 code flow and issue a local access token.

    Exchanges the ``code`` query parameter for a provider access token,
    fetches the provider's user profile, finds or creates the matching local
    ``User`` / ``OAuthAccount`` rows, and returns a JWT for that user.

    Args:
        provider: Provider name from the URL path; must be a supported one.
        request: Incoming request; only its ``code`` query parameter is read.
        db: Database session injected by FastAPI through ``get_db``.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 404 for an unknown provider; 400 when ``code`` is
            missing, the token exchange yields no access token, or the
            profile lacks an id or e-mail; 502 when a provider request fails.
    """
    # The provider name is interpolated into the request host below, so it
    # must be restricted to known values; otherwise the client secret could
    # be posted to an arbitrary domain.
    if provider not in ("google", "github", "wechat"):
        raise HTTPException(status_code=404, detail="Provider not found")

    code = request.query_params.get("code")
    if not code:
        raise HTTPException(status_code=400, detail="Missing code")

    # NOTE(review): the endpoint URLs, client credentials and redirect URI
    # below are the tutorial's placeholders; replace them with each
    # provider's documented token/userinfo endpoints and real configuration.
    try:
        async with httpx.AsyncClient() as client:
            # Exchange the authorization code for a provider access token.
            token_response = await client.post(
                f"https://{provider}.com/oauth/token",
                data={
                    "grant_type": "authorization_code",
                    "client_id": "your-client-id",
                    "client_secret": "your-client-secret",
                    "code": code,
                    "redirect_uri": "your-redirect-uri",
                },
                headers={"Accept": "application/json"},
            )
            token_response.raise_for_status()
            provider_token = token_response.json().get("access_token")
            if not provider_token:
                raise HTTPException(status_code=400, detail="Token exchange failed")

            # Fetch the user's profile from the provider.
            user_response = await client.get(
                f"https://{provider}.com/user",
                headers={"Authorization": f"Bearer {provider_token}"},
            )
            user_response.raise_for_status()
            user_data = user_response.json()
    except (httpx.HTTPError, ValueError) as exc:
        # Network failure, non-2xx status, or a body that is not JSON.
        raise HTTPException(status_code=502, detail="OAuth provider request failed") from exc

    provider_id = user_data.get("id")
    email = user_data.get("email")
    if provider_id is None or not email:
        raise HTTPException(status_code=400, detail="Provider did not return an id and email")
    # provider_id is a String column; some providers return numeric ids.
    provider_id = str(provider_id)

    oauth_account = (
        db.query(OAuthAccount)
        .filter(OAuthAccount.provider == provider, OAuthAccount.provider_id == provider_id)
        .first()
    )
    if oauth_account:
        user = oauth_account.user
    else:
        # NOTE(review): linking to an existing user by e-mail is only safe if
        # the provider guarantees the address is verified — confirm.
        user = db.query(User).filter(User.email == email).first()
        if not user:
            # Random, never-disclosed password: a fixed default would let
            # anyone log in to OAuth-created accounts through /token.
            user = User(
                email=email,
                hashed_password=pwd_context.hash(secrets.token_urlsafe(32)),
            )
            db.add(user)
        oauth_account = OAuthAccount(provider=provider, provider_id=provider_id, user=user)
        db.add(oauth_account)
        db.commit()
        db.refresh(user)

    access_token = create_access_token(data={"sub": user.email})
    return {"access_token": access_token, "token_type": "bearer"}
前端(React)
- 安装依赖
- npm install axios react-router-dom@5
- 创建 React 组件
- import React from 'react';
- import { BrowserRouter as Router, Route, Switch, Link } from 'react-router-dom';
- import axios from 'axios';
- const Login = () => {
- const [loginUrls, setLoginUrls] = React.useState({});
- React.useEffect(() => {
- const fetchLoginUrls = async () => {
- const providers = ['google', 'github', 'wechat'];
- const urls = {};
- for (const provider of providers) {
- const response = await axios.get(`/oauth/${provider}/login-url`);
- urls[provider] = response.data.url;
- }
- setLoginUrls(urls);
- };
- fetchLoginUrls();
- }, []);
- const handleLogin = (provider) => {
- window.location.href = loginUrls[provider];
- };
- return (
- <div>
- <button onClick={() => handleLogin('google')}>Login with Google</button>
- <button onClick={() => handleLogin('github')}>Login with GitHub</button>
- <button onClick={() => handleLogin('wechat')}>Login with WeChat</button>
- </div>
- );
- };
- const Callback = ({ match }) => {
- const { provider } = match.params;
- React.useEffect(() => {
- const params = new URLSearchParams(window.location.search);
- const code = params.get('code');
- axios.get(`/oauth/${provider}/callback?code=${code}`)
- .then(response => {
- const { access_token } = response.data;
- localStorage.setItem('access_token', access_token);
- window.location.href = '/profile';
- })
- .catch(error => {
- console.error(error);
- });
- }, [provider]);
- return <div>Loading...</div>;
- };
- const Profile = () => {
- const [user, setUser] = React.useState(null);
- React.useEffect(() => {
- const access_token = localStorage.getItem('access_token');
- axios.get('/users/me', { headers: { Authorization: `Bearer ${access_token}` } })
- .then(response => {
- setUser(response.data);
- })
- .catch(error => {
- console.error(error);
- });
- }, []);
- if (!user) return <div>Loading...</div>;
- return (
- <div>
- <h1>Profile</h1>
- <p>Email: {user.email}</p>
- </div>
- );
- };
- const App = () => {
- return (
- <Router>
- <nav>
- <Link to="/">Home</Link>
- <Link to="/profile">Profile</Link>
- </nav>
- <Switch>
- <Route path="/" exact component={Login} />
- <Route path="/oauth/:provider/callback" component={Callback} />
- <Route path="/profile" component={Profile} />
- </Switch>
- </Router>
- );
- };
- export default App;
总结
- 后端:使用 FastAPI 处理 OAuth2 回调,获取用户信息,并将其与现有用户关联或创建新用户。同时,提供动态获取登录 URL 的接口。
- 前端:使用 React 处理登录按钮和回调逻辑,从后端动态获取登录 URL,将获取的 access_token 存储在 localStorage 中,并在用户访问个人资料页面时使用该 access_token 获取用户信息。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |