flink学习(14)—— 双流join

打印 上一主题 下一主题

主题 769|帖子 769|积分 2307

概述

Join:内连接
CoGroup:内连接,左连接,右连接
Interval Join:点对面
Join

  1. 1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
  2. 2、Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。
  3. 3、Join 通用用法如下:
  4.     stream.join(otherStream)
  5.         .where(<KeySelector>)
  6.         .equalTo(<KeySelector>)
  7.         .window(<WindowAssigner>)
  8.         .apply(<JoinFunction>)
复制代码
滚动窗口

  1. package com.bigdata.day07;
  2. import org.apache.commons.lang3.time.DateUtils;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.api.common.functions.JoinFunction;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import java.text.ParseException;
  16. import java.time.Duration;
  17. import java.util.Date;
  18. /**
  19. * 内连接
  20. * 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2
  21. */
  22. public class _01_双流join_join_内连接 {
  23.     public static void main(String[] args) throws Exception {
  24.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25.         env.setParallelism(1);
  26.         
  27.         // 绿色的流
  28.         DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
  29.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  30.             @Override
  31.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  32.                 String[] split = line.split(",");
  33.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  34.             }
  35.         }).assignTimestampsAndWatermarks(
  36.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  37.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  38.                             @Override
  39.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  40.                                 
  41.                                 String timeStr = tuple3.f2;
  42.                                 try {
  43.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  44.                                     return date.getTime();
  45.                                 } catch (ParseException e) {
  46.                                     throw new RuntimeException(e);
  47.                                 }
  48.                             }
  49.                         }));
  50.         
  51.         // 红色的流
  52.         DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
  53.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  54.             @Override
  55.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  56.                 String[] split = line.split(",");
  57.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  58.             }
  59.         }).assignTimestampsAndWatermarks(
  60.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  61.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  62.                             @Override
  63.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  64.                               
  65.                                 String timeStr = tuple3.f2;
  66.                                 try {
  67.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  68.                                     return date.getTime();
  69.                                 } catch (ParseException e) {
  70.                                     throw new RuntimeException(e);
  71.                                 }
  72.                             }
  73.                         }));
  74.                         
  75.         // 双流join
  76.         DataStream<Tuple3<String, Integer, Integer>> rsSource = greenSource.join(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {
  77.             @Override
  78.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  79.                 return tuple3.f0;
  80.             }
  81.         }).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {
  82.             @Override
  83.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  84.                 return tuple3.f0;
  85.             }
  86.             // 滚动窗口
  87.         }).window(TumblingEventTimeWindows.of(Time.seconds(5)))
  88.                 .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {
  89.             @Override
  90.             public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {
  91.                 return Tuple3.of(first.f0, first.f1, second.f1);
  92.             }
  93.         });
  94.         redSource.print("红色的流:");
  95.         greenSource.print("绿色的流:");
  96.         rsSource.print("合并后的流:");
  97.         env.execute();
  98.     }
  99. }
