ToB企服应用市场:ToB评测及商务社交产业平台
标题:
网络安全-怎样设计一个安全的API(安全角度)
[打印本页]
作者:
曂沅仴駦
时间:
2024-7-12 01:12
标题:
网络安全-怎样设计一个安全的API(安全角度)
API安全概述
使用API可进行以下常见的攻击:
注入攻击(SQL注入、下令注入、XSS等)
DOS/DDOS攻击
SSRF
未授权/水平(垂直)越权
敏感数据泄露
中间人攻击
更改请求方法调用
并发攻击
重放攻击
数据篡改和伪造
有以下常见的防御方式:
资源请求限制,通过限频等手段来解决DOS、DDOS攻击
线程加锁来解决并发攻击
权限控制,通过ABAC、RBAC等方式解决越权攻击
敏感信息防泄露,通太过类分级引擎,数据库加密存储等方式来解决敏感信息泄露
防重放,通过API认证解决重放攻击
加密,例如HTTPS来解决中间人攻击
安全产物,例如API网关、WAF等来解决大部分攻击
当然,有些还是必要API后端代码来进行防御,例如下令注入、SSRF等。
本文以
API身份认证
为主要内容,浅谈各种认证的使用场景与优缺点,同时穿插部分攻击与防御。
设计一个安全的API
一个基本的API
主要代码
import uuid
import re
import traceback
from flask import Flask, request, jsonify
from mysql import MysqlCli
from log import log
from setting import *
app = Flask(__name__)
log.set_file()
# 验证username
def validate_username(username:str)->bool:
if len(username) > 20:
return False
return True
# 验证手机号
def validate_phone_number(phone_number:str)->bool:
# 使用正则表达式检查手机号格式
pattern = re.compile(r'^1[3456789]\d{9}$')
if re.match(pattern, phone_number):
return True
else:
return False
# 管理员注册用户接口
@app.route('/api/v1.0/admin/add_user', methods=['POST'])
def add_user():
resp = {
"requestid": uuid.uuid4()
}
if request.is_json:
data = request.get_json()
username = data.get('username')
if username is None:
resp['error'] = 'username is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
phone_number = data.get('phone_number')
if phone_number is None:
resp['error'] = 'phone_number is required'
log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
return jsonify(resp),400
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
try:
cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
cli.insert_one("users",
{
"username": username,
"phone_number": phone_number
}
)
cli.close()
resp['message'] = f'success to add user:{username}!'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp)
except Exception:
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
return jsonify(resp),500
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp),400
# 用户获取信息接口
@app.route('/api/v1.0/get_user_info', methods=['POST'])
def get_user_info():
resp = {
"requestid": uuid.uuid4()
}
if request.is_json:
data = request.get_json()
username = data.get('username')
if username is None:
resp['error'] = 'username is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
try:
cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
sql = f"select * from users where username = '{username}' limit 1"
user = cli.select_all(sql)
resp['message'] = f'success to get user:{user}.'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp)
except Exception:
resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),500
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp),400
if __name__ == '__main__':
app.run()
复制代码
可以看到共有两个API,管理员注册用户接口、用户获取信息接口,拥有以下功能或安全步伐:
版本控制
日记记载
请求方法校验
请求数据校验
调用
API的一些问题
没有身份认证,只要有人知道api地址、方法、参数就能调用
没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
没有对重放攻击的防御,抓包后可能重放包
越权问题,管理员接口任何人都可以调用
SQL注入,没有做参数化查询等防止SQL操纵,可能被拖库
BasicAuth
认证流程
客户端发送请求头Authorization为Basic username:password(base64编码)的数据包
服务端对请求头Authorization判断,解码后从数据库查询判断账号密码是否正确
主要代码
def basic_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = {
"requestid": uuid.uuid4()
}
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Basic '):
resp['error'] = 'basic auth is required'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp), 400
else:
try:
encoded_credentials = auth_header.split(' ')[1]
decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')
username, password = decoded_credentials.split(':')
if not check_basic_auth(username, password):
resp['error'] = 'basic auth failed,check your username or password is right'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp), 401
except Exception:
resp['error'] = f'basic auth failed,err: {traceback.format_exc()}'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp), 500
return f(*args, **kwargs)
return decorated_function
# 管理员注册用户接口 v2.0 增加密码
@app.route('/api/v2.0/admin/basic_auth/add_user', methods=['POST'])
@basic_auth
def add_user_basic_auth():
resp = {
"requestid": uuid.uuid4()
}
# 请求是json格式
if request.is_json:
data = request.get_json()
username = data.get('username')
# username检查
if username is None:
resp['error'] = 'username is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
# phone_number检查
phone_number = data.get('phone_number')
if phone_number is None:
resp['error'] = 'phone_number is required'
log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
return jsonify(resp),400
# 插入数据库
try:
cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
cli.insert_one("users",
{
"username": username,
"phone_number": phone_number
}
)
pwd = generate_random_password(secrets.choice(range(8, 17)))
cli.insert_one("passwords", {
"username": username,
"password": pwd
})
cli.close()
resp['message'] = f'success to add user:{username},password {pwd},remember it!'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
# 异常返回
except Exception:
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}",500
return jsonify(resp)
return jsonify(resp)
# 请求不是json格式
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp),400
复制代码
问题
没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
没有对重放攻击的防御,抓包后可能重放包
越权问题,管理员接口任何人都可以调用
SQL注入,没有做参数化查询等防止SQL操纵,可能被拖库
引入问题:
请求的密码进行base64编码,轻易获得并解码获得明文
数据库密码明文存储
数据库sql语句执行没有按事件处理,可能用户插入到数据库但密码没有入库
API Key
流程
客户端发送请求时通过query string、请求头(X-API-Key或其他自定义请求头)、Cookie中携带apikey
服务端根据约定的方式获取后查询数据库判断是否存在
主要代码
def api_key_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = {
"requestid": uuid.uuid4()
}
# api key检查
if "api_key" not in request.headers:
resp['error'] = 'api_key is required in headers'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp), 400
if not check_api_key(request.headers["api_key"]):
resp['error'] = f'api_key {request.headers["api_key"]} is invalid'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp), 401
return f(*args, **kwargs)
return decorated_function
# 管理员注册用户接口 v2.0 增加api token
@app.route('/api/v2.0/admin/api_key/add_user', methods=['POST'])
@api_key_auth
def add_user_api_key():
resp = {
"requestid": uuid.uuid4()
}
# request是json
if request.is_json:
data = request.get_json()
username = data.get('username')
if username is None:
resp['error'] = 'username is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
phone_number = data.get('phone_number')
if phone_number is None:
resp['error'] = 'phone_number is required'
log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
return jsonify(resp),400
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
try:
cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
cli.insert_one("users",
{
"username": username,
"phone_number": phone_number
}
)
key = secrets.token_hex(16)
cli.insert_one("keys",
{
"username": username,
"key": key
})
cli.close()
resp['message'] =f'success to add user:{username},key {key},remember it!'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
except Exception:
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
return jsonify(resp),500
return jsonify(resp)
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp),400
复制代码
博主这里实现时,添加了自定义请求头api_key
问题
没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
没有对重放攻击的防御,抓包后可能重放包
越权问题,管理员接口任何人都可以调用
SQL注入,没有做参数化查询等防止SQL操纵,可能被拖库
引入问题:
API key明文发送,容器抓包获取
API key明文存储
数据库sql语句执行没有按事件处理,可能用户插入到数据库但api key没有入库
Bearer auth/Token auth
流程
请求端通过某种认证机制(好比用户名密码登录、OAuth 认证等)获取令牌。 在发起 HTTP 请求时,客户端将这个令牌添加到Authorization 请求头中,格式为 “Bearer token”。
服务器接收到请求后,会验证这个令牌的有效性,假如有效则允许请求继续处理,否则拒绝访问。
使用 Bearer authentication 的优势在于令牌本身可以包罗更多的信息、具有较长的有效期,并且不必要在服务器端保存会话状态,这样可以减轻服务器负担并提高安全性。
这里就不实现了,后面通过jwt token,算是实现其中的一种。
Digest Auth
流程
客户端发送一个未经认证的请求给服务器。 服务器返回一个
401 Unauthorized
响应,有一个响应头
WWW-Authenticate
,其中包罗一个随机数(nonce)和其他认证必要的信息。
客户端收到 401响应后,会向用户提示输入用户名和密码,然后根据特定的算法(通常是 MD5)对用户名、密码、随机数(nonce)、HTTP 方法和请求的URI 进行
摘要盘算
。
客户端将盘算出的摘要放在
Authorization
请求头中发送给服务器。
服务器收到请求后,会根据事先约定好的算法再次盘算摘要,假如两个摘要匹配,则验证通过,否则拒绝访问。
涉及的几个常见参数如下:
realm,必选。是一个掩护空间的名称,用于向用户表明请求的资源属于哪个掩护空间。它通常用于表现一组受掩护的资源,用于构造摘要字符串。
nonce,必选。是一个
唯一
的字符串,401时由服务器生成并发送给客户端。它用于防止重放攻击(replay attack)。每次认证请求都会使用一个新的 nonce 值,使得每次请求的摘要都是不同的,从而提高了安全性。
qop(Quality of Protection) ,必选。可以是 “auth” 或 “auth-int”。auth 代表身份验证,而 auth-int 代表身份验证和消息完整性掩护。Qop 的存在使得摘要认证更加灵活和安全。
algorithm,可选,默认MD5。指定了用于盘算摘要的哈希算法,通常是 MD5。服务器在 WWW-Authenticate 响应头中指定,客户端按照这个算法进行摘要盘算。还有MD5-sess、SHA、SHA-256、SHA-512等。
nc,可选,是一个计数器,用于跟踪特定 nonce 值的使用次数。每次客户端发送请求时,Nc 都会递增,帮助防止重放攻击。
cnonce(Client Nonce) ,可选。是客户端生成的随机字符串,用于与服务器的 nonce 一起使用,以增加请求的独特性和安全性。
opaque,可选。是服务器生成的字符串,客户端在后续请求中必须原样返回。它用来保持服务器状态或防止某些类型的攻击。
charset,可选。默认utf-8,编码方式。
userhash,可选。默认false。服务端返回的是否支持username哈希。
主要代码
# 将字符串保存到Redis中,并设置过期时间
def save_nonce_with_expiry(key, value, expiry_seconds):
"""
:param key: 键
:param value: 值
:param expiry_seconds: 过期时间(秒)
"""
# 连接Redis数据库
redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD, port=REDIS_PORT, db=REDIS_DB)
redis_client.setex(key, expiry_seconds, value)
def check_nonce(key):
"""
检查字符串是否存在于Redis中
:param key: 键
:return: 布尔值,表示键是否存在
"""
# 连接Redis数据库
redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD ,port=REDIS_PORT, db=REDIS_DB)
return redis_client.exists(key)
# 校验username 返回密码
def check_username(username):
try:
cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
sql = f"select * from passwords where username = '{username}' limit 1"
user = cli.select_one(sql)
return user['password'] if user else ''
except Exception:
log.logger.error(f"check_basic_auth failed,error:{traceback.format_exc()}")
return False
# 校验response
def check_response(response,realm,username,password,method,uri,req_nonce,nc,cnonce,qop):
try:
log.logger.info(f"check_response ,response:{response},realm:{realm},username:{username},"
f"password:{password},method:{method},uri:{uri},req_nonce:{req_nonce},nc:{nc},"
f"cnonce:{cnonce},qop:{qop}")
# 校验nonce
if not check_nonce(req_nonce):
log.logger.error(f"check_signature failed,error:{req_nonce} not exist!")
return False
ha1=hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()
ha2=hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
return response == hashlib.md5(f"{ha1}:{req_nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()
except Exception:
log.logger.error(f"check_responce failed,error:{traceback.format_exc()}")
return False
# 添加用户 用于添加用户时无需额外增加用户关联的密钥等信息时的接口
def add_user():
resp = {
"requestid": uuid.uuid4()
}
log.logger.info(request.headers)
# request是json
if request.is_json:
data = request.get_json()
username = data.get('username')
if username is None:
resp['error'] = 'username is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp)
phone_number = data.get('phone_number')
if phone_number is None:
resp['error'] = 'phone_number is required'
log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
return jsonify(resp)
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp)
try:
cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
cli.insert_one("users",
{
"username": username,
"phone_number": phone_number
}
)
cli.close()
resp['message'] = f'success to add user:{username}!'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
except Exception:
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
return jsonify(resp)
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp)
# 管理员注册用户接口 v2.0增加digest算法
@app.route('/api/v2.0/admin/digest_auth/add_user', methods=['POST'])
@digest_auth
def add_user_digest():
return add_user()
复制代码
输入账号密码后登录,发起请求
问题
没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
没有对重放攻击的防御,抓包后可能重放包
越权问题,管理员接口任何人都可以调用
SQL注入,没有做参数化查询等防止SQL操纵,可能被拖库
优点:
可以防止重放攻击
引入问题:
发送两个请求,更加斲丧资源
必要存储nonce,这里有设置逾期时间
实现比较复杂
JWT Token
流程
通常由三个部分构成:header、payload 和 signature。
header:包罗两个部分:令牌类型(即 “JWT”)和所使用的签名算法(如 HMAC SHA256 或 RSA)。
payload:包罗声明(claims)。声明是关于实体(通常是用户)和其他数据的声明。
iss(Issuer):该声明标识了 JWT 的发行者。
sub(Subject):该声明标识了 JWT 的主题,即所形貌的实体。
aud(Audience):该声明标识了 JWT 的受众,即预期的接收者。
exp(Expiration Time):该声明指定了 JWT 的逾期时间,在此时间之后,JWT 将被认为是无效的。
nbf(Not Before):该声明指定了 JWT 的生效时间,在此时间之前,JWT 将被认为是无效的。
iat(Issued At):该声明指定了 JWT 的签发时间。
jti(JWT ID):该声明为 JWT 提供了一个唯一标识符。
signature:为了确保 JWT 未被篡改,必要对编码后的 header 和 payload 使用指定的签名算法和一个密钥进行签名。
代码
def jwt_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = {
"requestid": uuid.uuid4()
}
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith("Bearer "):
resp["message"] = "no Authorization header or invalid format"
return jsonify(resp), 400
try:
token = auth_header.split(' ')[1]
jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
resp["error"] = "token has expired"
log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
return jsonify(resp), 401
except jwt.InvalidTokenError:
resp["message"] = f"invalid token {token}"
log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
return jsonify(resp), 401
except Exception as e:
resp["message"] = f"interal error {e}"
log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
return jsonify(resp), 500
return f(*args, **kwargs)
return decorated_function
# 用户登录获取jwt token接口
@app.route('/api/v2.0/admin/jwt/login', methods=['POST'])
def get_jwt_token():
resp = {
"requestid": uuid.uuid4()
}
if request.is_json:
data = request.get_json()
username = data.get('username')
password = data.get('password')
if username is None or password is None:
resp['error'] = 'username or password is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),400
try:
cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
sql = f"select * from passwords where username = '{username}' limit 1"
user = cli.select_one(sql)
if not user:
resp['error'] = 'username not found'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp), 400
if user['password'] != password:
resp['error'] = 'password is not right'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp), 401
payload = {
"iss": "lady_killer9",
"exp": datetime.now() + timedelta(seconds=5*60),
"jti": str(uuid.uuid4())
}
resp['message'] = f'success to login :{user},token:{jwt.encode(payload,JWT_SECRET,algorithm="HS256")}'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp), 200
except Exception:
resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp),500
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp),400
# 管理员注册用户接口 v2.0增加jwt sha256算法
@app.route('/api/v2.0/admin/jwt/add_user', methods=['POST'])
@jwt_auth
def add_user_jwt():
return add_user()
# 用户获取信息接口 v2.0 增加jwt sha256算法
@app.route('/api/v2.0/jwt/get_user_info', methods=['POST'])
@jwt_auth
def get_user_info_jwt():
return get_user_info()
复制代码
问题
没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
没有对重放攻击的防御,抓包后可能重放包
越权问题,管理员接口任何人都可以调用
SQL注入,没有做参数化查询等防止SQL操纵,可能被拖库
优点:
token可以设置时间限制,逾期后不可调用
payload的aud等参数可以用于鉴权
可添加自定义payload,方便做其他的功能
引入问题:
jwt token轻易破解
bejson jwt在线解密
Hmac
流程
和Digest Auth差不多,可以由
客户端生成随机数
,这样
请求一次
即可,随机数不可重复。
主要代码
# 验证摘要
def check_signature(signture:str,username:str,nonce:int,data:dict):
try:
cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
sql = f"select * from `secrets` where `username` = '{username}' limit 1"
secret = cli.select_one(sql)
cli.close()
if secret:
server_signature = hmac.new(str(secret['secret']).encode('utf-8'), json.dumps(data).encode('utf-8'), hashlib.sha256).hexdigest()
log.logger.info(f"secret:{str(secret['secret'])},data:{data},server_signature:{server_signature}")
return server_signature == signture
log.logger.error(f"check_signature failed,error:{username} {secret}")
return False
except Exception:
log.logger.error(f"check_signature failed,error:{traceback.format_exc()}")
return False
def hmac_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = {
"requestid": uuid.uuid4()
}
# query检查 nonce username
nonce = request.args.get('nonce', type=int)
username = request.args.get('username')
if nonce is None or username is None:
resp['error'] = 'username or nonce not found in query string'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp)
# 随机数验证
if check_nonce(nonce):
log.logger.error(f"check_signature failed,error:{nonce} is in database")
resp['error'] = f"check_signature failed,error:{nonce} is in database"
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp)
save_nonce_with_expiry(nonce,1,MIN*60)
# signature检查
if "Signature" not in request.headers:
resp['error'] = 'signature is required in headers'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp)
# request是json
if request.is_json:
data = request.get_json()
if not check_signature(request.headers['Signature'], username, nonce, data):
resp['error'] = f'signature {request.headers["Signature"]} is invalid'
log.logger.error(f"url:{request.url},resp:{resp}")
return jsonify(resp),401
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp),400
return f(*args, **kwargs)
return decorated_function
# 管理员注册用户接口 v2.0增加hmac sha256算法
@app.route('/api/v2.0/admin/hmac/add_user', methods=['POST'])
@hmac_auth
def add_user_hmac():
resp = {
"requestid": uuid.uuid4()
}
# request是json
if request.is_json:
data = request.get_json()
username = data.get('username')
if username is None:
resp['error'] = 'username is required'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp)
phone_number = data.get('phone_number')
if phone_number is None:
resp['error'] = 'phone_number is required'
log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
return jsonify(resp)
if not validate_username(username):
resp['error'] = 'username length should not exceed 10'
log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
return jsonify(resp)
try:
cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
cli.insert_one("users",
{
"username": username,
"phone_number": phone_number
}
)
secret = secrets.token_hex(16)
cli.insert_one("secrets",
{
"username": username,
"secret": secret
})
cli.close()
resp['message'] =f'success to add user:{username},secret {secret},remember it!'
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
except Exception:
log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
return jsonify(resp)
else:
resp['error'] = "Invalid JSON format in request"
return jsonify(resp)
# 用户获取信息接口 v2.0 增加hmac
@app.route('/api/v2.0/hmac/get_user_info', methods=['POST'])
@hmac_auth
def get_user_info_hmac():
return get_user_info()
复制代码
问题
没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
没有对重放攻击的防御,抓包后可能重放包
越权问题,管理员接口任何人都可以调用
SQL注入,没有做参数化查询等防止SQL操纵,可能被拖库
引入问题:
必要存储nonce,这里有设置逾期时间
OAuth
内容太多,留坑先不看了
比较
比较项Basic AuthAPI keyBearer Auth/Token authDigest AuthHmac AuthJWT身份认证√√√√√√密钥加密×××√√×服务端存储√√√√√×(token可以不存)防重放×××√√×(token失效前可重放)时效性×××√√√自定义××√√√√ 通过以上比较,假如设计一个签名具有以下优点会比较好:
防重放。通过随机数防止请求重放,随机数由客户端盘算可以减轻服务端压力。
密钥加密。使用密钥盘算签名。
服务端存储。存储随机数,设置逾期时间。
时效性。添加时间戳,逾期后请求失败。
自定义。自定义请求签名当然可以自定义一些东西,用于鉴权等。
自定义请求签名
身份认证&密钥加密
请求必要验证身份,就必要有账密,这里就用SecretId、SecretKey,其中SecretKey用于进行签名的盘算。
防重放
为了防重放,生成一个随机数Nonce,Nonce唯一,这样服务端收到携带该Nonce的请求后,还发送带该Nonce的请求,就拒绝掉。因此Nonce必要服务器保存,同时为了防止篡改,签名时必要。
那么问题来了,Nonce必要服务器保存,不能不停保存吧,随着时间推移,存储成本会越来越高,因此必要时间限制。
请求时效性
请求应该具有时效性,这里使用unix时间戳Timestamp。规定在1分钟内请求有效,这样Nonce保存时间在1分钟即可。
请求签名算法设计
密钥加密选择SHA-256算法,当然算法可以当做参数,由客户端指定,就用Algorithm吧
url类似:xxx?SecretId=xxx&Nonce=xxx&Timestamp=xxx&Algorithm=xxx
添加一个自定义请求头Signature,放上签名,待签名字符串规定格式如下
{Nonce}:{Timestamp}:{Algorithm}:{HTTPMethod}:{base64(HTTPBody)}
HTTPMethod:HTTP请求方法,例如POST
HTTPBody:HTTP请求体,例如{“name”:“lady_killer9”}
当然,还可以添加更多到待签名字符串
客户端
生成一个随机字符串(不包罗:)
对请求头和请求体做base64编码,并按照格式拼接
使用算法和SecretKey进行签名
生成时间戳,发送请求
服务端
从url获取时间戳Timestamp,校验是否在时间内
从url获取随机字符串Nonce,校验是否在数据库中
对请求头和请求体做base64编码
从url获取SecretId后从数据库查询对应的SecretKey
使用SecretKey和Algorithm算法对拼接同样格式字符串进行签名得到ServerSignature,比较Signature是否与请求头Signature的值同等
代码
def verify_signature(signature, nonce,timestamp,algorithm,method,body_base64):
format_str = f"{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}"
server_signature = hashlib.sha256(format_str.encode()).hexdigest()
log.logger.info(format_str)
log.logger.info(server_signature)
return signature == server_signature
def require_signature(f):
@wraps(f)
def decorated_function(*args, **kwargs):
resp = {
"requestid": uuid.uuid4()
}
secret_id = request.args.get('SecretId', type=str)
nonce = request.args.get('Nonce', type=str)
timestamp = request.args.get('Timestamp', type=int)
algorithm = request.args.get('Algorithm', type=str)
if secret_id is None:
resp['error'] = 'No SecretId in query string'
return jsonify(resp),400
if nonce is None:
resp['error'] = 'No Nonce in query string'
return jsonify(resp),400
if timestamp is None:
resp['error'] = 'No Timestamp in query string'
return jsonify(resp),400
if algorithm is None:
resp['error'] = 'No Algorithm in query string'
return jsonify(resp),400
if algorithm not in ["sha256"]:
resp['error'] = f'can not support {algorithm}'
return jsonify(resp), 400
if (datetime.now() - timedelta(minutes=MIN)).timestamp() > timestamp:
resp['error'] = 'Request is send before 5 mins ago, check Timestamp'
return jsonify(resp), 400
if ':' in nonce:
resp['error'] = 'can not contain : in Nonce, generate a new one'
return jsonify(resp), 400
cli = redis.Redis.from_url(REDIS_URL)
if cli.exists(nonce):
resp['error'] = 'can not request with same Nonce'
return jsonify(resp), 400
else:
cli.setex(nonce, MIN*60, 1)
signature = request.headers.get('Signature')
if not signature:
resp['error'] = 'no Signature in headers'
return jsonify(resp), 400
# 解析 Authorization header,验证签名
body_base64 = bytes.decode(b64encode(json.dumps(request.get_json(),ensure_ascii=False).encode()))
if not verify_signature(signature, nonce,timestamp,algorithm,request.method,body_base64):
resp['error'] = 'invalid signature'
return jsonify(resp), 401
return f(*args, **kwargs)
return decorated_function
复制代码
攻击与防御
SQL注入
例如,在v1.0的get_user_info接口,存在将用户输入拼接到sql的毛病,可以被SQL注入。
抓包如下:
防御方面可以通过预编译等方式来解决
v3.0已解决
敏感信息泄露
例如,在v1.0的get_user_info接口,用户手机号被完整返回,没有打码。
防御上可以通过加*打码或MFA等来解决
v3.0已解决
越权攻击
例如,在v2.0的api key认证接口,任意用户都能查询admin用户的信息,只必要知道username即可
防御方面可以通过添加RBAC等鉴权来解决
重放攻击
例如,v2.0的api key认证接口,设置burpsuite代理,放到重放器Reapter,发送多少次都可以。
v3.0通过自定义请求签名就解决了此类问题。
全部代码
Github-api_history
参考
API-Security Owasp top 10
API 鉴权都有哪些分类,这些重点不要错过
best-practices-for-authentication-and-authorization-for-rest-apis/
pyjwt
https://github.com/ticarpi/jwt_tool
rfc6750-The OAuth 2.0 Authorization Framework: Bearer Token Usage
rfc7616-HTTP Digest Access Authentication
rfc2617-HTTP Authentication: Basic and Digest Access Authentication
rfc7519-JSON web Token (JWT)
Github-jwt_tool
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4