ToB企服应用市场:ToB评测及商务社交产业平台

标题: 网络安全-怎样设计一个安全的API(安全角度) [打印本页]

作者: 曂沅仴駦    时间: 2024-7-12 01:12
标题: 网络安全-怎样设计一个安全的API(安全角度)

API安全概述

使用API可进行以下常见的攻击:

有以下常见的防御方式:

当然,有些还是必要API后端代码来进行防御,例如下令注入、SSRF等。
本文以API身份认证为主要内容,浅谈各种认证的使用场景与优缺点,同时穿插部分攻击与防御。
设计一个安全的API

一个基本的API

主要代码

  1. import uuid
  2. import re
  3. import traceback
  4. from flask import Flask, request, jsonify
  5. from mysql import MysqlCli
  6. from log import log
  7. from setting import *
  8. app = Flask(__name__)
  9. log.set_file()
  10. # 验证username
  11. def validate_username(username:str)->bool:
  12.     if len(username) > 20:
  13.         return False
  14.     return True
  15. # 验证手机号
  16. def validate_phone_number(phone_number:str)->bool:
  17.     # 使用正则表达式检查手机号格式
  18.     pattern = re.compile(r'^1[3456789]\d{9}$')
  19.     if re.match(pattern, phone_number):
  20.         return True
  21.     else:
  22.         return False
  23. # 管理员注册用户接口
  24. @app.route('/api/v1.0/admin/add_user', methods=['POST'])
  25. def add_user():
  26.     resp = {
  27.         "requestid": uuid.uuid4()
  28.     }
  29.     if request.is_json:
  30.         data = request.get_json()
  31.         username = data.get('username')
  32.         if username is None:
  33.             resp['error'] = 'username is required'
  34.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  35.             return jsonify(resp),400
  36.         phone_number = data.get('phone_number')
  37.         if phone_number is None:
  38.             resp['error'] = 'phone_number is required'
  39.             log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
  40.             return jsonify(resp),400
  41.         if not validate_username(username):
  42.             resp['error'] = 'username length should not exceed 10'
  43.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  44.             return jsonify(resp),400
  45.         try:
  46.             cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
  47.             cli.insert_one("users",
  48.                 {
  49.                     "username": username,
  50.                     "phone_number": phone_number
  51.                 }
  52.             )
  53.             cli.close()
  54.             resp['message'] = f'success to add user:{username}!'
  55.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  56.             return jsonify(resp)
  57.         except Exception:
  58.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  59.             resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
  60.             return jsonify(resp),500
  61.     else:
  62.         resp['error'] = "Invalid JSON format in request"
  63.         return jsonify(resp),400
  64. # 用户获取信息接口
  65. @app.route('/api/v1.0/get_user_info', methods=['POST'])
  66. def get_user_info():
  67.     resp = {
  68.         "requestid": uuid.uuid4()
  69.     }
  70.     if request.is_json:
  71.         data = request.get_json()
  72.         username = data.get('username')
  73.         if username is None:
  74.             resp['error'] = 'username is required'
  75.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  76.             return jsonify(resp),400
  77.         if not validate_username(username):
  78.             resp['error'] = 'username length should not exceed 10'
  79.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  80.             return jsonify(resp),400
  81.         try:
  82.             cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
  83.             sql = f"select * from users where username = '{username}' limit 1"
  84.             user = cli.select_all(sql)
  85.             resp['message'] = f'success to get user:{user}.'
  86.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  87.             return jsonify(resp)
  88.         except Exception:
  89.             resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
  90.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  91.             return jsonify(resp),500
  92.     else:
  93.         resp['error'] = "Invalid JSON format in request"
  94.         return jsonify(resp),400
  95. if __name__ == '__main__':
  96.     app.run()
复制代码
可以看到共有两个API,管理员注册用户接口、用户获取信息接口,拥有以下功能或安全步伐:

调用



API的一些问题