复制代码
滑动窗口

  1. package com.bigdata.day07;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.api.common.functions.JoinFunction;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.java.tuple.Tuple3;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  10. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  11. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  12. import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import java.text.ParseException;
  16. import java.text.SimpleDateFormat;
  17. import java.time.Duration;
  18. import java.util.Date;
  19. /**
  20. * @基本功能: 演示join的滑动窗口
  21. * @program:FlinkDemo
  22. * @author: 闫哥
  23. * @create:2024-05-20 09:11:13
  24. **/
  25. public class Demo02Join {
  26.     public static void main(String[] args) throws Exception {
  27.         //1. env-准备环境
  28.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  30.         // 将并行度设置为1,否则很难看到现象
  31.         env.setParallelism(1);
  32.         // 创建一个绿色的流
  33.         DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);
  34.         // key,0,2021-03-26 12:09:00 将它变为三元组
  35.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  36.             @Override
  37.             public Tuple3<String, Integer, String> map(String value) throws Exception {
  38.                 String[] arr = value.split(",");
  39.                 return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);
  40.             }
  41.         }).assignTimestampsAndWatermarks(
  42.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  43.                         // 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值
  44.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  45.                             @Override
  46.                             public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
  47.                                 // 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。
  48.                                 String time = element.f2; //2021-03-26 12:09:00
  49.                                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  50.                                 try {
  51.                                     Date date = sdf.parse(time);
  52.                                     return date.getTime();
  53.                                 } catch (ParseException e) {
  54.                                     throw new RuntimeException(e);
  55.                                 }
  56.                             }
  57.                         })
  58.         );
  59.         // 创建一个橘色的流
  60.         DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);
  61.         // key,0,2021-03-26 12:09:00 将它变为三元组
  62.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  63.             @Override
  64.             public Tuple3<String, Integer, String> map(String value) throws Exception {
  65.                 String[] arr = value.split(",");
  66.                 return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);
  67.             }
  68.         }).assignTimestampsAndWatermarks(
  69.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  70.                         // 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值
  71.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  72.                             @Override
  73.                             public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
  74.                                 // 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。
  75.                                 String time = element.f2; //2021-03-26 12:09:00
  76.                                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  77.                                 try {
  78.                                     Date date = sdf.parse(time);
  79.                                     return date.getTime();
  80.                                 } catch (ParseException e) {
  81.                                     throw new RuntimeException(e);
  82.                                 }
  83.                             }
  84.                         })
  85.         );
  86.         //2. source-加载数据
  87.         //3. transformation-数据处理转换
  88.         DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream)
  89.                 .where(tuple3 -> tuple3.f0)
  90.                 .equalTo(tuple3 -> tuple3.f0)
  91.                 // 滑动窗口
  92.                 .window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
  93.                 .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {
  94.                     @Override
  95.                     public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {
  96.                         return Tuple3.of(first.f0, first.f1, second.f1);
  97.                     }
  98.                 });
  99.         //4. sink-数据输出
  100.         greenDataStream.print("绿色的流:");
  101.         orangeDataStream.print("橘色的流:");
  102.         resultStream.print("最终的结果:");
  103.         //5. execute-执行
  104.         env.execute();
  105.     }
  106. }
复制代码
CoGroup

  1. 1、优势:可以实现内连接,左连接,右连接
  2. 2、劣势:内存压力大
  3. 3、和上面的写法区别:将join换成coGroup,apply中实现的具体方法有区别
  4. 4、流程
  5. stream.coGroup(otherStream)
  6.         .where(<KeySelector>)
  7.         .equalTo(<KeySelector>)
  8.         .window(<WindowAssigner>)
  9.         .apply(<CoGroupFunction>);
复制代码
内连接

  1. package com.bigdata.day07;
  2. import org.apache.commons.lang3.time.DateUtils;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.api.common.functions.CoGroupFunction;
  6. import org.apache.flink.api.common.functions.JoinFunction;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.api.java.functions.KeySelector;
  9. import org.apache.flink.api.java.tuple.Tuple3;
  10. import org.apache.flink.streaming.api.datastream.DataStream;
  11. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  15. import org.apache.flink.streaming.api.windowing.time.Time;
  16. import org.apache.flink.util.Collector;
  17. import java.text.ParseException;
  18. import java.time.Duration;
  19. import java.util.Date;
  20. /**
  21. * 内连接
  22. */
  23. public class _02_双流join_CoGroup_内连接 {
  24.     public static void main(String[] args) throws Exception {
  25.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  26.         env.setParallelism(1);
  27.         // 绿色的流
  28.         DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
  29.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  30.             @Override
  31.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  32.                 String[] split = line.split(",");
  33.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  34.             }
  35.         }).assignTimestampsAndWatermarks(
  36.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  37.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  38.                             @Override
  39.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  40.                               
  41.                                 String timeStr = tuple3.f2;
  42.                                 try {
  43.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  44.                                     return date.getTime();
  45.                                 } catch (ParseException e) {
  46.                                     throw new RuntimeException(e);
  47.                                 }
  48.                             }
  49.                         }));
  50.                         
  51.         // 红色的流
  52.         DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
  53.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  54.             @Override
  55.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  56.                 String[] split = line.split(",");
  57.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  58.             }
  59.         }).assignTimestampsAndWatermarks(
  60.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  61.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  62.                             @Override
  63.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  64.                                 String timeStr = tuple3.f2;
  65.                                 try {
  66.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  67.                                     return date.getTime();
  68.                                 } catch (ParseException e) {
  69.                                     throw new RuntimeException(e);
  70.                                 }
  71.                             }
  72.                         }));
  73.         
  74.         
  75.         // 连接
  76.         DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {
  77.             @Override
  78.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  79.                 return tuple3.f0;
  80.             }
  81.         }).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {
  82.             @Override
  83.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  84.                 return tuple3.f0;
  85.             }
  86.         }).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {
  87.             @Override
  88.             public void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {
  89.                 for (Tuple3<String, Integer, String> firesTuple3 : first) {
  90.                     for (Tuple3<String, Integer, String> secondTuple3 : second) {
  91.                         out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));
  92.                     }
  93.                 }
  94.             }
  95.         });
  96.         redSource.print("红色的流:");
  97.         greenSource.print("绿色的流:");
  98.         rsSource.print("合并后的流:");
  99.         env.execute();
  100.     }
  101. }
