Python 实现的风控系统(利用了kafka、Faust、模仿drools、redis、分布式数 ...

打印 上一主题 下一主题

主题 835|帖子 835|积分 2505

以下是一个利用 Python 实现的风控系统示例,涵盖以下技能组件:

  • Kafka 消息中间件:用于实时吸取支付业务系统传递的买卖业务数据。
  • Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。
  • 规则引擎:利用 Python 实现简单的规则评估逻辑,模仿 Drools 的功能。
  • Redis 内存数据库:用于存储风险标签,快速获取账户的风险级别。
  • 分布式数据库:利用 SQLite 模仿,从中获取风险标签数据(当 Redis 中没有时)。
我们将构建一个简单的风控系统,流程如下:


  • 从 Kafka 中消费实时买卖业务数据。
  • 从 Redis 获取对应的风险标签,假如没有则从分布式数据库获取并更新到 Redis。
  • 利用规则引擎对买卖业务数据和风险标签进行评估。
  • 将评估结果返回给支付业务系统或纪录下来。
    1. 实时交易模块:接收交易数据 ——> 获取风险标签(Redis) ——> 调用规则引擎 ——> 评估结果返回
    2.       ↓                                           ↓                          ↑
    3. 规则引擎模块:交易数据 + 风险标签 ---> 规则执行 ----> 输出评估结果(通过/拒绝)
    复制代码

     
项目结构和依赖

1. 项目结构
  1. risk_control_demo/
  2. ├── app.py                      # 主应用程序
  3. ├── models.py                   # 数据模型定义
  4. ├── rules.py                    # 规则引擎逻辑
  5. ├── database.py                 # 数据库服务类
  6. ├── redis_service.py            # Redis 服务类
  7. ├── requirements.txt            # 项目依赖
  8. └── producer.py                 # Kafka 生产者,发送测试数据
复制代码
2. 项目依赖(requirements.txt)
  1. faust==1.10.4
  2. redis==4.5.5
  3. aiokafka==0.7.2
  4. sqlite3==0.0.1
复制代码
安装依赖
  1. pip install -r requirements.txt
复制代码
详细代码

1. models.py(数据模子定义)

  1. # models.py
  2. from dataclasses import dataclass
  3. @dataclass
  4. class Transaction:
  5.     transaction_id: str
  6.     account_id: str
  7.     amount: float
  8.     timestamp: float
  9. @dataclass
  10. class RiskTag:
  11.     account_id: str
  12.     risk_level: int  # 1-低风险, 2-中风险, 3-高风险
复制代码
2. database.py(数据库服务类)

  1. # database.py
  2. import sqlite3
  3. from models import RiskTag
  4. class DatabaseService:
  5.     def __init__(self):
  6.         # 连接 SQLite 数据库,内存模式
  7.         self.conn = sqlite3.connect(':memory:')
  8.         self.initialize_database()
  9.     def initialize_database(self):
  10.         cursor = self.conn.cursor()
  11.         # 创建风险标签表
  12.         cursor.execute('''
  13.             CREATE TABLE IF NOT EXISTS risk_tags (
  14.                 account_id TEXT PRIMARY KEY,
  15.                 risk_level INTEGER
  16.             )
  17.         ''')
  18.         # 插入示例数据
  19.         cursor.execute('''
  20.             INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)
  21.         ''')
  22.         self.conn.commit()
  23.     def get_risk_tag(self, account_id):
  24.         cursor = self.conn.cursor()
  25.         cursor.execute('SELECT risk_level FROM risk_tags WHERE account_id = ?', (account_id,))
  26.         result = cursor.fetchone()
  27.         if result:
  28.             return RiskTag(account_id, result[0])
  29.         else:
  30.             return None
  31.     def close(self):
  32.         self.conn.close()
复制代码
3. redis_service.py(Redis 服务类)
  1. # redis_service.py
  2. import redis
  3. from models import RiskTag
  4. class RedisService:
  5.     def __init__(self, host='localhost', port=6379):
  6.         self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)
  7.     def get_risk_tag(self, account_id):
  8.         risk_level = self.redis_client.get(f'risk:{account_id}')
  9.         if risk_level:
  10.             return RiskTag(account_id, int(risk_level))
  11.         return None
  12.     def set_risk_tag(self, risk_tag):
  13.         self.redis_client.set(f'risk:{risk_tag.account_id}', risk_tag.risk_level)
  14.     def close(self):
  15.         self.redis_client.close()