BasicAuth

认证流程

主要代码

  1. def basic_auth(f):
  2.     @wraps(f)
  3.     def decorated_function(*args, **kwargs):
  4.         resp = {
  5.             "requestid": uuid.uuid4()
  6.         }
  7.         auth_header = request.headers.get('Authorization')
  8.         if not auth_header or not auth_header.startswith('Basic '):
  9.             resp['error'] = 'basic auth is required'
  10.             log.logger.error(f"url:{request.url},resp:{resp}")
  11.             return jsonify(resp), 400
  12.         else:
  13.             try:
  14.                 encoded_credentials = auth_header.split(' ')[1]
  15.                 decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')
  16.                 username, password = decoded_credentials.split(':')
  17.                 if not check_basic_auth(username, password):
  18.                     resp['error'] = 'basic auth failed,check your username or password is right'
  19.                     log.logger.error(f"url:{request.url},resp:{resp}")
  20.                     return jsonify(resp), 401
  21.             except Exception:
  22.                 resp['error'] = f'basic auth failed,err: {traceback.format_exc()}'
  23.                 log.logger.error(f"url:{request.url},resp:{resp}")
  24.                 return jsonify(resp), 500
  25.         return f(*args, **kwargs)
  26.     return decorated_function
  27. # 管理员注册用户接口 v2.0 增加密码
  28. @app.route('/api/v2.0/admin/basic_auth/add_user', methods=['POST'])
  29. @basic_auth
  30. def add_user_basic_auth():
  31.     resp = {
  32.         "requestid": uuid.uuid4()
  33.     }
  34.     # 请求是json格式
  35.     if request.is_json:
  36.         data = request.get_json()
  37.         username = data.get('username')
  38.         # username检查
  39.         if username is None:
  40.             resp['error'] = 'username is required'
  41.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  42.             return jsonify(resp),400
  43.         if not validate_username(username):
  44.             resp['error'] = 'username length should not exceed 10'
  45.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  46.             return jsonify(resp),400
  47.         # phone_number检查
  48.         phone_number = data.get('phone_number')
  49.         if phone_number is None:
  50.             resp['error'] = 'phone_number is required'
  51.             log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
  52.             return jsonify(resp),400
  53.         # 插入数据库
  54.         try:
  55.             cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
  56.             cli.insert_one("users",
  57.                 {
  58.                     "username": username,
  59.                     "phone_number": phone_number
  60.                 }
  61.             )
  62.             pwd = generate_random_password(secrets.choice(range(8, 17)))
  63.             cli.insert_one("passwords", {
  64.                 "username": username,
  65.                 "password": pwd
  66.             })
  67.             cli.close()
  68.             resp['message'] = f'success to add user:{username},password {pwd},remember it!'
  69.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  70.         # 异常返回
  71.         except Exception:
  72.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  73.             resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}",500
  74.             return jsonify(resp)
  75.         return jsonify(resp)
  76.     # 请求不是json格式
  77.     else:
  78.         resp['error'] = "Invalid JSON format in request"
  79.         return jsonify(resp),400
复制代码




问题


引入问题:
API Key

流程

主要代码

  1. def api_key_auth(f):
  2.     @wraps(f)
  3.     def decorated_function(*args, **kwargs):
  4.         resp = {
  5.             "requestid": uuid.uuid4()
  6.         }
  7.         # api key检查
  8.         if "api_key" not in request.headers:
  9.             resp['error'] = 'api_key is required in headers'
  10.             log.logger.error(f"url:{request.url},resp:{resp}")
  11.             return jsonify(resp), 400
  12.         if not check_api_key(request.headers["api_key"]):
  13.             resp['error'] = f'api_key {request.headers["api_key"]} is invalid'
  14.             log.logger.error(f"url:{request.url},resp:{resp}")
  15.             return jsonify(resp), 401
  16.         return f(*args, **kwargs)
  17.     return decorated_function
  18. # 管理员注册用户接口 v2.0 增加api token
  19. @app.route('/api/v2.0/admin/api_key/add_user', methods=['POST'])
  20. @api_key_auth
  21. def add_user_api_key():
  22.     resp = {
  23.         "requestid": uuid.uuid4()
  24.     }
  25.     # request是json
  26.     if request.is_json:
  27.         data = request.get_json()
  28.         username = data.get('username')
  29.         if username is None:
  30.             resp['error'] = 'username is required'
  31.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  32.             return jsonify(resp),400
  33.         phone_number = data.get('phone_number')
  34.         if phone_number is None:
  35.             resp['error'] = 'phone_number is required'
  36.             log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
  37.             return jsonify(resp),400
  38.         if not validate_username(username):
  39.             resp['error'] = 'username length should not exceed 10'
  40.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  41.             return jsonify(resp),400
  42.         try:
  43.             cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
  44.             cli.insert_one("users",
  45.                 {
  46.                     "username": username,
  47.                     "phone_number": phone_number
  48.                 }
  49.             )
  50.             key = secrets.token_hex(16)
  51.             cli.insert_one("keys",
  52.                            {
  53.                                "username": username,
  54.                                "key": key
  55.                            })
  56.             cli.close()
  57.             resp['message'] =f'success to add user:{username},key {key},remember it!'
  58.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  59.         except Exception:
  60.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  61.             resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
  62.             return jsonify(resp),500
  63.         return jsonify(resp)
  64.     else:
  65.         resp['error'] = "Invalid JSON format in request"
  66.         return jsonify(resp),400
复制代码
博主这里实现时,添加了自定义请求头api_key

问题


引入问题:
Bearer auth/Token auth

流程

使用 Bearer authentication 的优势在于令牌本身可以包罗更多的信息、具有较长的有效期,并且不必要在服务器端保存会话状态,这样可以减轻服务器负担并提高安全性。
这里就不实现了,后面通过jwt token,算是实现其中的一种。
Digest Auth

流程

涉及的几个常见参数如下:

