ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink-出租车-车程事件流和付车费事件流connect [打印本页]

作者: 立聪堂德州十三局店    时间: 2022-8-20 23:36
标题: Flink-出租车-车程事件流和付车费事件流connect
案例来源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md
案例背景

出租车车程(taxi ride)事件结构
  1. 1.每次车程都由两个事件表示:行程开始(trip start)和行程结束(trip end)。
  2. 2.每个事件都由十一个字段组成:
  3. rideId         : Long      // 每次车程的唯一id
  4. taxiId         : Long      // 每一辆出租车的唯一id
  5. driverId       : Long      // 每一位司机的唯一id
  6. isStart        : Boolean   // 行程开始事件为 TRUE, 行程结束事件为 FALSE
  7. eventTime      : Long      // 事件的时间戳
  8. startLon       : Float     // 车程开始位置的经度
  9. startLat       : Float     // 车程开始位置的维度
  10. endLon         : Float     // 车程结束位置的经度
  11. endLat         : Float     // 车程结束位置的维度
  12. passengerCnt   : Short     // 乘车人数
复制代码
出租车车费(taxi fare)事件结构
  1. rideId         : Long      // 每次车程的唯一id
  2. taxiId         : Long      // 每一辆出租车的唯一id
  3. driverId       : Long      // 每一位司机的唯一id
  4. startTime      : Long   // 车程开始时间
  5. paymentType    : String    // 现金(CASH)或刷卡(CARD)
  6. tip            : Float     // 小费
  7. tolls          : Float     // 过路费
  8. totalFare      : Float     // 总计车费
复制代码
案例目标
  1. 1.将每次车程的 TaxiRide 和 TaxiFare 记录连接在一起
  2. 2.对于每个不同的 rideId,恰好有三个事件:
  3. TaxiRide START 事件
  4. TaxiRide END 事件
  5. 一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)
  6. 最终的结果应该是 DataStream<RideAndFare>,每个不同的 rideId 都产生一个 RideAndFare 记录。 每个 RideAndFare 都应该将某个 rideId 的 TaxiRide START 事件与其匹配的 TaxiFare 配对。
复制代码
案例流程


核心代码

  1.        // 从车程事件中过滤中车程开始时间,并按车程标识 rideId 分组
  2.         KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
  3.                 .filter(ride -> ride.getStart()).keyBy(TaxiRide::getRideId);
  4.         // 付车费事件按行程标识 rideId 分组
  5.         KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
  6.                 .keyBy(TaxiFare::getRideId);
  7.         rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
  8.                 .uid("enrichment") // uid for this operator's state
  9.                 .name("enrichment") // name for this operator in the web UI
  10.                 .addSink(new PrintSinkFunction<>());
复制代码
  1. public class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
  2.     private ValueState<TaxiRide> taxiRideValueState;
  3.     private ValueState<TaxiFare> taxiFareValueState;
  4.     @Override
  5.     public void open(Configuration parameters) throws Exception {
  6.         ValueStateDescriptor<TaxiRide> taxiRideDescriptor = new ValueStateDescriptor<TaxiRide>("save-ride", TaxiRide.class);
  7.         ValueStateDescriptor<TaxiFare> taxiFareDescriptor = new ValueStateDescriptor<TaxiFare>("save-fare", TaxiFare.class);
  8.         taxiRideValueState = getRuntimeContext().getState(taxiRideDescriptor);
  9.         taxiFareValueState = getRuntimeContext().getState(taxiFareDescriptor);
  10.     }
  11.     /**
  12.      * 当车程事件到来,检查车费的taxiFareValueState是否保存有对应行程付费记录
  13.      * 如果有,则匹配输出,清空状态
  14.      * 如果没有,则将车程事件保存起来
  15.      */
  16.     @Override
  17.     public void flatMap1(TaxiRide taxiRide, Collector<RideAndFare> collector) throws Exception {
  18.         TaxiFare taxiFare = taxiFareValueState.value();
  19.         if (Objects.isNull(taxiFare)) {
  20.             taxiRideValueState.update(taxiRide);
  21.         } else {
  22.             taxiFareValueState.clear();
  23.             RideAndFare rideAndFare = new RideAndFare();
  24.             rideAndFare.setRide(taxiRide);
  25.             rideAndFare.setFare(taxiFare);
  26.             collector.collect(rideAndFare);
  27.         }
  28.     }
  29.     /**
  30.      * 当付费事件到来,检查车程的taxiRideValueState是否保存有对应行程车程记录
  31.      * 如果有,则匹配输出,清空状态
  32.      * 如果没有,则将付费事件保存起来
  33.      */
  34.     @Override
  35.     public void flatMap2(TaxiFare taxiFare, Collector<RideAndFare> collector) throws Exception {
  36.         TaxiRide taxiRide = taxiRideValueState.value();
  37.         if (Objects.isNull(taxiRide)) {
  38.             taxiFareValueState.update(taxiFare);
  39.         } else {
  40.             taxiRideValueState.clear();
  41.             RideAndFare rideAndFare = new RideAndFare();
  42.             rideAndFare.setRide(taxiRide);
  43.             rideAndFare.setFare(taxiFare);
  44.             collector.collect(rideAndFare);
  45.         }
  46.     }
  47. }
复制代码
  1.        // 定义出租车-车程数据源
  2.         KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
  3.                 .setBootstrapServers("192.168.0.192:9092")
  4.                 .setTopics("TOPIC_RIDE")
  5.                 .setGroupId("TEST_GROUP")
  6.                 .setClientIdPrefix("ride") // 避免kafka clientId重复
  7.                 .setStartingOffsets(OffsetsInitializer.latest())
  8.                 .setValueOnlyDeserializer(new TaxiRideDeserialization())
  9.                 .build();
  10.         // 定义出租车-车费数据源
  11.         KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
  12.                 .setBootstrapServers("192.168.0.192:9092")
  13.                 .setTopics("TOPIC_FARE")
  14.                 .setGroupId("TEST_GROUP")
  15.                 .setClientIdPrefix("fare") // 避免kafka clientId重复
  16.                 .setStartingOffsets(OffsetsInitializer.latest())
  17.                 .setValueOnlyDeserializer(new TaxiFareDeserialization())
  18.                 .build();
复制代码
事件格式:
  1. 1.车程事件: {"rideId":10086, "taxiId":1, "driverId":2, "isStart":true, "eventTime":1656571391726, "startLon":113.273031, "startLat":23.147103, "endLon":113.268245, "endLat":23.14445, "passengerCnt":1}
  2. 2.付费事件: {"rideId":10086, "taxiId":1, "driverId":2, "startTime":1656571391726, "paymentType":"CASH", "tip":0.00, "tolls":10.00, "totalFare":110.00}
复制代码
完整代码

https://github.com/Mr-LuXiaoHua/study-flink
  1. 程序入口: com.example.datastream.rideandfare.RideAndFareJob
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4