复制代码
 4. rules.py(规则引擎逻辑)
  1. # rules.py
  2. from models import Transaction, RiskTag
  3. class RiskEvaluator:
  4.     def evaluate(self, transaction: Transaction, risk_tag: RiskTag) -> bool:
  5.         """
  6.         返回 True 表示交易存在风险,需要阻止。
  7.         返回 False 表示交易安全,可以通过。
  8.         """
  9.         # 高风险交易规则
  10.         if transaction.amount > 10000 and risk_tag.risk_level == 3:
  11.             print(f"检测到高风险交易:{transaction}")
  12.             return True  # 阻止交易
  13.         # 中风险交易规则
  14.         if 5000 < transaction.amount <= 10000 and risk_tag.risk_level >= 2:
  15.             print(f"检测到中风险交易:{transaction}")
  16.             return True  # 阻止交易
  17.         # 低风险交易规则
  18.         print(f"交易通过:{transaction}")
  19.         return False  # 允许交易
复制代码
5. app.py(主应用程序)
  1. # app.py
  2. import faust
  3. import asyncio
  4. import json
  5. from models import Transaction, RiskTag
  6. from database.py import DatabaseService
  7. from redis_service import RedisService
  8. from rules import RiskEvaluator
  9. # 定义 Faust 应用
  10. app = faust.App(
  11.     'risk_control_app',
  12.     broker='kafka://localhost:9092',
  13.     value_serializer='raw',
  14. )
  15. # 定义 Kafka 主题
  16. transaction_topic = app.topic('transaction_topic')
  17. # 初始化服务
  18. redis_service = RedisService()
  19. database_service = DatabaseService()
  20. risk_evaluator = RiskEvaluator()
  21. @app.agent(transaction_topic)
  22. async def process_transaction(stream):
  23.     async for event in stream:
  24.         try:
  25.             # 解析交易数据
  26.             data = json.loads(event)
  27.             transaction = Transaction(
  28.                 transaction_id=data['transaction_id'],
  29.                 account_id=data['account_id'],
  30.                 amount=data['amount'],
  31.                 timestamp=data['timestamp']
  32.             )
  33.             # 从 Redis 获取风险标签
  34.             risk_tag = redis_service.get_risk_tag(transaction.account_id)
  35.             if not risk_tag:
  36.                 # 如果 Redis 中没有,从数据库获取并更新到 Redis
  37.                 risk_tag = database_service.get_risk_tag(transaction.account_id)
  38.                 if risk_tag:
  39.                     redis_service.set_risk_tag(risk_tag)
  40.                 else:
  41.                     # 如果数据库中也没有,设定默认风险标签
  42.                     risk_tag = RiskTag(transaction.account_id, 1)
  43.             # 使用规则引擎进行风险评估
  44.             is_risky = risk_evaluator.evaluate(transaction, risk_tag)
  45.             # 根据评估结果进行处理
  46.             if is_risky:
  47.                 print(f"交易 {transaction.transaction_id} 存在风险,执行阻止操作")
  48.                 # TODO: 将结果返回给支付业务系统,阻止交易
  49.             else:
  50.                 print(f"交易 {transaction.transaction_id} 安全,允许通过")
  51.                 # TODO: 将结果返回给支付业务系统,允许交易
  52.         except Exception as e:
  53.             print(f"处理交易时发生错误:{e}")
  54. if __name__ == '__main__':
  55.     app.main()
