4.3、Flink使命怎样读取Kafka中的数据

打印 上一主题 下一主题

主题 582|帖子 582|积分 1746

目录
1、添加pom依靠
2、API利用说明
3、这是一个完整的入门案例
4、Kafka消息应该怎样解析
4.1、只获取Kafka消息的value部分
​4.2、获取完整Kafka消息(key、value、Metadata)
4.3、自界说Kafka消息解析器
5、起始消费位点应该怎样设置
​5.1、earliest()
5.2、latest()
5.3、timestamp()
6、Kafka分区扩容了,该怎么办 —— 动态分区查抄
7、在加载KafkaSource时提取事件时间&添加水位线
7.1、利用内置的单调递增的水位线天生器 + kafka timestamp 为事件时间
7.2、利用内置的单调递增的水位线天生器 + kafka 消息中的 ID字段 为事件时间

1、添加pom依靠

我们可以利用Flink官方提供毗连Kafka的工具flink-connector-kafka
该工具实现了一个消费者FlinkKafkaConsumer,可以用它来读取kafka的数据
如果想利用这个通用的Kafka毗连工具,需要引入jar依靠
  1. <!-- 引入 kafka连接器依赖-->
  2. <dependency>
  3.     <groupId>org.apache.flink</groupId>
  4.     <artifactId>flink-connector-kafka</artifactId>
  5.     <version>1.17.0</version>
  6. </dependency>
复制代码

2、API利用说明

官网链接:Apache Kafka 毗连器
语法说明: 
  1. // 1.初始化 KafkaSource 实例
  2. KafkaSource<String> source = KafkaSource.<String>builder()
  3.     .setBootstrapServers(brokers)                           // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)                     
  4.     .setTopics("input-topic")                               // 必填:指定要消费的topic
  5.     .setGroupId("my-group")                                 // 必填:指定消费者的groupid(不存在时会自动创建)
  6.     .setValueOnlyDeserializer(new SimpleStringSchema())     // 必填:指定反序列化器(用来解析kafka消息数据,转换为flink数据类型)
  7.     .setStartingOffsets(OffsetsInitializer.earliest())      // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
  8.     .build();
  9. // 2.通过 fromSource + KafkaSource 获取 DataStreamSource
  10. env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
复制代码

3、这是一个完整的入门案例

开发语言:java1.8
flink版本:flink1.17.0
  1. public class ReadKafka {
  2.     public static void main(String[] args) throws Exception {
  3.         newAPI();
  4.     }
  5.     public static void newAPI() throws Exception {
  6.         // 1.获取执行环境
  7.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.         // 2.读取kafka数据
  9.         KafkaSource<String> source = KafkaSource.<String>builder()
  10.                 .setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)
  11.                 .setTopics("20230810")                              // 必填:指定要消费的topic
  12.                 .setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)
  13.                 .setValueOnlyDeserializer(new SimpleStringSchema()) // 必填:指定反序列化器(用来解析kafka消息数据)
  14.                 .setStartingOffsets(OffsetsInitializer.earliest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
  15.                 .build();
  16.         env.fromSource(source,
  17.                 WatermarkStrategy.noWatermarks(),
  18.                 "Kafka Source")
  19.                 .print()
  20.         ;
  21.         // 3.触发程序执行
  22.         env.execute();
  23.     }
  24. }
复制代码

4、Kafka消息应该怎样解析

代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析
反序列化器的功能:
                将Kafka ConsumerRecords转换为Flink处置惩罚的数据范例(Java/Scala对象)
反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器范例)) 指定
下面先容两种常用Kafka消息解析器:
        KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) :
                 1、返回完整的Kafka消息,将JSON字符串反序列化为ObjectNode对象
                 2、可以选择是否返回Kafak消息的Metadata信息,true-返回,false-不返回
        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) :
                1、只返回Kafka消息中的value部分 
4.1、只获取Kafka消息的value部分

4.2、获取完整Kafka消息(key、value、Metadata)


