ToB企服应用市场:ToB评测及商务社交产业平台

标题: Python 实现的风控系统(利用了kafka、Faust、模仿drools、redis、分布式数 [打印本页]

作者: 美食家大橙子    时间: 2024-12-26 01:12
标题: Python 实现的风控系统(利用了kafka、Faust、模仿drools、redis、分布式数
以下是一个利用 Python 实现的风控系统示例,涵盖以下技能组件:
我们将构建一个简单的风控系统,流程如下:

项目结构和依赖

1. 项目结构
  1. risk_control_demo/
  2. ├── app.py                      # 主应用程序
  3. ├── models.py                   # 数据模型定义
  4. ├── rules.py                    # 规则引擎逻辑
  5. ├── database.py                 # 数据库服务类
  6. ├── redis_service.py            # Redis 服务类
  7. ├── requirements.txt            # 项目依赖
  8. └── producer.py                 # Kafka 生产者,发送测试数据
复制代码
2. 项目依赖(requirements.txt)
  1. faust==1.10.4
  2. redis==4.5.5
  3. aiokafka==0.7.2
  4. sqlite3==0.0.1
复制代码
安装依赖
  1. pip install -r requirements.txt
复制代码
详细代码

1. models.py(数据模子定义)

  1. # models.py
  2. from dataclasses import dataclass
  3. @dataclass
  4. class Transaction:
  5.     transaction_id: str
  6.     account_id: str
  7.     amount: float
  8.     timestamp: float
  9. @dataclass
  10. class RiskTag:
  11.     account_id: str
  12.     risk_level: int  # 1-低风险, 2-中风险, 3-高风险
复制代码
2. database.py(数据库服务类)

  1. # database.py
  2. import sqlite3
  3. from models import RiskTag
  4. class DatabaseService:
  5.     def __init__(self):
  6.         # 连接 SQLite 数据库,内存模式
  7.         self.conn = sqlite3.connect(':memory:')
  8.         self.initialize_database()
  9.     def initialize_database(self):
  10.         cursor = self.conn.cursor()
  11.         # 创建风险标签表
  12.         cursor.execute('''
  13.             CREATE TABLE IF NOT EXISTS risk_tags (
  14.                 account_id TEXT PRIMARY KEY,
  15.                 risk_level INTEGER
  16.             )
  17.         ''')
  18.         # 插入示例数据
  19.         cursor.execute('''
  20.             INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)
  21.         ''')
  22.         self.conn.commit()
  23.     def get_risk_tag(self, account_id):
  24.         cursor = self.conn.cursor()
  25.         cursor.execute('SELECT risk_level FROM risk_tags WHERE account_id = ?', (account_id,))
  26.         result = cursor.fetchone()
  27.         if result:
  28.             return RiskTag(account_id, result[0])
  29.         else:
  30.             return None
  31.     def close(self):
  32.         self.conn.close()
复制代码
3. redis_service.py(Redis 服务类)
  1. # redis_service.py
  2. import redis
  3. from models import RiskTag
  4. class RedisService:
  5.     def __init__(self, host='localhost', port=6379):
  6.         self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)
  7.     def get_risk_tag(self, account_id):
  8.         risk_level = self.redis_client.get(f'risk:{account_id}')
  9.         if risk_level:
  10.             return RiskTag(account_id, int(risk_level))
  11.         return None
  12.     def set_risk_tag(self, risk_tag):
  13.         self.redis_client.set(f'risk:{risk_tag.account_id}', risk_tag.risk_level)
  14.     def close(self):
  15.         self.redis_client.close()
复制代码
 4. rules.py(规则引擎逻辑)
  1. # rules.py
  2. from models import Transaction, RiskTag
  3. class RiskEvaluator:
  4.     def evaluate(self, transaction: Transaction, risk_tag: RiskTag) -> bool:
  5.         """
  6.         返回 True 表示交易存在风险,需要阻止。
  7.         返回 False 表示交易安全,可以通过。
  8.         """
  9.         # 高风险交易规则
  10.         if transaction.amount > 10000 and risk_tag.risk_level == 3:
  11.             print(f"检测到高风险交易:{transaction}")
  12.             return True  # 阻止交易
  13.         # 中风险交易规则
  14.         if 5000 < transaction.amount <= 10000 and risk_tag.risk_level >= 2:
  15.             print(f"检测到中风险交易:{transaction}")
  16.             return True  # 阻止交易
  17.         # 低风险交易规则
  18.         print(f"交易通过:{transaction}")
  19.         return False  # 允许交易
复制代码
5. app.py(主应用程序)
  1. # app.py
  2. import faust
  3. import asyncio
  4. import json
  5. from models import Transaction, RiskTag
  6. from database.py import DatabaseService
  7. from redis_service import RedisService
  8. from rules import RiskEvaluator
  9. # 定义 Faust 应用
  10. app = faust.App(
  11.     'risk_control_app',
  12.     broker='kafka://localhost:9092',
  13.     value_serializer='raw',
  14. )
  15. # 定义 Kafka 主题
  16. transaction_topic = app.topic('transaction_topic')
  17. # 初始化服务
  18. redis_service = RedisService()
  19. database_service = DatabaseService()
  20. risk_evaluator = RiskEvaluator()
  21. @app.agent(transaction_topic)
  22. async def process_transaction(stream):
  23.     async for event in stream:
  24.         try:
  25.             # 解析交易数据
  26.             data = json.loads(event)
  27.             transaction = Transaction(
  28.                 transaction_id=data['transaction_id'],
  29.                 account_id=data['account_id'],
  30.                 amount=data['amount'],
  31.                 timestamp=data['timestamp']
  32.             )
  33.             # 从 Redis 获取风险标签
  34.             risk_tag = redis_service.get_risk_tag(transaction.account_id)
  35.             if not risk_tag:
  36.                 # 如果 Redis 中没有,从数据库获取并更新到 Redis
  37.                 risk_tag = database_service.get_risk_tag(transaction.account_id)
  38.                 if risk_tag:
  39.                     redis_service.set_risk_tag(risk_tag)
  40.                 else:
  41.                     # 如果数据库中也没有,设定默认风险标签
  42.                     risk_tag = RiskTag(transaction.account_id, 1)
  43.             # 使用规则引擎进行风险评估
  44.             is_risky = risk_evaluator.evaluate(transaction, risk_tag)
  45.             # 根据评估结果进行处理
  46.             if is_risky:
  47.                 print(f"交易 {transaction.transaction_id} 存在风险,执行阻止操作")
  48.                 # TODO: 将结果返回给支付业务系统,阻止交易
  49.             else:
  50.                 print(f"交易 {transaction.transaction_id} 安全,允许通过")
  51.                 # TODO: 将结果返回给支付业务系统,允许交易
  52.         except Exception as e:
  53.             print(f"处理交易时发生错误:{e}")
  54. if __name__ == '__main__':
  55.     app.main()
复制代码
注释:

6. producer.py(Kafka 生产者,发送测试数据)
  1. # producer.py
  2. from kafka import KafkaProducer
  3. import json
  4. import time
  5. producer = KafkaProducer(
  6.     bootstrap_servers='localhost:9092',
  7.     value_serializer=lambda v: json.dumps(v).encode('utf-8')
  8. )
  9. # 创建示例交易数据
  10. transaction_data = {
  11.     'transaction_id': 'tx1001',
  12.     'account_id': 'account123',
  13.     'amount': 12000.0,
  14.     'timestamp': time.time()
  15. }
  16. # 发送交易数据到 Kafka
  17. producer.send('transaction_topic', transaction_data)
  18. producer.flush()
  19. print(f"已发送交易数据:{transaction_data}")
  20. producer.close()
复制代码
运行示例

1. 启动须要的服务

注意事项



总结

上述示例提供了一个基本的 Python 程序框架,演示了如何将 Kafka、Faust、Redis、规则引擎和分布式数据库集成在一起,完成实时风控的基本功能。您可以根据详细的业务需求和技能环境,对程序进行扩展和优化。
扩展发起:



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4