ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink-基于 DataStream API 实现欺诈检测 [打印本页]

作者: 涛声依旧在    时间: 2022-8-20 18:37
标题: Flink-基于 DataStream API 实现欺诈检测
案例来源于 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/
案例背景

在当今数字时代,信用卡欺诈行为越来越被重视。 罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物。
在这个教程中,你将会建立一个针对可疑信用卡交易行为的反欺诈检测系统。 通过使用一组简单的规则,你将了解到 Flink 如何为我们实现复杂业务逻辑并实时执行。
欺诈检测规则

对原有案例进行改造
  1. 1. 数据源使用Kafka,发送json格式字符串
  2. 消息格式:  {"accountId":1001, "timestamp":1656490723171, "amount":0.12}
  3. 2. 自定义 DeserializationSchema, 直接将kafka的json字符串转成POJO对象
复制代码
流程图


核心代码

  1. public class TransactionDeserialization implements DeserializationSchema<Transaction> {
  2.     @Override
  3.     public Transaction deserialize(byte[] bytes) throws IOException {
  4.         ByteBuffer buffer = ByteBuffer.wrap(bytes);
  5.         String message = byteBufferToString(buffer);
  6.         if (StringUtils.isBlank(message)) {
  7.             return null;
  8.         }
  9.         Transaction transaction = JsonUtils.fromJson(message, Transaction.class);
  10.         return transaction;
  11.     }
  12.     @Override
  13.     public boolean isEndOfStream(Transaction transaction) {
  14.         return false;
  15.     }
  16.     @Override
  17.     public TypeInformation<Transaction> getProducedType() {
  18.         return TypeInformation.of(Transaction.class);
  19.     }
  20.     /**
  21.      * ByteBuffer 转换 String
  22.      * @param buffer
  23.      * @return
  24.      */
  25.     private String byteBufferToString(ByteBuffer buffer) {
  26.         String ret = "";
  27.         try{
  28.             CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
  29.             CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());;
  30.             ret = charBuffer.toString();
  31.         }catch (Exception e) {
  32.             e.printStackTrace();
  33.         }
  34.         return ret;
  35.     }
  36. }
复制代码
  1. public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
  2.     /**
  3.      * 定义小金额边界
  4.      */
  5.     private static final double SMALL_AMOUNT = 1.00;
  6.     /**
  7.      * 定义大金额边界
  8.      */
  9.     private static final double LARGE_AMOUNT = 500.00;
  10.     /**
  11.      * 1分钟时间
  12.      */
  13.     private static final long ONE_MINUTE = 60 * 1000;
  14.     /**
  15.      * 保存是否有消费小金额的状态
  16.      */
  17.     private transient ValueState<Boolean> smallAmountState;
  18.     /**
  19.      * 定时器状态
  20.      */
  21.     private transient ValueState<Long> timerState;
  22.     @Override
  23.     public void open(Configuration parameters) throws Exception {
  24.         // 初始化ValueState
  25.         ValueStateDescriptor<Boolean> smallAmountStateDescriptor = new ValueStateDescriptor<Boolean>("small-amount-state", Types.BOOLEAN);
  26.         smallAmountState = getRuntimeContext().getState(smallAmountStateDescriptor);
  27.         ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<Long>("timer-state", Types.LONG);
  28.         timerState = getRuntimeContext().getState(timerStateDescriptor);
  29.     }
  30.     @Override
  31.     public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
  32.         if (Objects.isNull(transaction)) {
  33.             return;
  34.         }
  35.         // Get the current state for the current key
  36.         Boolean lastTransactionWasSmall = smallAmountState.value();
  37.         // Check if the flag is set
  38.         if (Objects.nonNull(lastTransactionWasSmall)) {
  39.             if (transaction.getAmount() > LARGE_AMOUNT) {
  40.                 Alert alert = new Alert();
  41.                 alert.setAccountId(transaction.getAccountId());
  42.                 alert.setAmount(transaction.getAmount());
  43.                 collector.collect(alert);
  44.             }
  45.             clearUp(context);
  46.         }
  47.         if (transaction.getAmount() < SMALL_AMOUNT) {
  48.             // set the flag to true
  49.             smallAmountState.update(true);
  50.             // 注册定时器,设置一个当前时间一分钟后触发的定时器,同时,将触发时间保存到 timerState 状态中。
  51.             long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
  52.             context.timerService().registerProcessingTimeTimer(timer);
  53.             timerState.update(timer);
  54.         }
  55.     }
  56.     @Override
  57.     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
  58.         // remove flag after 1 minute
  59.         timerState.clear();
  60.         smallAmountState.clear();
  61.     }
  62.     private void clearUp(Context ctx) {
  63.         try {
  64.             // delete timer
  65.             Long timer = timerState.value();
  66.             ctx.timerService().deleteProcessingTimeTimer(timer);
  67.             timerState.clear();
  68.             smallAmountState.clear();
  69.         } catch (IOException e) {
  70.             e.printStackTrace();
  71.         }
  72.     }
  73. }
复制代码
  1. public class FraudDetectionJob {
  2.     public static void main(String[] args) throws Exception {
  3.         // 初始化环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         // kafka消息格式: {"accountId":1001, "timestamp":1656490723171, "amount":0.12}
  6.         // 定义Kafka数据源
  7.         KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
  8.                 .setBootstrapServers("192.168.0.192:9092")
  9.                 .setTopics("TOPIC_FRAUD_DETECTION")
  10.                 .setGroupId("TEST_GROUP")
  11.                 .setStartingOffsets(OffsetsInitializer.latest())
  12.                 .setValueOnlyDeserializer(new TransactionDeserialization())
  13.                 .build();
  14.         // 加载数据源
  15.         DataStreamSource<Transaction> fraudDetectionSource
  16.                 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "FraudDetection-Source");
  17.         // 处理数据
  18.         SingleOutputStreamOperator<Alert> alertStreamOperator = fraudDetectionSource.keyBy(Transaction::getAccountId)
  19.                 .process(new FraudDetector())
  20.                 .name("Fraud-Detector");
  21.         // 输出告警结果
  22.         alertStreamOperator.addSink(new AlertSink())
  23.                 .name("Send-Alerts");
  24.         env.execute("Fraud Detection");
  25.     }
  26. }
复制代码
执行效果

完整代码

https://github.com/Mr-LuXiaoHua/study-flink
  1. 代码入口: com.example.datastream.frauddetection.FraudDetectionJob
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4