kafak消息格式:
                key =  {"nation":"蜀国"}
                value = {"ID":整数}
  1.     public static void ParseMessageJSONKeyValue() throws Exception {
  2.         // 1.获取执行环境
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         // 2.读取kafka数据
  5.         KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  6.                 .setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)
  7.                 .setTopics("9527")                                  // 必填:指定要消费的topic
  8.                 .setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)
  9.                 // 必填:指定反序列化器(将kafak消息解析为ObjectNode,json对象)
  10.                 .setDeserializer(KafkaRecordDeserializationSchema.of(
  11.                         // includeMetadata = (true:返回Kafak元数据信息 false:不返回)
  12.                         new JSONKeyValueDeserializationSchema(true)
  13.                 ))
  14.                 .setStartingOffsets(OffsetsInitializer.latest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
  15.                 .build();
  16.         env
  17.                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
  18.                 .print()
  19.         ;
  20.         // 3.触发程序执行
  21.         env.execute();
  22.     }
复制代码
运行效果:    

常见报错: 
  1. Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = 9527, partition = 0, leaderEpoch = 0, offset = 1064, CreateTime = 1691668775938, serialized key size = 4, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5e9eaab8, value = [B@67390400).
  2.         at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
  3.         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
  4.         ... 14 more
  5. Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxx': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
  6. at [Source: (byte[])"xxxx"; line: 1, column: 5]
复制代码
报错原因:
          出现这个报错,一般是利用flink读取fafka时,利用JSONKeyValueDeserializationSchema
来解析消息时,kafka消息中的key 大概 value 内容不符合json格式而造成的解析错误
比方下面这个格式,就会造成解析错误  key=1000,value=你好
那应该怎么解决呢?
        1、如果有权限修改Kafka消息格式,可以将Kafka消息key&value内容修改为Json格式
        2、如果没有权限修改Kafka消息格式(好比线上环境,修改比较困难),可以重新实现
  1.        JSONKeyValueDeserializationSchema类,根据所需格式来解析Kafka消息(可以参考源码)
复制代码
4.3、自界说Kafka消息解析器

        生产中对Kafka消息及解析的格式总是各种各样的,当flink预界说的解析器满意不了业务需求时,可以通过自界说kafka消息解析器来完成业务的支持
比方,当利用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时,只返回了 offset、topic、partition 三个字段信息,现在需要`kafka生产者写入数据时的timestamp`,就可以通过自界说kafka消息解析器来完成
代码示例:
  1. // TODO 自定义Kafka消息解析器,在 metadata 中增加 timestamp字段
  2. public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode>{
  3.         private static final long serialVersionUID = 1509391548173891955L;
  4.         private final boolean includeMetadata;
  5.         private ObjectMapper mapper;
  6.         public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {
  7.             this.includeMetadata = includeMetadata;
  8.         }
  9.         @Override
  10.         public void open(DeserializationSchema.InitializationContext context) throws Exception {
  11.             mapper = JacksonMapperFactory.createObjectMapper();
  12.         }
  13.         @Override
  14.         public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
  15.             ObjectNode node = mapper.createObjectNode();
  16.             if (record.key() != null) {
  17.                 node.set("key", mapper.readValue(record.key(), JsonNode.class));
  18.             }
  19.             if (record.value() != null) {
  20.                 node.set("value", mapper.readValue(record.value(), JsonNode.class));
  21.             }
  22.             if (includeMetadata) {
  23.                 node.putObject("metadata")
  24.                         .put("offset", record.offset())
  25.                         .put("topic", record.topic())
  26.                         .put("partition", record.partition())
  27.                         // 添加 timestamp 字段
  28.                         .put("timestamp",record.timestamp())
  29.                 ;
  30.             }
  31.             return node;
  32.         }
  33.         @Override
  34.         public boolean isEndOfStream(ObjectNode nextElement) {
  35.             return false;
  36.         }
  37.         @Override
  38.         public TypeInformation<ObjectNode> getProducedType() {
  39.             return getForClass(ObjectNode.class);
  40.         }
  41.     }
复制代码
运行效果:


5、起始消费位点应该怎样设置

起始消费位点说明:
        起始消费位点是指 启动flink使命时,应该从哪个位置开始读取Kafka的消息   
        下面先容下常用的三个设置:    
                OffsetsInitializer.earliest()  :
                        从最早位点开始消
                        这里的最早指的是Kafka消息生存的时长(默以为7天,天生环境各公司略有不同)
                        该这设置为默认设置,当不指定OffsetsInitializer.xxx时,默以为earliest() 
                OffsetsInitializer.latest()   :
                        从最末了位点开始消费
                        这里的最末了指的是flink使命启动时间点之后生产的消息
                OffsetsInitializer.timestamp(时间戳) :
                        从时间戳大于等于指定时间戳(毫秒)的数据开始消费
下面用案例说明下,三种设置的效果,kafak天生10条数据,如下:
5.1、earliest()


代码示例:
  1. KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  2.         .setBootstrapServers("worker01:9092")
  3.         .setTopics("23230811")
  4.         .setGroupId("FlinkConsumer")
  5.         // 将kafka消息解析为Json对象,并返回元数据
  6.         .setDeserializer(KafkaRecordDeserializationSchema.of(
  7.                 new JSONKeyValueDeserializationSchema(true)
  8.         ))
  9.         // 设置起始消费位点:从最早位置开始消费(该设置为默认设置)
  10.         .setStartingOffsets(OffsetsInitializer.earliest())
  11.         .build();
复制代码
运行效果:

5.2、latest()

代码示例:
  1. KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  2.         .setBootstrapServers("worker01:9092")
  3.         .setTopics("23230811")
  4.         .setGroupId("FlinkConsumer")
  5.         // 将kafka消息解析为Json对象,并返回元数据
  6.         .setDeserializer(KafkaRecordDeserializationSchema.of(
  7.                 new JSONKeyValueDeserializationSchema(true)
  8.         ))
  9.         // 设置起始消费位点:从最末尾位点开始消费
  10.         .setStartingOffsets(OffsetsInitializer.latest())
  11.         .build();
复制代码
运行效果:

5.3、timestamp()

代码示例:
  1. KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  2.         .setBootstrapServers("worker01:9092")
  3.         .setTopics("23230811")
  4.         .setGroupId("FlinkConsumer")
  5.         // 将kafka消息解析为Json对象,并返回元数据
  6.         .setDeserializer(KafkaRecordDeserializationSchema.of(
  7.                 new MyJSONKeyValueDeserializationSchema(true)
  8.         ))
  9.         // 设置起始消费位点:从指定时间戳后开始消费
  10.         .setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L))
  11.         .build();
复制代码
运行效果:


6、Kafka分区扩容了,该怎么办 —— 动态分区查抄

        在flink1.13的时候,如果Kafka分区扩容了,只有通过重启flink使命,才气消费到新增分区的数据,小编就曾遇到过上游业务部门的kafka分区扩容了,并没有关照卑鄙利用方,导致及时指标异常,以致丢失了数据。
        在flink1.17的时候,可以通过`开启动态分区查抄`,来实现不用重启flink使命,就能消费到新增分区的数据
开启分区查抄:(默认不开启)
  1. KafkaSource.builder()
  2.     .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区
复制代码
代码示例:
  1. KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  2.         .setBootstrapServers("worker01:9092")
  3.         .setTopics("9527")
  4.         .setGroupId("FlinkConsumer")
  5.         // 将kafka消息解析为Json对象,并返回元数据
  6.         .setDeserializer(KafkaRecordDeserializationSchema.of(
  7.                 new JSONKeyValueDeserializationSchema(true)
  8.         ))
  9.         // 设置起始消费位点:从最末尾位点开始消费
  10.         .setStartingOffsets(OffsetsInitializer.latest())
  11.         // 开启动态分区检查(默认不开启)
  12.         .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区
  13.         .build();
复制代码

7、在加载KafkaSource时提取事件时间&添加水位线

可以在 fromSource(source,WatermarkStrategy,sourceName) 时,提取事件时间和制定水位线天生战略
注意:当不指定事件时间提取器时,Kafka Source 利用 Kafka 消息中的时间戳作为事件时间
7.1、利用内置的单调递增的水位线天生器 + kafka timestamp 为事件时间

