以下是一个利用 Python 实现的风控系统示例,涵盖以下技能组件:
- Kafka 消息中间件:用于实时吸取支付业务系统传递的买卖业务数据。
- Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。
- 规则引擎:利用 Python 实现简单的规则评估逻辑,模仿 Drools 的功能。
- Redis 内存数据库:用于存储风险标签,快速获取账户的风险级别。
- 分布式数据库:利用 SQLite 模仿,从中获取风险标签数据(当 Redis 中没有时)。
我们将构建一个简单的风控系统,流程如下:
- 从 Kafka 中消费实时买卖业务数据。
- 从 Redis 获取对应的风险标签,假如没有则从分布式数据库获取并更新到 Redis。
- 利用规则引擎对买卖业务数据和风险标签进行评估。
- 将评估结果返回给支付业务系统或纪录下来。
- 实时交易模块:接收交易数据 ——> 获取风险标签(Redis) ——> 调用规则引擎 ——> 评估结果返回
- ↓ ↓ ↑
- 规则引擎模块:交易数据 + 风险标签 ---> 规则执行 ----> 输出评估结果(通过/拒绝)
复制代码
项目结构和依赖
1. 项目结构
- risk_control_demo/
- ├── app.py # 主应用程序
- ├── models.py # 数据模型定义
- ├── rules.py # 规则引擎逻辑
- ├── database.py # 数据库服务类
- ├── redis_service.py # Redis 服务类
- ├── requirements.txt # 项目依赖
- └── producer.py # Kafka 生产者,发送测试数据
复制代码 2. 项目依赖(requirements.txt)
- faust==1.10.4
- redis==4.5.5
- aiokafka==0.7.2
- sqlite3==0.0.1
复制代码 安装依赖
- pip install -r requirements.txt
复制代码 详细代码
1. models.py(数据模子定义)
- # models.py
- from dataclasses import dataclass
- @dataclass
- class Transaction:
- transaction_id: str
- account_id: str
- amount: float
- timestamp: float
- @dataclass
- class RiskTag:
- account_id: str
- risk_level: int # 1-低风险, 2-中风险, 3-高风险
复制代码 2. database.py(数据库服务类)
- # database.py
- import sqlite3
- from models import RiskTag
- class DatabaseService:
- def __init__(self):
- # 连接 SQLite 数据库,内存模式
- self.conn = sqlite3.connect(':memory:')
- self.initialize_database()
- def initialize_database(self):
- cursor = self.conn.cursor()
- # 创建风险标签表
- cursor.execute('''
- CREATE TABLE IF NOT EXISTS risk_tags (
- account_id TEXT PRIMARY KEY,
- risk_level INTEGER
- )
- ''')
- # 插入示例数据
- cursor.execute('''
- INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)
- ''')
- self.conn.commit()
- def get_risk_tag(self, account_id):
- cursor = self.conn.cursor()
- cursor.execute('SELECT risk_level FROM risk_tags WHERE account_id = ?', (account_id,))
- result = cursor.fetchone()
- if result:
- return RiskTag(account_id, result[0])
- else:
- return None
- def close(self):
- self.conn.close()
复制代码 3. redis_service.py(Redis 服务类)
- # redis_service.py
- import redis
- from models import RiskTag
- class RedisService:
- def __init__(self, host='localhost', port=6379):
- self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)
- def get_risk_tag(self, account_id):
- risk_level = self.redis_client.get(f'risk:{account_id}')
- if risk_level:
- return RiskTag(account_id, int(risk_level))
- return None
- def set_risk_tag(self, risk_tag):
- self.redis_client.set(f'risk:{risk_tag.account_id}', risk_tag.risk_level)
- def close(self):
- self.redis_client.close()
复制代码 4. rules.py(规则引擎逻辑)
- # rules.py
- from models import Transaction, RiskTag
- class RiskEvaluator:
- def evaluate(self, transaction: Transaction, risk_tag: RiskTag) -> bool:
- """
- 返回 True 表示交易存在风险,需要阻止。
- 返回 False 表示交易安全,可以通过。
- """
- # 高风险交易规则
- if transaction.amount > 10000 and risk_tag.risk_level == 3:
- print(f"检测到高风险交易:{transaction}")
- return True # 阻止交易
- # 中风险交易规则
- if 5000 < transaction.amount <= 10000 and risk_tag.risk_level >= 2:
- print(f"检测到中风险交易:{transaction}")
- return True # 阻止交易
- # 低风险交易规则
- print(f"交易通过:{transaction}")
- return False # 允许交易
复制代码 5. app.py(主应用程序)
- # app.py
- import faust
- import asyncio
- import json
- from models import Transaction, RiskTag
- from database.py import DatabaseService
- from redis_service import RedisService
- from rules import RiskEvaluator
- # 定义 Faust 应用
- app = faust.App(
- 'risk_control_app',
- broker='kafka://localhost:9092',
- value_serializer='raw',
- )
- # 定义 Kafka 主题
- transaction_topic = app.topic('transaction_topic')
- # 初始化服务
- redis_service = RedisService()
- database_service = DatabaseService()
- risk_evaluator = RiskEvaluator()
- @app.agent(transaction_topic)
- async def process_transaction(stream):
- async for event in stream:
- try:
- # 解析交易数据
- data = json.loads(event)
- transaction = Transaction(
- transaction_id=data['transaction_id'],
- account_id=data['account_id'],
- amount=data['amount'],
- timestamp=data['timestamp']
- )
- # 从 Redis 获取风险标签
- risk_tag = redis_service.get_risk_tag(transaction.account_id)
- if not risk_tag:
- # 如果 Redis 中没有,从数据库获取并更新到 Redis
- risk_tag = database_service.get_risk_tag(transaction.account_id)
- if risk_tag:
- redis_service.set_risk_tag(risk_tag)
- else:
- # 如果数据库中也没有,设定默认风险标签
- risk_tag = RiskTag(transaction.account_id, 1)
- # 使用规则引擎进行风险评估
- is_risky = risk_evaluator.evaluate(transaction, risk_tag)
- # 根据评估结果进行处理
- if is_risky:
- print(f"交易 {transaction.transaction_id} 存在风险,执行阻止操作")
- # TODO: 将结果返回给支付业务系统,阻止交易
- else:
- print(f"交易 {transaction.transaction_id} 安全,允许通过")
- # TODO: 将结果返回给支付业务系统,允许交易
- except Exception as e:
- print(f"处理交易时发生错误:{e}")
- if __name__ == '__main__':
- app.main()
复制代码 注释:
- 利用 Faust 定义 Kafka Streams 应用程序,处理 transaction_topic 中的消息。
- 在 process_transaction 函数中,逐条处理买卖业务数据。
- 从 Redis 获取风险标签,假如没有则从数据库获取并更新到 Redis。
- 利用自定义的 RiskEvaluator 进行风险评估,根据评估结果实行相应的操作
6. producer.py(Kafka 生产者,发送测试数据)
- # producer.py
- from kafka import KafkaProducer
- import json
- import time
- producer = KafkaProducer(
- bootstrap_servers='localhost:9092',
- value_serializer=lambda v: json.dumps(v).encode('utf-8')
- )
- # 创建示例交易数据
- transaction_data = {
- 'transaction_id': 'tx1001',
- 'account_id': 'account123',
- 'amount': 12000.0,
- 'timestamp': time.time()
- }
- # 发送交易数据到 Kafka
- producer.send('transaction_topic', transaction_data)
- producer.flush()
- print(f"已发送交易数据:{transaction_data}")
- producer.close()
复制代码 运行示例
1. 启动须要的服务
注意事项
总结
上述示例提供了一个基本的 Python 程序框架,演示了如何将 Kafka、Faust、Redis、规则引擎和分布式数据库集成在一起,完成实时风控的基本功能。您可以根据详细的业务需求和技能环境,对程序进行扩展和优化。
扩展发起:
- Redis:确保 Redis 服务在当地的 6379 端口运行
- Kafka:确保 Kafka 服务在当地的 9092 端口运行,并创建主题 transaction_topic。
- # 启动 Zookeeper
- zookeeper-server-start.sh config/zookeeper.properties
- # 启动 Kafka
- kafka-server-start.sh config/server.properties
- # 创建主题
- kafka-topics.sh --create --topic transaction_topic --bootstrap-server localhost:9092
复制代码 2. 运行应用程序
- 启动风控系统(app.py):
- python app.py worker -l info
复制代码 运行 Kafka 生产者,发送买卖业务数据(producer.py):
- 3. 预期输出
风控系统将处理买卖业务数据,利用规则引擎进行评估,并根据规则打印评估结果。例如:
- 检测到高风险交易:Transaction(transaction_id='tx1001', account_id='account123', amount=12000.0, timestamp=...)
- 交易 tx1001 存在风险,执行阻止操作
复制代码 说明
- Faust:Python 的流式处理库,类似于 Kafka Streams,用于处理 Kafka 中的消息流。
- 规则引擎:利用 Python 自定义规则评估逻辑,模仿 Drools 的功能。
- Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
- 分布式数据库(SQLite 模仿):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
- 风险标签:简单地利用风险级别(1-低风险,2-中风险,3-高风险)来表现。
- 异常处理:在实际应用中,必要更美满的异常处理机制,防止因异常导致程序崩溃。
- 引入异步 Redis 客户端:利用 aioredis 提拔 Redis 操作的性能。
- 利用真正的分布式数据库:替换 SQLite,利用例如 PostgreSQL、MySQL 等数据库,并设置集群模式。
- 美满规则引擎:利用现有的 Python 规则引擎库(如 durable_rules、experta)实现更复杂的规则逻辑。
- 添加日记和监控:集成日记系统和监控工具,便于维护和故障排查。
- 性能优化:对于高并发场景,必要考虑异步 I/O、毗连池等技能优化性能。
- 设置管理:将硬编码的设置(如主机地址、端口、主题名)提取到设置文件或环境变量中,便于管理和修改。
- 安全性:在生产环境中,注意保护敏感信息,确保数据传输和存储的安全。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |