Flink-基于 DataStream API 实现欺诈检测

打印 上一主题 下一主题

主题 630|帖子 630|积分 1890

案例来源于 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/
案例背景

在当今数字时代,信用卡欺诈行为越来越被重视。 罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物。
在这个教程中,你将会建立一个针对可疑信用卡交易行为的反欺诈检测系统。 通过使用一组简单的规则,你将了解到 Flink 如何为我们实现复杂业务逻辑并实时执行。
欺诈检测规则


  • 对于一个账户,如果出现一笔小于1元的交易后, 紧跟着在1分钟内又出现一笔大于500元的交易,则认为该账户属于欺诈,就输出一个报警消息。
  • 图说明如下

对原有案例进行改造
  1. 1. 数据源使用Kafka,发送json格式字符串
  2. 消息格式:  {"accountId":1001, "timestamp":1656490723171, "amount":0.12}
  3. 2. 自定义 DeserializationSchema, 直接将kafka的json字符串转成POJO对象
复制代码
流程图


核心代码


  • 自定义DeserializationSchema
  1. public class TransactionDeserialization implements DeserializationSchema<Transaction> {
  2.     @Override
  3.     public Transaction deserialize(byte[] bytes) throws IOException {
  4.         ByteBuffer buffer = ByteBuffer.wrap(bytes);
  5.         String message = byteBufferToString(buffer);
  6.         if (StringUtils.isBlank(message)) {
  7.             return null;
  8.         }
  9.         Transaction transaction = JsonUtils.fromJson(message, Transaction.class);
  10.         return transaction;
  11.     }
  12.     @Override
  13.     public boolean isEndOfStream(Transaction transaction) {
  14.         return false;
  15.     }
  16.     @Override
  17.     public TypeInformation<Transaction> getProducedType() {
  18.         return TypeInformation.of(Transaction.class);
  19.     }
  20.     /**
  21.      * ByteBuffer 转换 String
  22.      * @param buffer
  23.      * @return
  24.      */
  25.     private String byteBufferToString(ByteBuffer buffer) {
  26.         String ret = "";
  27.         try{
  28.             CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
  29.             CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());;
  30.             ret = charBuffer.toString();
  31.         }catch (Exception e) {
  32.             e.printStackTrace();
  33.         }
  34.         return ret;
  35.     }
  36. }
复制代码

  • 欺诈检测核心代码
  1. public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
  2.     /**
  3.      * 定义小金额边界
  4.      */
  5.     private static final double SMALL_AMOUNT = 1.00;
  6.     /**
  7.      * 定义大金额边界
  8.      */
  9.     private static final double LARGE_AMOUNT = 500.00;
  10.     /**
  11.      * 1分钟时间
  12.      */
  13.     private static final long ONE_MINUTE = 60 * 1000;
  14.     /**
  15.      * 保存是否有消费小金额的状态
  16.      */
  17.     private transient ValueState<Boolean> smallAmountState;
  18.     /**
  19.      * 定时器状态
  20.      */
  21.     private transient ValueState<Long> timerState;
  22.     @Override
  23.     public void open(Configuration parameters) throws Exception {
  24.         // 初始化ValueState
  25.         ValueStateDescriptor<Boolean> smallAmountStateDescriptor = new ValueStateDescriptor<Boolean>("small-amount-state", Types.BOOLEAN);
  26.         smallAmountState = getRuntimeContext().getState(smallAmountStateDescriptor);
  27.         ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<Long>("timer-state", Types.LONG);
  28.         timerState = getRuntimeContext().getState(timerStateDescriptor);
  29.     }
  30.     @Override
  31.     public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
  32.         if (Objects.isNull(transaction)) {
  33.             return;
  34.         }
  35.         // Get the current state for the current key
  36.         Boolean lastTransactionWasSmall = smallAmountState.value();
  37.         // Check if the flag is set
  38.         if (Objects.nonNull(lastTransactionWasSmall)) {
  39.             if (transaction.getAmount() > LARGE_AMOUNT) {
  40.                 Alert alert = new Alert();
  41.                 alert.setAccountId(transaction.getAccountId());
  42.                 alert.setAmount(transaction.getAmount());
  43.                 collector.collect(alert);
  44.             }
  45.             clearUp(context);
  46.         }
  47.         if (transaction.getAmount() < SMALL_AMOUNT) {
  48.             // set the flag to true
  49.             smallAmountState.update(true);
  50.             // 注册定时器,设置一个当前时间一分钟后触发的定时器,同时,将触发时间保存到 timerState 状态中。
  51.             long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
  52.             context.timerService().registerProcessingTimeTimer(timer);
  53.             timerState.update(timer);
  54.         }
  55.     }
  56.     @Override
  57.     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
  58.         // remove flag after 1 minute
  59.         timerState.clear();
  60.         smallAmountState.clear();
  61.     }
  62.     private void clearUp(Context ctx) {
  63.         try {
  64.             // delete timer
  65.             Long timer = timerState.value();
  66.             ctx.timerService().deleteProcessingTimeTimer(timer);
  67.             timerState.clear();
  68.             smallAmountState.clear();
  69.         } catch (IOException e) {
  70.             e.printStackTrace();
  71.         }
  72.     }
  73. }
复制代码

  • FLink Job 启动类
  1. public class FraudDetectionJob {
  2.     public static void main(String[] args) throws Exception {
  3.         // 初始化环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         // kafka消息格式: {"accountId":1001, "timestamp":1656490723171, "amount":0.12}
  6.         // 定义Kafka数据源
  7.         KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
  8.                 .setBootstrapServers("192.168.0.192:9092")
  9.                 .setTopics("TOPIC_FRAUD_DETECTION")
  10.                 .setGroupId("TEST_GROUP")
  11.                 .setStartingOffsets(OffsetsInitializer.latest())
  12.                 .setValueOnlyDeserializer(new TransactionDeserialization())
  13.                 .build();
  14.         // 加载数据源
  15.         DataStreamSource<Transaction> fraudDetectionSource
  16.                 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "FraudDetection-Source");
  17.         // 处理数据
  18.         SingleOutputStreamOperator<Alert> alertStreamOperator = fraudDetectionSource.keyBy(Transaction::getAccountId)
  19.                 .process(new FraudDetector())
  20.                 .name("Fraud-Detector");
  21.         // 输出告警结果
  22.         alertStreamOperator.addSink(new AlertSink())
  23.                 .name("Send-Alerts");
  24.         env.execute("Fraud Detection");
  25.     }
  26. }
复制代码
执行效果


  • kafka输入

  • 告警结果

完整代码

https://github.com/Mr-LuXiaoHua/study-flink
  1. 代码入口: com.example.datastream.frauddetection.FraudDetectionJob
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

涛声依旧在

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表