复制代码
注释:


  • 利用 Faust 定义 Kafka Streams 应用程序,处理 transaction_topic 中的消息。
  • 在 process_transaction 函数中,逐条处理买卖业务数据。
  • 从 Redis 获取风险标签,假如没有则从数据库获取并更新到 Redis。
  • 利用自定义的 RiskEvaluator 进行风险评估,根据评估结果实行相应的操作
6. producer.py(Kafka 生产者,发送测试数据)
  1. # producer.py
  2. from kafka import KafkaProducer
  3. import json
  4. import time
  5. producer = KafkaProducer(
  6.     bootstrap_servers='localhost:9092',
  7.     value_serializer=lambda v: json.dumps(v).encode('utf-8')
  8. )
  9. # 创建示例交易数据
  10. transaction_data = {
  11.     'transaction_id': 'tx1001',
  12.     'account_id': 'account123',
  13.     'amount': 12000.0,
  14.     'timestamp': time.time()
  15. }
  16. # 发送交易数据到 Kafka
  17. producer.send('transaction_topic', transaction_data)
  18. producer.flush()
  19. print(f"已发送交易数据:{transaction_data}")
  20. producer.close()
复制代码
运行示例

1. 启动须要的服务

注意事项



总结

上述示例提供了一个基本的 Python 程序框架,演示了如何将 Kafka、Faust、Redis、规则引擎和分布式数据库集成在一起,完成实时风控的基本功能。您可以根据详细的业务需求和技能环境,对程序进行扩展和优化。
扩展发起:



  • Redis:确保 Redis 服务在当地的 6379 端口运行
    1. redis-server
    复制代码
    Kafka:确保 Kafka 服务在当地的 9092 端口运行,并创建主题 transaction_topic。
    1. # 启动 Zookeeper
    2. zookeeper-server-start.sh config/zookeeper.properties
    3. # 启动 Kafka
    4. kafka-server-start.sh config/server.properties
    5. # 创建主题
    6. kafka-topics.sh --create --topic transaction_topic --bootstrap-server localhost:9092
    复制代码
    2. 运行应用程序
  • 启动风控系统(app.py):
    1. python app.py worker -l info
    复制代码
    运行 Kafka 生产者,发送买卖业务数据(producer.py):
    1. python producer.py
    复制代码
    3. 预期输出
    风控系统将处理买卖业务数据,利用规则引擎进行评估,并根据规则打印评估结果。例如:
    1. 检测到高风险交易:Transaction(transaction_id='tx1001', account_id='account123', amount=12000.0, timestamp=...)
    2. 交易 tx1001 存在风险,执行阻止操作
    复制代码
    说明
  • Faust:Python 的流式处理库,类似于 Kafka Streams,用于处理 Kafka 中的消息流。
  • 规则引擎:利用 Python 自定义规则评估逻辑,模仿 Drools 的功能。
  • Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
  • 分布式数据库(SQLite 模仿):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
  • 风险标签:简单地利用风险级别(1-低风险,2-中风险,3-高风险)来表现。
  • 异常处理:在实际应用中,必要更美满的异常处理机制,防止因异常导致程序崩溃。
  • 引入异步 Redis 客户端:利用 aioredis 提拔 Redis 操作的性能。
  • 利用真正的分布式数据库:替换 SQLite,利用例如 PostgreSQL、MySQL 等数据库,并设置集群模式。
  • 美满规则引擎:利用现有的 Python 规则引擎库(如 durable_rules、experta)实现更复杂的规则逻辑。
  • 添加日记和监控:集成日记系统和监控工具,便于维护和故障排查。

    • 性能优化:对于高并发场景,必要考虑异步 I/O、毗连池等技能优化性能。
    • 设置管理:将硬编码的设置(如主机地址、端口、主题名)提取到设置文件或环境变量中,便于管理和修改。
    • 安全性:在生产环境中,注意保护敏感信息,确保数据传输和存储的安全。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美食家大橙子

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表