Flink SQL 支持 kafka 开启 kerberos 权限控制.

[复制链接]
发表于 2025-11-19 05:07:27 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
一. 配景.

迩来在验证kafka 开启kerberos的环境下, flink使命的支持环境.
但是验证的时间发现一个互斥的环境. 在读取数据的时间, 在开启kafka gruop id的权限控制的时间, flink sql 纵然设置了gruop id , 竟然还能读取数据.
这个和预期不符. 以是才较真验证了一下.
二. kafka斲丧topic数据姿势

斲丧kafka的数据的时间起首要构造KafkaConsumer客户端, 然后KafkaConsumer客户端有两种方式读取topic 中的数据.


  • 利用 subscribe 是最常见的,由于它支持动态分区再均衡和斲丧者组的管理,恰当多数场景。
  • 利用 assign 恰当须要正确控制分区斲丧的特定场景,但不支持主动再均衡,因此须要开发者手动管理分区分配和调解。
2.1. subscribe 方法


  • 目的:重要用于订阅一个或多个主题。斲丧者会主动分配这些主题的分区。
  • 利用场景:恰当利用斲丧者组(Consumer Group)的场景。Kafka 会主动举行分区的再均衡(rebalancing),确保同一斲丧者组内不会有多个斲丧者斲丧同一分区。
  • 主动分配:利用 subscribe 时,Kafka 会主动为斲丧者分配它所订阅主题下的分区。
  • 再均衡监听器:可以通过实现 ConsumerRebalanceListener 接口来自界说在分区再均衡时的活动。
  • 动态性:如果新的分区被添加到主题中,斲丧者将主动开始斲丧新的分区。
  • API 示例:
    1. List<String> topics = Arrays.asList("topic1", "topic2");
    2. consumer.subscribe(topics);
    复制代码
2.2. assign 方法


  • 目的:用于手动分配斲丧者要斲丧的详细分区。
  • 利用场景:恰当须要对某些特定分区举行正确控制的场景。比方,须要单独处置惩罚特定分区时。
  • 手动分配:通过 assign 方法,开发者显式指定斲丧者应该斲丧哪些分区。
  • 无再均衡:利用 assign 时,Kafka 不会实行分区再均衡。斲丧者组的概念在这种模式下不实用。
  • 静态性:如果主题增长了新的分区,斲丧者不会主动开始斲丧这些新分区,除非显式地调用 assign 方法来分配新的分区。
  • API 示例:
    1. List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1));
    2. consumer.assign(partitions);
    复制代码
三. 利用java client 验证.

3.1. 总结


  • 无论subscribe 和assign 都须要授权topic .
  • subscribe 方法须要指定group id , 以是须要group id 授权.
  • assign 方法 group id 不是必填项, 不指定group id 的时间, group id 不见效, 指定了之后group id , 权限控制就会见效.
3.2. subscribe 方法

  1.         
  2.         
  3.     public static void main(String[] args) {
  4.         System.setProperty("java.security.krb5.conf", "tmp/krb5.conf");
  5.         Properties props = new Properties();
  6.         // group.id,指定了消费者所属群组
  7.         props.put("bootstrap.servers", "master01:9092");
  8.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  9.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  10.         props.put("group.id", "kafka-group-01");
  11.         props.put("auto.offset.reset","earliest");
  12.         props.put("security.protocol", "SASL_PLAINTEXT");
  13.         props.put("sasl.mechanism", "GSSAPI");
  14.         props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +
  15.                 "useKeyTab=true " +
  16.                 "keyTab="/tmp/kafka.keytab" " +
  17.                 "storeKey=true " +
  18.                 "useTicketCache=false " +
  19.                 "serviceName="kafka" " +
  20.                 "principal="kafka/ALL@EXAMPLE.COM";");
  21.         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  22.         consumer.subscribe(Collections.singletonList("kafka-validate-01"));
  23.         
  24.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
  25.         for (ConsumerRecord<String, String> record : records) {
  26.             LOG.info("KafkaConsumerDemoSubscribe#ConsumerRecord  -> KEY : {} , VALUE : {} ", record.key(),record.value());
  27.          }
  28.     }
  29.    
复制代码
3.3. assign 方法示例

  1.     public static void main(String[] args) {
  2.         System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");
  3.         Properties props = new Properties();
  4.         // group.id,指定了消费者所属群组
  5.         props.put("bootstrap.servers", "master01:9092");
  6.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  7.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. //        props.put("group.id", "kafka-group");
  9.         props.put("auto.offset.reset","earliest");
  10.         props.put("security.protocol", "SASL_PLAINTEXT");
  11.         props.put("sasl.mechanism", "GSSAPI");
  12.         props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +
  13.                 "useKeyTab=true " +
  14.                 "keyTab="/tmp/kafka.keytab" " +
  15.                 "storeKey=true " +
  16.                 "useTicketCache=false " +
  17.                 "serviceName="kafka" " +
  18.                 "principal="kafka/ALL@EXAMPLE.COM";");
  19.         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  20.         String topic = "kafka-validate-01";
  21.         topic= "kafka_kerberos";
  22.         consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
  23.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
  24.         for (ConsumerRecord<String, String> record : records) {
  25.              LOG.info("KafkaConsumerDemoAssign#ConsumerRecord  -> KEY : {} , VALUE : {} ", record.key(),record.value());
  26.          }
  27.          
  28.     }
复制代码
四. FLINK SQL 使命验证…

flink 官方文档:
FLINK 利用assign构建KafkaConsumer , scan.startup.mode 设置项决定了 Kafka consumer 的启动模式。
序号参数寄义kafka gruop id 是否必填1group-offsets (默认)从 Zookeeper/Kafka 中某个指定的斲丧组已提交的偏移量开始。是2earliest-offset从大概的最早偏移量开始否3latest-offset从最末端偏移量开始否4timestamp从用户为每个 partition 指定的时间戳开始否4specific-offsets从用户为每个 partition 指定的偏移量开始否

  • 只有利用scan.startup.mode  为 group-offsets  flink使运气行的时间才会报gruop id 相干的权限非常.
非常信息:
   Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: kafka-validate-group-xx

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表