主要代码

  1. # 将字符串保存到Redis中,并设置过期时间
  2. def save_nonce_with_expiry(key, value, expiry_seconds):
  3.     """
  4.     :param key: 键
  5.     :param value: 值
  6.     :param expiry_seconds: 过期时间(秒)
  7.     """
  8.     # 连接Redis数据库
  9.     redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD, port=REDIS_PORT, db=REDIS_DB)
  10.     redis_client.setex(key, expiry_seconds, value)
  11. def check_nonce(key):
  12.     """
  13.     检查字符串是否存在于Redis中
  14.     :param key: 键
  15.     :return: 布尔值,表示键是否存在
  16.     """
  17.     # 连接Redis数据库
  18.     redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD ,port=REDIS_PORT, db=REDIS_DB)
  19.     return redis_client.exists(key)
  20. # 校验username 返回密码
  21. def check_username(username):
  22.     try:
  23.         cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
  24.         sql = f"select * from passwords where username = '{username}' limit 1"
  25.         user = cli.select_one(sql)
  26.         return user['password'] if user else ''
  27.     except Exception:
  28.         log.logger.error(f"check_basic_auth failed,error:{traceback.format_exc()}")
  29.         return False
  30. # 校验response
  31. def check_response(response,realm,username,password,method,uri,req_nonce,nc,cnonce,qop):
  32.     try:
  33.         log.logger.info(f"check_response ,response:{response},realm:{realm},username:{username},"
  34.                         f"password:{password},method:{method},uri:{uri},req_nonce:{req_nonce},nc:{nc},"
  35.                         f"cnonce:{cnonce},qop:{qop}")
  36.         # 校验nonce
  37.         if not check_nonce(req_nonce):
  38.             log.logger.error(f"check_signature failed,error:{req_nonce} not exist!")
  39.             return False
  40.         ha1=hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()
  41.         ha2=hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
  42.         return response == hashlib.md5(f"{ha1}:{req_nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()
  43.     except Exception:
  44.         log.logger.error(f"check_responce failed,error:{traceback.format_exc()}")
  45.         return False
  46. # 添加用户 用于添加用户时无需额外增加用户关联的密钥等信息时的接口
  47. def add_user():
  48.     resp = {
  49.         "requestid": uuid.uuid4()
  50.     }
  51.     log.logger.info(request.headers)
  52.     # request是json
  53.     if request.is_json:
  54.         data = request.get_json()
  55.         username = data.get('username')
  56.         if username is None:
  57.             resp['error'] = 'username is required'
  58.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  59.             return jsonify(resp)
  60.         phone_number = data.get('phone_number')
  61.         if phone_number is None:
  62.             resp['error'] = 'phone_number is required'
  63.             log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
  64.             return jsonify(resp)
  65.         if not validate_username(username):
  66.             resp['error'] = 'username length should not exceed 10'
  67.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  68.             return jsonify(resp)
  69.         try:
  70.             cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
  71.             cli.insert_one("users",
  72.                            {
  73.                                "username": username,
  74.                                "phone_number": phone_number
  75.                            }
  76.                            )
  77.             cli.close()
  78.             resp['message'] = f'success to add user:{username}!'
  79.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  80.         except Exception:
  81.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  82.             resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
  83.         return jsonify(resp)
  84.     else:
  85.         resp['error'] = "Invalid JSON format in request"
  86.         return jsonify(resp)
  87. # 管理员注册用户接口 v2.0增加digest算法
  88. @app.route('/api/v2.0/admin/digest_auth/add_user', methods=['POST'])
  89. @digest_auth
  90. def add_user_digest():
  91.     return add_user()
复制代码

输入账号密码后登录,发起请求

问题


优点:
引入问题:
JWT Token

流程

通常由三个部分构成:header、payload 和 signature。

代码

  1. def jwt_auth(f):
  2.     @wraps(f)
  3.     def decorated_function(*args, **kwargs):
  4.         resp = {
  5.             "requestid": uuid.uuid4()
  6.         }
  7.         auth_header = request.headers.get('Authorization')
  8.         if not auth_header or not auth_header.startswith("Bearer "):
  9.             resp["message"] = "no Authorization header or invalid format"
  10.             return jsonify(resp), 400
  11.         try:
  12.             token = auth_header.split(' ')[1]
  13.             jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
  14.         except jwt.ExpiredSignatureError:
  15.             resp["error"] = "token has expired"
  16.             log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
  17.             return jsonify(resp), 401
  18.         except jwt.InvalidTokenError:
  19.             resp["message"] = f"invalid token {token}"
  20.             log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
  21.             return jsonify(resp), 401
  22.         except Exception as e:
  23.             resp["message"] = f"interal error {e}"
  24.             log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
  25.             return jsonify(resp), 500
  26.         return f(*args, **kwargs)
  27.     return decorated_function
  28. # 用户登录获取jwt token接口
  29. @app.route('/api/v2.0/admin/jwt/login', methods=['POST'])
  30. def get_jwt_token():
  31.     resp = {
  32.         "requestid": uuid.uuid4()
  33.     }
  34.     if request.is_json:
  35.         data = request.get_json()
  36.         username = data.get('username')
  37.         password = data.get('password')
  38.         if username is None or password is None:
  39.             resp['error'] = 'username or password is required'
  40.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  41.             return jsonify(resp),400
  42.         if not validate_username(username):
  43.             resp['error'] = 'username length should not exceed 10'
  44.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  45.             return jsonify(resp),400
  46.         try:
  47.             cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
  48.             sql = f"select * from passwords where username = '{username}' limit 1"
  49.             user = cli.select_one(sql)
  50.             if not user:
  51.                 resp['error'] = 'username not found'
  52.                 log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  53.                 return jsonify(resp), 400
  54.             if user['password'] != password:
  55.                 resp['error'] = 'password is not right'
  56.                 log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  57.                 return jsonify(resp), 401
  58.             payload = {
  59.                 "iss": "lady_killer9",
  60.                 "exp": datetime.now() + timedelta(seconds=5*60),
  61.                 "jti": str(uuid.uuid4())
  62.             }
  63.             resp['message'] = f'success to login :{user},token:{jwt.encode(payload,JWT_SECRET,algorithm="HS256")}'
  64.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  65.             return jsonify(resp), 200
  66.         except Exception:
  67.             resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
  68.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  69.             return jsonify(resp),500
  70.     else:
  71.         resp['error'] = "Invalid JSON format in request"
  72.         return jsonify(resp),400
  73. # 管理员注册用户接口 v2.0增加jwt sha256算法
  74. @app.route('/api/v2.0/admin/jwt/add_user', methods=['POST'])
  75. @jwt_auth
  76. def add_user_jwt():
  77.     return add_user()
  78. # 用户获取信息接口 v2.0 增加jwt sha256算法
  79. @app.route('/api/v2.0/jwt/get_user_info', methods=['POST'])
  80. @jwt_auth
  81. def get_user_info_jwt():
  82.     return get_user_info()
复制代码



问题


优点:
引入问题:
bejson jwt在线解密

Hmac

流程


和Digest Auth差不多,可以由客户端生成随机数,这样请求一次即可,随机数不可重复。
主要代码

  1. # 验证摘要
  2. def check_signature(signture:str,username:str,nonce:int,data:dict):
  3.     try:
  4.         cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
  5.         sql = f"select * from `secrets` where `username` = '{username}' limit 1"
  6.         secret = cli.select_one(sql)
  7.         cli.close()
  8.         if secret:
  9.             server_signature = hmac.new(str(secret['secret']).encode('utf-8'), json.dumps(data).encode('utf-8'), hashlib.sha256).hexdigest()
  10.             log.logger.info(f"secret:{str(secret['secret'])},data:{data},server_signature:{server_signature}")
  11.             return  server_signature == signture
  12.         log.logger.error(f"check_signature failed,error:{username} {secret}")
  13.         return False
  14.     except Exception:
  15.         log.logger.error(f"check_signature failed,error:{traceback.format_exc()}")
  16.         return False
  17. def hmac_auth(f):
  18.     @wraps(f)
  19.     def decorated_function(*args, **kwargs):
  20.         resp = {
  21.             "requestid": uuid.uuid4()
  22.         }
  23.         # query检查 nonce username
  24.         nonce = request.args.get('nonce', type=int)
  25.         username = request.args.get('username')
  26.         if nonce is None or username is None:
  27.             resp['error'] = 'username or nonce not found in query string'
  28.             log.logger.error(f"url:{request.url},resp:{resp}")
  29.             return jsonify(resp)
  30.         # 随机数验证
  31.         if check_nonce(nonce):
  32.             log.logger.error(f"check_signature failed,error:{nonce} is in database")
  33.             resp['error'] = f"check_signature failed,error:{nonce} is in database"
  34.             log.logger.error(f"url:{request.url},resp:{resp}")
  35.             return jsonify(resp)
  36.         save_nonce_with_expiry(nonce,1,MIN*60)
  37.         # signature检查
  38.         if "Signature" not in request.headers:
  39.             resp['error'] = 'signature is required in headers'
  40.             log.logger.error(f"url:{request.url},resp:{resp}")
  41.             return jsonify(resp)
  42.         # request是json
  43.         if request.is_json:
  44.             data = request.get_json()
  45.             if not check_signature(request.headers['Signature'], username, nonce, data):
  46.                 resp['error'] = f'signature {request.headers["Signature"]} is invalid'
  47.                 log.logger.error(f"url:{request.url},resp:{resp}")
  48.                 return jsonify(resp),401
  49.         else:
  50.             resp['error'] = "Invalid JSON format in request"
  51.             return jsonify(resp),400
  52.         return f(*args, **kwargs)
  53.     return decorated_function
  54. # 管理员注册用户接口 v2.0增加hmac sha256算法
  55. @app.route('/api/v2.0/admin/hmac/add_user', methods=['POST'])
  56. @hmac_auth
  57. def add_user_hmac():
  58.     resp = {
  59.         "requestid": uuid.uuid4()
  60.     }
  61.     # request是json
  62.     if request.is_json:
  63.         data = request.get_json()
  64.         username = data.get('username')
  65.         if username is None:
  66.             resp['error'] = 'username is required'
  67.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  68.             return jsonify(resp)
  69.         phone_number = data.get('phone_number')
  70.         if phone_number is None:
  71.             resp['error'] = 'phone_number is required'
  72.             log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
  73.             return jsonify(resp)
  74.         if not validate_username(username):
  75.             resp['error'] = 'username length should not exceed 10'
  76.             log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
  77.             return jsonify(resp)
  78.         try:
  79.             cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
  80.             cli.insert_one("users",
  81.                 {
  82.                     "username": username,
  83.                     "phone_number": phone_number
  84.                 }
  85.             )
  86.             secret = secrets.token_hex(16)
  87.             cli.insert_one("secrets",
  88.                            {
  89.                                "username": username,
  90.                                "secret": secret
  91.                            })
  92.             cli.close()
  93.             resp['message'] =f'success to add user:{username},secret {secret},remember it!'
  94.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  95.         except Exception:
  96.             log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
  97.             resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
  98.         return jsonify(resp)
  99.     else:
  100.         resp['error'] = "Invalid JSON format in request"
  101.         return jsonify(resp)
  102. # 用户获取信息接口 v2.0 增加hmac
  103. @app.route('/api/v2.0/hmac/get_user_info', methods=['POST'])
  104. @hmac_auth
  105. def get_user_info_hmac():
  106.     return get_user_info()
复制代码


问题


引入问题:
OAuth

内容太多,留坑先不看了
比较

比较项Basic AuthAPI keyBearer Auth/Token authDigest AuthHmac AuthJWT身份认证√√√√√√密钥加密×××√√×服务端存储√√√√√×(token可以不存)防重放×××√√×(token失效前可重放)时效性×××√√√自定义××√√√√ 通过以上比较,假如设计一个签名具有以下优点会比较好:

自定义请求签名


身份认证&密钥加密

请求必要验证身份,就必要有账密,这里就用SecretId、SecretKey,其中SecretKey用于进行签名的盘算。
防重放

为了防重放,生成一个随机数Nonce,Nonce唯一,这样服务端收到携带该Nonce的请求后,还发送带该Nonce的请求,就拒绝掉。因此Nonce必要服务器保存,同时为了防止篡改,签名时必要。
那么问题来了,Nonce必要服务器保存,不能不停保存吧,随着时间推移,存储成本会越来越高,因此必要时间限制。
请求时效性

请求应该具有时效性,这里使用unix时间戳Timestamp。规定在1分钟内请求有效,这样Nonce保存时间在1分钟即可。
请求签名算法设计

密钥加密选择SHA-256算法,当然算法可以当做参数,由客户端指定,就用Algorithm吧
url类似:xxx?SecretId=xxx&Nonce=xxx&Timestamp=xxx&Algorithm=xxx
添加一个自定义请求头Signature,放上签名,待签名字符串规定格式如下
   {Nonce}:{Timestamp}:{Algorithm}:{HTTPMethod}:{base64(HTTPBody)}
  
当然,还可以添加更多到待签名字符串
客户端

服务端

代码

  1. def verify_signature(signature, nonce,timestamp,algorithm,method,body_base64):
  2.     format_str = f"{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}"
  3.     server_signature = hashlib.sha256(format_str.encode()).hexdigest()
  4.     log.logger.info(format_str)
  5.     log.logger.info(server_signature)
  6.     return signature == server_signature
  7. def require_signature(f):
  8.     @wraps(f)
  9.     def decorated_function(*args, **kwargs):
  10.         resp = {
  11.             "requestid": uuid.uuid4()
  12.         }
  13.         secret_id = request.args.get('SecretId', type=str)
  14.         nonce = request.args.get('Nonce', type=str)
  15.         timestamp = request.args.get('Timestamp', type=int)
  16.         algorithm = request.args.get('Algorithm', type=str)
  17.         if secret_id is None:
  18.             resp['error'] = 'No SecretId in query string'
  19.             return jsonify(resp),400
  20.         if nonce is None:
  21.             resp['error'] = 'No Nonce in query string'
  22.             return jsonify(resp),400
  23.         if timestamp is None:
  24.             resp['error'] = 'No Timestamp in query string'
  25.             return jsonify(resp),400
  26.         if algorithm is None:
  27.             resp['error'] = 'No Algorithm in query string'
  28.             return jsonify(resp),400
  29.         if algorithm not in ["sha256"]:
  30.             resp['error'] = f'can not support {algorithm}'
  31.             return jsonify(resp), 400
  32.         if (datetime.now() - timedelta(minutes=MIN)).timestamp() > timestamp:
  33.             resp['error'] = 'Request is send before 5 mins ago, check Timestamp'
  34.             return jsonify(resp), 400
  35.         if ':' in nonce:
  36.             resp['error'] = 'can not contain : in Nonce, generate a new one'
  37.             return jsonify(resp), 400
  38.         cli = redis.Redis.from_url(REDIS_URL)
  39.         if cli.exists(nonce):
  40.             resp['error'] = 'can not request with same Nonce'
  41.             return jsonify(resp), 400
  42.         else:
  43.             cli.setex(nonce, MIN*60, 1)
  44.         signature = request.headers.get('Signature')
  45.         if not signature:
  46.             resp['error'] = 'no Signature in headers'
  47.             return jsonify(resp), 400
  48.         # 解析 Authorization header,验证签名
  49.         body_base64 = bytes.decode(b64encode(json.dumps(request.get_json(),ensure_ascii=False).encode()))
  50.         if not verify_signature(signature, nonce,timestamp,algorithm,request.method,body_base64):
  51.             resp['error'] = 'invalid signature'
  52.             return jsonify(resp), 401
  53.         return f(*args, **kwargs)
  54.     return decorated_function
复制代码
攻击与防御

SQL注入

例如,在v1.0的get_user_info接口,存在将用户输入拼接到sql的毛病,可以被SQL注入。

抓包如下:

防御方面可以通过预编译等方式来解决
v3.0已解决

敏感信息泄露

例如,在v1.0的get_user_info接口,用户手机号被完整返回,没有打码。

防御上可以通过加*打码或MFA等来解决
v3.0已解决

越权攻击

例如,在v2.0的api key认证接口,任意用户都能查询admin用户的信息,只必要知道username即可

防御方面可以通过添加RBAC等鉴权来解决
重放攻击

例如,v2.0的api key认证接口,设置burpsuite代理,放到重放器Reapter,发送多少次都可以。

v3.0通过自定义请求签名就解决了此类问题。

全部代码

Github-api_history
参考

API-Security Owasp top 10
API 鉴权都有哪些分类,这些重点不要错过
best-practices-for-authentication-and-authorization-for-rest-apis/
pyjwt
https://github.com/ticarpi/jwt_tool
rfc6750-The OAuth 2.0 Authorization Framework: Bearer Token Usage
rfc7616-HTTP Digest Access Authentication
rfc2617-HTTP Authentication: Basic and Digest Access Authentication
rfc7519-JSON web Token (JWT)
Github-jwt_tool

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4