复制代码
外连接

  1. package com.bigdata.day07;
  2. import org.apache.commons.lang3.time.DateUtils;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.api.common.functions.CoGroupFunction;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import org.apache.flink.util.Collector;
  16. import java.text.ParseException;
  17. import java.time.Duration;
  18. import java.util.Date;
  19. /**
  20. * 外连接
  21. */
  22. public class _03_双流join_CoGroup_外连接 {
  23.     public static void main(String[] args) throws Exception {
  24.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25.         env.setParallelism(1);
  26.         
  27.         // 绿色的流
  28.         DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
  29.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  30.             @Override
  31.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  32.                 String[] split = line.split(",");
  33.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  34.             }
  35.         }).assignTimestampsAndWatermarks(
  36.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  37.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  38.                             @Override
  39.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  40.                                 String timeStr = tuple3.f2;
  41.                                 try {
  42.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  43.                                     return date.getTime();
  44.                                 } catch (ParseException e) {
  45.                                     throw new RuntimeException(e);
  46.                                 }
  47.                             }
  48.                         }));
  49.         // 红色的流
  50.         DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
  51.         SingleOutputStreamOperator<Tuple3<String, Integer, String>> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  52.             @Override
  53.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  54.                 String[] split = line.split(",");
  55.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  56.             }
  57.         }).assignTimestampsAndWatermarks(
  58.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  59.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  60.                             @Override
  61.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  62.                                 String timeStr = tuple3.f2;
  63.                                 try {
  64.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  65.                                     return date.getTime();
  66.                                 } catch (ParseException e) {
  67.                                     throw new RuntimeException(e);
  68.                                 }
  69.                             }
  70.                         }));
  71.         DataStream<Tuple3<String, String, String>> rsSource = greenSource.coGroup(redSource).where(new KeySelector<Tuple3<String, Integer, String>, String>() {
  72.             @Override
  73.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  74.                 return tuple3.f0;
  75.             }
  76.         }).equalTo(new KeySelector<Tuple3<String, Integer, String>, String>() {
  77.             @Override
  78.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  79.                 return tuple3.f0;
  80.             }
  81.         }).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, String, String>>() {
  82.             @Override
  83.             public void coGroup(Iterable<Tuple3<String, Integer, String>> first, Iterable<Tuple3<String, Integer, String>> second, Collector<Tuple3<String, String, String>> out) throws Exception {
  84.             // 内连接,左连接,右连接的区别只在这里面存在,两层循环
  85.                 for (Tuple3<String, Integer, String> firesTuple3 : first) {
  86.                     boolean isExist = false;
  87.                     for (Tuple3<String, Integer, String> secondTuple3 : second) {
  88.                         isExist = true;
  89.                         out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red"+secondTuple3.f1));
  90.                     }
  91.                     if (!isExist){
  92.                         out.collect(Tuple3.of(firesTuple3.f0,"green"+firesTuple3.f1,"red null"));
  93.                     }
  94.                 }
  95.             }
  96.         });
  97.         redSource.print("红色的流:");
  98.         greenSource.print("绿色的流:");
  99.         rsSource.print("合并后的流:");
  100.         env.execute();
  101.     }
  102. }
复制代码
Interval Join


  1. 1、Join以及CoGroup 原因是 Join和CoGroup是窗口Join,必须给定窗口
  2. 2、Interval Join不需要给窗口。Interval Join 必须先分组才能使用。
  3. 3、先对数据源进行keyBy
  4. 4、 外流.intervalJoin(内流)
  5.         .between(-2,2)
  6.         .process
  7. between 左不包,右包
  8. 内部的流为下面的流(取单个值)
复制代码
 代码实现

  1. package com.bigdata.day07;
  2. import org.apache.commons.lang3.time.DateUtils;
  3. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.api.common.functions.CoGroupFunction;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.java.functions.KeySelector;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.KeyedStream;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
  15. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  16. import org.apache.flink.streaming.api.windowing.time.Time;
  17. import org.apache.flink.util.Collector;
  18. import java.text.ParseException;
  19. import java.time.Duration;
  20. import java.util.Date;
  21. public class _04_双流join_Interval_Join {
  22.     public static void main(String[] args) throws Exception {
  23.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  24.         env.setParallelism(1);
  25.         
  26.         //绿色的流
  27.         DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
  28.         
  29.         KeyedStream<Tuple3<String, Integer, String>, String> greenSource = source.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  30.             @Override
  31.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  32.                 String[] split = line.split(",");
  33.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  34.             }
  35.             // 水印
  36.         }).assignTimestampsAndWatermarks(
  37.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  38.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  39.                             @Override
  40.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  41.                                 String timeStr = tuple3.f2;
  42.                                 try {
  43.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  44.                                     return date.getTime();
  45.                                 } catch (ParseException e) {
  46.                                     throw new RuntimeException(e);
  47.                                 }
  48.                             }
  49.                             // keyBy
  50.                         })).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {
  51.             @Override
  52.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  53.                 return tuple3.f0;
  54.             }
  55.         });
  56.         // 红色的流
  57.         DataStreamSource<String> source2 = env.socketTextStream("localhost", 7778);
  58.         KeyedStream<Tuple3<String, Integer, String>, String> redSource = source2.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
  59.             @Override
  60.             public Tuple3<String, Integer, String> map(String line) throws Exception {
  61.                 String[] split = line.split(",");
  62.                 return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);
  63.             }
  64.             // 水印
  65.         }).assignTimestampsAndWatermarks(
  66.                 WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  67.                         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
  68.                             @Override
  69.                             public long extractTimestamp(Tuple3<String, Integer, String> tuple3, long recordTimestamp) {
  70.                                 String timeStr = tuple3.f2;
  71.                                 try {
  72.                                     Date date = DateUtils.parseDate(timeStr, "yyyy-MM-dd hh-mm-ss");
  73.                                     return date.getTime();
  74.                                 } catch (ParseException e) {
  75.                                     throw new RuntimeException(e);
  76.                                 }
  77.                             }
  78.                             // 分组
  79.                         })).keyBy(new KeySelector<Tuple3<String, Integer, String>, String>() {
  80.             @Override
  81.             public String getKey(Tuple3<String, Integer, String> tuple3) throws Exception {
  82.                 return tuple3.f0;
  83.             }
  84.         });
  85.         
  86.         // 实现
  87.         SingleOutputStreamOperator<String> rsSource = greenSource
  88.                 .intervalJoin(redSource)
  89.                 .between(Time.seconds(-2), Time.seconds(2))
  90.                 .process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {
  91.             @Override
  92.             public void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {
  93.                 out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);
  94.             }
  95.         });
  96.         redSource.print("红色的流:");
  97.         greenSource.print("绿色的流:");
  98.         rsSource.print("合并后的流:");
  99.         env.execute();
  100. /**
  101. * 红色的为下面的流
  102. * 范围:
  103. * 假如现在是10
  104. * 9 10 11 12
  105. */
  106.     }
  107. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

写过一篇

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表