干系术语
调用接口[wx.login()]获取登录根据(code)。通过根据进而变动用户登录态信息,包罗用户在当前小步伐的唯一标识(openid)、微信开放平台账号下的唯一标识(unionid,若当前小步伐已绑定到微信开放平台账号)及本次登录的会话密钥(session_key)等。
暂时登录根据 code 只能使用一次。
如果开辟者拥有多个移动应用、网站应用、和公众账号(包罗小步伐),可通过 UnionID 来区分用户的唯一性,由于只要是同一个微信开放平台账号下的移动应用、网站应用和公众账号(包罗小步伐),用户的 UnionID 是唯一的。换句话说,同一用户,对同一个微信开放平台下的差异应用,UnionID是类似的。
如今微信登录无法拿到微信昵称与头像,须要自己处置处罚该逻辑。
登录逻辑
登录逻辑紧张分两步:
起首这是对应的逻辑筹划图:
起首由前端天生一个code,这个code 须要返回给后端,以是后端须要拿到一个必须得一个code 参数,
后端拿到这个code 须要去拿到对应小步伐的三个appid,appsecret (这两个是小步伐注册时提供),code(前端传参)去哀求API:
- GET https://api.weixin.qq.com/sns/jscode2session?appid=APPID&secret=SECRET&js_code=JSCODE&grant_type=authorization_code
复制代码 哀求参数
属性范例必填阐明appidstring是小步伐 appIdsecretstring是小步伐 appSecretjs_codestring是登录时获取的 code,可通过wx.login获取grant_typestring是授权范例,此处只需填写 authorization_code返回参数
属性范例阐明session_keystring会话密钥unionidstring用户在开放平台的唯一标识符,若当前小步伐已绑定到微信开放平台账号下会返回,详见 UnionID 机制阐明。errmsgstring错误信息openidstring用户唯一标识errcodeint32错误码后端如今通过API拿到如上的参数,须要返回给前端对应的session_key,unionid(大概openid)这一步相当于前端使用 code 变动 openid、unionid、session_key 等信息。
如今如果须要通过这个信息拿得手机号照旧须要举行第二步的处置处罚。
前端拿到这个信息后,须要颠末一系列操纵返回给后端三个参数。
- data.encryptedData, data.Iv, data.session_key
复制代码 后端拿到这三个参数并可以解密手机号,该加密与解密思绪如下:
AES对称加密算法,而且接纳了 AES CBC(Cipher Block Chaining)模式。[反面会具体谈]
至此一个登录逻辑已经完成,但这里只是讲的是微信登录逻辑,下面进入到筹划模块。
登录筹划
任何一个登录模块都少不了数据库,以是这里还须要联合数据库JWT等举行报告。
起首是后端筹划的表,本次使用django 框架举行筹划表,除了django自带的User表以外,须要再筹划一个user_expand表,但由于可以做一个兼容,可以把三方登录信息在再单独做一个表出来,如下:
三方登岸信息表:
字段名范例阐明idint【主键】user_idint【逻辑外键】关联用户扩展表idplatformstring用户泉源platform_unique_idstring新增字段,微信的就是openid,谷歌的就是用户的邮箱,其他的类似【三方登岸唯一标识】ynbool这里须要把三方登录单独拿出来,由于是一个用户大概有多个三方登录信息,以是须要要筹划成一对多的情势。
登录逻辑如下(须要绑定手机号):
1.前端给后端一个code 2.后端通过API 得到openid 3.查找三方表,是否有这个openid ,如果有那么拿到对应的user_id,然后refresh = RefreshToken.for_user(user) 返回给前端一个access_token即可;如果没有查到这个openid,那么须要返回给前端一个信息就是该用户没有绑定过的信息,须要前端返回一个加密向量(以及后端会给前端一个openid)。 4. 后端拿到这个加密向量,解密出这个手机号信息后,把手机号信息以及openid插入到对应数据表中,返回给前端access_token 以表现绑定乐成。
登录代码
起首是微信登录使用的一些工具函数:
- class WeiXinMiniAppLogin:
- async def get_three_login_unique_id(self, param: dict):
- code = param.get("token")
- login_params = WeiXinMiniAppLoginParams()
- wx_info = await self.get_wx_miniapp_info(code, await login_params.get_params()) # 获取到wx_info 的信息
- logger.info(f"wx_info: {wx_info}")
- return wx_info
- @staticmethod
- def get_login_type():
- return UserSourceEnum.WEIXINMINIAPP.value
- @staticmethod
- async def get_wx_miniapp_info(code: str, login_params: dict):
- params = {
- "appid": login_params.get("appid", ""),
- "secret": login_params.get("secret", ""),
- "js_code": code,
- "grant_type": login_params.get("grant_type", ""),
- }
- jscode2session_url = login_params.get("jscode2session_url", "")
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(jscode2session_url, params=params, ssl=False) as response:
- if response.status != 200:
- logger.error(f"WeiXinMiniAppLogin.jscode2session HTTP error: {response.status}")
- return None
- a = await response.text() # Debug
- data = json.loads(a)
- # print(response)
- # return data
- # data = await response.json() # B端的写法是data = json.loads(response.text),异步优先使用这个方法
- except Exception as e:
- logger.error(f"WeiXinMiniAppLogin.jscode2session error for HTTP: {e}")
- return None
- error_code = data.get("errcode")
- if error_code:
- logger.error(f"WeiXinMiniAppLogin.jscode2session error_code: {data}")
- return None
- session_key = data.get("session_key")
- openid = data.get("openid")
- unionid = data.get("unionid", None) # 注意为空的情况
- # 如果为空,记录下,本小程序都是空的情况
- if unionid is None:
- # logger.error(f"WeiXinMiniAppLogin.jscode2session error: missing unionid in {data}")
- r_data = {
- "session_key": session_key,
- "openid": openid
- }
- return r_data
- # 如果不空的话就返回,实际情况就是只有openid
- r_data = {
- "unionid": unionid,
- "session_key": session_key,
- "openid": openid
- }
- return r_data
- @staticmethod
- async def get_phone_number(encrypted_data, aes_iv, session_key):
- try:
- session_key = session_key.replace("\", "")
- session_key_bytes = b64decode(session_key)
- aes_iv_bytes = b64decode(aes_iv)
- encrypted_data_bytes = b64decode(encrypted_data)
- cipher = AES.new(session_key_bytes, AES.MODE_CBC, aes_iv_bytes)
- decrypted_bytes = cipher.decrypt(encrypted_data_bytes)
- padding_len = decrypted_bytes[-1]
- decrypted_bytes = decrypted_bytes[:-padding_len]
- decrypted_str = decrypted_bytes.decode('utf-8')
- model = json.loads(decrypted_str)
- phone_number = model.get("phoneNumber", "")
- return phone_number
- except Exception as ex:
- logger.error(
- f"WeiXinMiniAppLogin.get_phone_number error,encrypted_data:{encrypted_data},aes_iv:{aes_iv},session_key:{session_key},errormsg:{ex}")
- return None
复制代码 这里对get_phone_number的解密方法做一些阐明:
这个函数使用的是 AES对称加密算法,而且接纳了 AES CBC(Cipher Block Chaining)模式。
具体分析:
- AES (Advanced Encryption Standard):一种对称加密算法,常用于掩护数据的安全性。对称加密意味着加密息争密都使用类似的密钥(即这里的 session_key)。
- AES.MODE_CBC (Cipher Block Chaining 模式):这是AES的一种工作模式,CBC模式是将前一个加密块的密文与当前块举行异或后再举行加密。解密时也要通过类似的初始化向量(IV,即这里的 aes_iv)来解密。
- b64decode:表现输入的 session_key、aes_iv 和 encrypted_data 是通过 Base64 编码的,在解密之前须要将其从 Base64 格式解码成原始字节流。
- 添补和去除添补:AES加密的数据块必须是固定长度(一样平常为128位,也就是16字节),如果数据长度不敷,就会主动添加添补字符。在解密后,代码通过 padding_len = decrypted_bytes[-1] 获取添补的长度,并通过 decrypted_bytes[:-padding_len] 去除添补。
- 终极数据解密:解密后的数据是一个 JSON 字符串,末了通过 json.loads(decrypted_str) 分析为字典对象,获取此中的 phoneNumber。
下面是登录接口的筹划,留意须要构造两个接口,
起首是三方登录回调接口:
- 前端传入参数:
- {
- "login_type": "string", # 可省略,做一个三方登录标识
- "token": "string"
- }
- 后端返回参数:
- //需要绑定手机号
- {
- "status": 200,
- "message": "OK",
- "data": {
- "third_login_unique_id": "wx_XXXXXXXXX",
- "need_binding": true, # 需要绑定手机号
- "session_key": "some_session_key"
- }
- }
- // 之前绑定手机号的
- // {
- // "status": 200,
- // "message": "OK",
- // "data": {
- // "need_binding": false,
- // "refresh": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.",
- // "access": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXV"
- // }
- // }
复制代码 然后是绑定手机号的逻辑:
- 前端传入参数:
- {
- "third_login_unique_id": "wx_onwXXXXXX",
- "encryptedData": "xxxx",
- "Iv": "xxxx",
- "session_key": "xxxx"
- }
- 后端返回参数:
- {
- "status": 200,
- "message": "OK",
- "data": {
- "refresh": "eyJhbGciOiJIUzI1N",
- "access": "eyJhbGciO"
- }
- }
复制代码- @api_controller('user/', tags=['login'], permissions=[])class LoginController: @http_post('third_login/, response=ThirdLoginBResponse) async def third_login(self, data: ThirdLoginBRequest): # 获取登录范例 login_type = data.login_type # weixinminiapp third_login_instances = third_login_initializer.get_instance(login_type) if third_login_instances is None: raise HttpError(status_code=400, detail="third_login_instances is None") # 获取微信信息 wx_info = await third_login_instances.get_three_login_unique_id({"token": data.token}) if wx_info is None: raise HttpError(status_code=400, detail="three_login_unique_id is None") # 获取 openid 和 unionid openid = wx_info.get("openid") unionid = wx_info.get("unionid", None) logger.info(f"ThirdLoginBView.post openid:{openid},unionid:{unionid},login_type:{login_type}") old_third_login_unique_id = f"wx_{openid}" # 小步伐只有openid无法拿到unionid # new_third_login_unique_id = f"wx_{unionid}" if unionid else old_third_login_unique_id # 先从三方表内里去查这个用户 user_social_account = await UserSocialAccount.objects.filter( platform_unique_id=old_third_login_unique_id, platform=login_type ).afirst() User = get_user_model() # 如果用户存在,直接返回对应的token值 if user_social_account: user_id = user_social_account.user_id user = await User.objects.aget(id=user_id) refresh = RefreshToken.for_user(user) response_data = { "status": 200, "message": "OK", "data": { "third_login_unique_id": old_third_login_unique_id, # 只有openid "need_binding": False, "refresh": str(refresh), "access": str(refresh.access_token), } } response_data["data"]["session_key"] = wx_info.get("session_key") # 调试解密,该参数只有在须要绑定的时间返回,这里做个测试生存 else: # 如果用户不存在,返回须要绑定的信息 response_data = { "status": 200, "message": "OK", "data": { "third_login_unique_id": old_third_login_unique_id, "need_binding": True } } if login_type == UserSourceEnum.WEIXINMINIAPP.value: response_data["data"]["session_key"] = wx_info.get("session_key") return ThirdLoginBResponse(**response_data) @http_post('wx_binding_phone/', response=WXBindingPhoneResponse) async def wx_binding_phone(self, data: WXBindingPhoneRequest): # logger.info("ThirdLoginView.post start:" + json.dumps(data.dict(), ensure_ascii=False)) # 解密获取手机号 wx_mini_app_login = WeiXinMiniAppLogin() phone_number = await wx_mini_app_login.get_phone_number(data.encryptedData, data.Iv, data.session_key
- ) if not phone_number: raise HttpError(status_code=400, detail="无法解密获取手机号") # 获取用户模子 User = get_user_model() # 查找或创建用户 user, created = await User.objects.aget_or_create(username=phone_number) if created: user.set_unusable_password() await user.asave() # 查找或创建用户扩展信息 user_expand, created = await UserExpand.objects.aget_or_create(user=user) if created: user_expand.phone = user.username user_expand.nick_name = phone_number user_expand.avatar = "https://thirdwx.qlogo.cn/mmopen/vi_32/POgEwh4mIHO4nibH0KlMECNjjGxQUq24ZEaGT4poC6icRiccVGKSyXwibcPq4BWmiaIGuG1icwxaQX6grC9VemZoJ8rg/132" # 缺头像,暂时不处置处罚,昵称就是手机号,如今无法拿到昵称和头像 await user_expand.asave() # 创建新的 UserSocialAccount 对象并生存 user_social_account = UserSocialAccount( user_id=user.id, platform=UserSourceEnum.WEIXINMINIAPP.value, platform_unique_id=data.third_login_unique_id, ) await user_social_account.asave() else: pass refresh = RefreshToken.for_user(user) # 构建相应数据 response_data = { "status": 200, "message": "OK", "data": { 'refresh': str(refresh), 'access': str(refresh.access_token), } } return WXBindingPhoneResponse(**response_data)
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金 |