代码示例:
  1.     // 在读取Kafka消息时,提取事件时间&插入水位线
  2.     public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {
  3.         // 1.获取执行环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         // 2.读取kafka数据
  6.         KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  7.                 .setBootstrapServers("worker01:9092")
  8.                 .setTopics("9527")
  9.                 .setGroupId("FlinkConsumer")
  10.                 // 将kafka消息解析为Json对象,并返回元数据
  11.                 .setDeserializer(KafkaRecordDeserializationSchema.of(
  12.                         new MyJSONKeyValueDeserializationSchema(true)
  13.                 ))
  14.                 // 设置起始消费位点:从最末尾位点开始消费
  15.                 .setStartingOffsets(OffsetsInitializer.latest())
  16.                 .build();
  17.         env.fromSource(source,
  18.                         // 使用内置的单调递增的水位线生成器(默认使用 kafka的timestamp作为事件时间)
  19.                         WatermarkStrategy.forMonotonousTimestamps(),
  20.                         "Kafka Source")
  21.                 // 通过 ProcessFunction 查看提取的事件时间和水位线信息
  22.                 .process(
  23.                         new ProcessFunction<ObjectNode, String>() {
  24.                             @Override
  25.                             public void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {
  26.                                 // 当前处理时间
  27.                                 long currentProcessingTime = ctx.timerService().currentProcessingTime();
  28.                                 // 当前水位线
  29.                                 long currentWatermark = ctx.timerService().currentWatermark();
  30.                                 StringBuffer record = new StringBuffer();
  31.                                 record.append("========================================\n");
  32.                                 record.append(kafkaJson + "\n");
  33.                                 record.append("currentProcessingTime:" + currentProcessingTime + "\n");
  34.                                 record.append("currentWatermark:" + currentWatermark + "\n");
  35.                                 record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");
  36.                                 record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");
  37.                                 out.collect(record.toString());
  38.                             }
  39.                         }
  40.                 ).print();
  41.         // 3.触发程序执行
  42.         env.execute();
  43.     }
复制代码
运行效果:

7.2、利用内置的单调递增的水位线天生器 + kafka 消息中的 ID字段 为事件时间

代码示例:
  1.     // 在读取Kafka消息时,提取事件时间&插入水位线
  2.     public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {
  3.         // 1.获取执行环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         // 2.读取kafka数据
  6.         KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
  7.                 .setBootstrapServers("worker01:9092")
  8.                 .setTopics("9527")
  9.                 .setGroupId("FlinkConsumer")
  10.                 // 将kafka消息解析为Json对象,并返回元数据
  11.                 .setDeserializer(KafkaRecordDeserializationSchema.of(
  12.                         new MyJSONKeyValueDeserializationSchema(true)
  13.                 ))
  14.                 // 设置起始消费位点:从最末尾位点开始消费
  15.                 .setStartingOffsets(OffsetsInitializer.latest())
  16.                 .build();
  17.         env.fromSource(source,
  18.                         // 使用内置的单调递增的水位线生成器(使用 kafka消息中的ID字段作为事件时间)
  19.                         WatermarkStrategy.<ObjectNode>forMonotonousTimestamps()
  20.                                 // 提取 Kafka消息中的 ID字段作为 事件时间
  21.                                 .withTimestampAssigner(
  22.                                         (json, timestamp) -> Long.parseLong(json.get("value").get("ID").toString())
  23.                                 ),
  24.                         "Kafka Source")
  25.                 // 通过 ProcessFunction 查看提取的事件时间和水位线信息
  26.                 .process(
  27.                         new ProcessFunction<ObjectNode, String>() {
  28.                             @Override
  29.                             public void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {
  30.                                 // 当前处理时间
  31.                                 long currentProcessingTime = ctx.timerService().currentProcessingTime();
  32.                                 // 当前水位线
  33.                                 long currentWatermark = ctx.timerService().currentWatermark();
  34.                                 StringBuffer record = new StringBuffer();
  35.                                 record.append("========================================\n");
  36.                                 record.append(kafkaJson + "\n");
  37.                                 record.append("currentProcessingTime:" + currentProcessingTime + "\n");
  38.                                 record.append("currentWatermark:" + currentWatermark + "\n");
  39.                                 record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");
  40.                                 record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");
  41.                                 out.collect(record.toString());
  42.                             }
  43.                         }
  44.                 ).print();
  45.         // 3.触发程序执行
  46.         env.execute();
  47.     }
复制代码
运行效果:







免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

北冰洋以北

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表