案例来源: https://github.com/apache/flink-training/blob/release-1.14/README_zh.md
案例背景
出租车车程(taxi ride)事件结构
- 1.每次车程都由两个事件表示:行程开始(trip start)和行程结束(trip end)。
- 2.每个事件都由十一个字段组成:
- rideId : Long // 每次车程的唯一id
- taxiId : Long // 每一辆出租车的唯一id
- driverId : Long // 每一位司机的唯一id
- isStart : Boolean // 行程开始事件为 TRUE, 行程结束事件为 FALSE
- eventTime : Long // 事件的时间戳
- startLon : Float // 车程开始位置的经度
- startLat : Float // 车程开始位置的维度
- endLon : Float // 车程结束位置的经度
- endLat : Float // 车程结束位置的维度
- passengerCnt : Short // 乘车人数
复制代码 出租车车费(taxi fare)事件结构
- rideId : Long // 每次车程的唯一id
- taxiId : Long // 每一辆出租车的唯一id
- driverId : Long // 每一位司机的唯一id
- startTime : Long // 车程开始时间
- paymentType : String // 现金(CASH)或刷卡(CARD)
- tip : Float // 小费
- tolls : Float // 过路费
- totalFare : Float // 总计车费
复制代码 案例目标
- 1.将每次车程的 TaxiRide 和 TaxiFare 记录连接在一起
- 2.对于每个不同的 rideId,恰好有三个事件:
- TaxiRide START 事件
- TaxiRide END 事件
- 一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)
- 最终的结果应该是 DataStream<RideAndFare>,每个不同的 rideId 都产生一个 RideAndFare 记录。 每个 RideAndFare 都应该将某个 rideId 的 TaxiRide START 事件与其匹配的 TaxiFare 配对。
复制代码 案例流程

核心代码
- connect 可以将两个流连接成一个ConnectedStreams, 而且不要求两个流的数据类型一致
- // 从车程事件中过滤中车程开始时间,并按车程标识 rideId 分组
- KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
- .filter(ride -> ride.getStart()).keyBy(TaxiRide::getRideId);
- // 付车费事件按行程标识 rideId 分组
- KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
- .keyBy(TaxiFare::getRideId);
- rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
- .uid("enrichment") // uid for this operator's state
- .name("enrichment") // name for this operator in the web UI
- .addSink(new PrintSinkFunction<>());
复制代码- public class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
- private ValueState<TaxiRide> taxiRideValueState;
- private ValueState<TaxiFare> taxiFareValueState;
- @Override
- public void open(Configuration parameters) throws Exception {
- ValueStateDescriptor<TaxiRide> taxiRideDescriptor = new ValueStateDescriptor<TaxiRide>("save-ride", TaxiRide.class);
- ValueStateDescriptor<TaxiFare> taxiFareDescriptor = new ValueStateDescriptor<TaxiFare>("save-fare", TaxiFare.class);
- taxiRideValueState = getRuntimeContext().getState(taxiRideDescriptor);
- taxiFareValueState = getRuntimeContext().getState(taxiFareDescriptor);
- }
- /**
- * 当车程事件到来,检查车费的taxiFareValueState是否保存有对应行程付费记录
- * 如果有,则匹配输出,清空状态
- * 如果没有,则将车程事件保存起来
- */
- @Override
- public void flatMap1(TaxiRide taxiRide, Collector<RideAndFare> collector) throws Exception {
- TaxiFare taxiFare = taxiFareValueState.value();
- if (Objects.isNull(taxiFare)) {
- taxiRideValueState.update(taxiRide);
- } else {
- taxiFareValueState.clear();
- RideAndFare rideAndFare = new RideAndFare();
- rideAndFare.setRide(taxiRide);
- rideAndFare.setFare(taxiFare);
- collector.collect(rideAndFare);
- }
- }
- /**
- * 当付费事件到来,检查车程的taxiRideValueState是否保存有对应行程车程记录
- * 如果有,则匹配输出,清空状态
- * 如果没有,则将付费事件保存起来
- */
- @Override
- public void flatMap2(TaxiFare taxiFare, Collector<RideAndFare> collector) throws Exception {
- TaxiRide taxiRide = taxiRideValueState.value();
- if (Objects.isNull(taxiRide)) {
- taxiFareValueState.update(taxiFare);
- } else {
- taxiRideValueState.clear();
- RideAndFare rideAndFare = new RideAndFare();
- rideAndFare.setRide(taxiRide);
- rideAndFare.setFare(taxiFare);
- collector.collect(rideAndFare);
- }
- }
- }
复制代码- // 定义出租车-车程数据源
- KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
- .setBootstrapServers("192.168.0.192:9092")
- .setTopics("TOPIC_RIDE")
- .setGroupId("TEST_GROUP")
- .setClientIdPrefix("ride") // 避免kafka clientId重复
- .setStartingOffsets(OffsetsInitializer.latest())
- .setValueOnlyDeserializer(new TaxiRideDeserialization())
- .build();
- // 定义出租车-车费数据源
- KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
- .setBootstrapServers("192.168.0.192:9092")
- .setTopics("TOPIC_FARE")
- .setGroupId("TEST_GROUP")
- .setClientIdPrefix("fare") // 避免kafka clientId重复
- .setStartingOffsets(OffsetsInitializer.latest())
- .setValueOnlyDeserializer(new TaxiFareDeserialization())
- .build();
复制代码 事件格式:
- 1.车程事件: {"rideId":10086, "taxiId":1, "driverId":2, "isStart":true, "eventTime":1656571391726, "startLon":113.273031, "startLat":23.147103, "endLon":113.268245, "endLat":23.14445, "passengerCnt":1}
- 2.付费事件: {"rideId":10086, "taxiId":1, "driverId":2, "startTime":1656571391726, "paymentType":"CASH", "tip":0.00, "tolls":10.00, "totalFare":110.00}
复制代码 完整代码
https://github.com/Mr-LuXiaoHua/study-flink- 程序入口: com.example.datastream.rideandfare.RideAndFareJob
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |