IT评测·应用市场-qidao123.com技术社区

标题: Azure Delta Lake、Databricks和Event Hubs实现实时欺诈检测 [打印本页]

作者: 悠扬随风    时间: 2025-3-25 21:09
标题: Azure Delta Lake、Databricks和Event Hubs实现实时欺诈检测
设计Azure云架构方案实现Azure Delta Lake和Azure Databricks,结合 Azure Event Hubs/Kafka 摄入实时数据,通过 Delta Lake 实现 Exactly-Once 语义,实时欺诈检测(流数据写入 Delta Lake,批处理模型实时更新),以及具体实现的具体步调和关键PySpark代码。
完备实现代码需要根据具体数据格式和业务规则举行调解,建议通过Databricks Repos举行CI/CD管理。
一、架构设计

二、关键实现步调

1. 情况准备

  1. # 创建Azure资源
  2. az eventhubs namespace create --name fraud-detection-eh --resource-group myRG --location eastus
  3. az storage account create --name deltalakedemo --resource-group myRG --location eastus
复制代码
2. 实时数据摄入(PySpark)

  1. from pyspark.sql.streaming import StreamingQuery
  2. event_hub_conf = {
  3.   "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("<CONNECTION_STRING>")
  4. }
  5. raw_stream = (spark
  6.   .readStream
  7.   .format("eventhubs")
  8.   .options(**event_hub_conf)
  9.   .load())
  10. # Schema示例
  11. from pyspark.sql.types import *
  12. transaction_schema = StructType([
  13.   StructField("transaction_id", StringType()),
  14.   StructField("user_id", StringType()),
  15.   StructField("amount", DoubleType()),
  16.   StructField("timestamp", TimestampType()),
  17.   StructField("location", StringType())
  18. ])
  19. parsed_stream = raw_stream.select(
  20.   from_json(col("body").cast("string"), transaction_schema).alias("data")
  21. ).select("data.*")
复制代码
3. Exactly-Once实现

  1. delta_path = "abfss://delta@deltalakedemo.dfs.core.windows.net/transactions"
  2. checkpoint_path = "/delta/checkpoints/fraud_detection"
  3. (parsed_stream.writeStream
  4.   .format("delta")
  5.   .outputMode("append")
  6.   .option("checkpointLocation", checkpoint_path)
  7.   .trigger(processingTime="10 seconds")
  8.   .start(delta_path))
复制代码
4. 实时欺诈检测

  1. from pyspark.ml import PipelineModel
  2. # 加载预训练模型
  3. model = PipelineModel.load("dbfs:/models/fraud_detection/v1")
  4. def predict_batch(df, epoch_id):
  5.     # 去重处理
  6.     df = df.dropDuplicates(["transaction_id"])
  7.     # 特征工程
  8.     df = feature_engineering(df)
  9.     # 模型预测
  10.     predictions = model.transform(df)
  11.     # 写入警报表
  12.     (predictions.filter(col("prediction") == 1)
  13.      .write
  14.      .format("delta")
  15.      .mode("append")
  16.      .saveAsTable("fraud_alerts"))
  17.     return df
  18. streaming_query = (parsed_stream
  19.   .writeStream
  20.   .foreachBatch(predict_batch)
  21.   .trigger(processingTime="30 seconds")
  22.   .start())
复制代码
5. 模型更新(批处理)

  1. from pyspark.ml.pipeline import Pipeline
  2. from pyspark.ml.classification import GBTClassifier
  3. from pyspark.ml.feature import VectorAssembler
  4. def retrain_model():
  5.     # 读取增量数据
  6.     latest_data = spark.read.format("delta").load(delta_path)
  7.     # 特征工程
  8.     train_df = feature_engineering(latest_data)
  9.     # 定义模型
  10.     assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
  11.     gbt = GBTClassifier(maxIter=10)
  12.     pipeline = Pipeline(stages=[assembler, gbt])
  13.     # 训练
  14.     model = pipeline.fit(train_df)
  15.     # 版本控制
  16.     model.write().overwrite().save("dbfs:/models/fraud_detection/v2")
  17.     # 注册到MLflow
  18.     mlflow.spark.log_model(model, "fraud_detection", registered_model_name="Fraud_GBT")
  19. # 每天调度执行
  20. spark.sparkContext.addPyFile("retrain.py")
  21. dbutils.library.restartPython()
复制代码
6. 动态模型加载(流处理加强)

  1. model_version = 1  # 初始版本
  2. def predict_batch(df, epoch_id):
  3.     global model_version
  4.     try:
  5.         # 检查模型更新
  6.         latest_model = get_latest_model_version()
  7.         if latest_model > model_version:
  8.             model = PipelineModel.load(f"dbfs:/models/fraud_detection/v{latest_model}")
  9.             model_version = latest_model
  10.     except:
  11.         pass
  12.    
  13.     # 剩余预测逻辑保持不变
复制代码
三、关键技术点

  1. display(streaming_query.lastProgress)
复制代码
四、部署建议

五、扩展建议


该方案特点:

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4