马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
一. 配景.
迩来在验证kafka 开启kerberos的环境下, flink使命的支持环境.
但是验证的时间发现一个互斥的环境. 在读取数据的时间, 在开启kafka gruop id的权限控制的时间, flink sql 纵然设置了gruop id , 竟然还能读取数据.
这个和预期不符. 以是才较真验证了一下.
二. kafka斲丧topic数据姿势
斲丧kafka的数据的时间起首要构造KafkaConsumer客户端, 然后KafkaConsumer客户端有两种方式读取topic 中的数据.
- 利用 subscribe 是最常见的,由于它支持动态分区再均衡和斲丧者组的管理,恰当多数场景。
- 利用 assign 恰当须要正确控制分区斲丧的特定场景,但不支持主动再均衡,因此须要开发者手动管理分区分配和调解。
2.1. subscribe 方法
- 目的:重要用于订阅一个或多个主题。斲丧者会主动分配这些主题的分区。
- 利用场景:恰当利用斲丧者组(Consumer Group)的场景。Kafka 会主动举行分区的再均衡(rebalancing),确保同一斲丧者组内不会有多个斲丧者斲丧同一分区。
- 主动分配:利用 subscribe 时,Kafka 会主动为斲丧者分配它所订阅主题下的分区。
- 再均衡监听器:可以通过实现 ConsumerRebalanceListener 接口来自界说在分区再均衡时的活动。
- 动态性:如果新的分区被添加到主题中,斲丧者将主动开始斲丧新的分区。
- API 示例:
- List<String> topics = Arrays.asList("topic1", "topic2");
- consumer.subscribe(topics);
复制代码 2.2. assign 方法
- 目的:用于手动分配斲丧者要斲丧的详细分区。
- 利用场景:恰当须要对某些特定分区举行正确控制的场景。比方,须要单独处置惩罚特定分区时。
- 手动分配:通过 assign 方法,开发者显式指定斲丧者应该斲丧哪些分区。
- 无再均衡:利用 assign 时,Kafka 不会实行分区再均衡。斲丧者组的概念在这种模式下不实用。
- 静态性:如果主题增长了新的分区,斲丧者不会主动开始斲丧这些新分区,除非显式地调用 assign 方法来分配新的分区。
- API 示例:
- List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1));
- consumer.assign(partitions);
复制代码 三. 利用java client 验证.
3.1. 总结
- 无论subscribe 和assign 都须要授权topic .
- subscribe 方法须要指定group id , 以是须要group id 授权.
- assign 方法 group id 不是必填项, 不指定group id 的时间, group id 不见效, 指定了之后group id , 权限控制就会见效.
3.2. subscribe 方法
-
-
- public static void main(String[] args) {
- System.setProperty("java.security.krb5.conf", "tmp/krb5.conf");
- Properties props = new Properties();
- // group.id,指定了消费者所属群组
- props.put("bootstrap.servers", "master01:9092");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("group.id", "kafka-group-01");
- props.put("auto.offset.reset","earliest");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "GSSAPI");
- props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +
- "useKeyTab=true " +
- "keyTab="/tmp/kafka.keytab" " +
- "storeKey=true " +
- "useTicketCache=false " +
- "serviceName="kafka" " +
- "principal="kafka/ALL@EXAMPLE.COM";");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- consumer.subscribe(Collections.singletonList("kafka-validate-01"));
-
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
- for (ConsumerRecord<String, String> record : records) {
- LOG.info("KafkaConsumerDemoSubscribe#ConsumerRecord -> KEY : {} , VALUE : {} ", record.key(),record.value());
- }
- }
-
复制代码 3.3. assign 方法示例
- public static void main(String[] args) {
- System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");
- Properties props = new Properties();
- // group.id,指定了消费者所属群组
- props.put("bootstrap.servers", "master01:9092");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // props.put("group.id", "kafka-group");
- props.put("auto.offset.reset","earliest");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "GSSAPI");
- props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +
- "useKeyTab=true " +
- "keyTab="/tmp/kafka.keytab" " +
- "storeKey=true " +
- "useTicketCache=false " +
- "serviceName="kafka" " +
- "principal="kafka/ALL@EXAMPLE.COM";");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- String topic = "kafka-validate-01";
- topic= "kafka_kerberos";
- consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
- for (ConsumerRecord<String, String> record : records) {
- LOG.info("KafkaConsumerDemoAssign#ConsumerRecord -> KEY : {} , VALUE : {} ", record.key(),record.value());
- }
-
- }
复制代码 四. FLINK SQL 使命验证…
flink 官方文档:
FLINK 利用assign构建KafkaConsumer , scan.startup.mode 设置项决定了 Kafka consumer 的启动模式。
序号参数寄义kafka gruop id 是否必填1group-offsets (默认)从 Zookeeper/Kafka 中某个指定的斲丧组已提交的偏移量开始。是2earliest-offset从大概的最早偏移量开始否3latest-offset从最末端偏移量开始否4timestamp从用户为每个 partition 指定的时间戳开始否4specific-offsets从用户为每个 partition 指定的偏移量开始否
- 只有利用scan.startup.mode 为 group-offsets flink使运气行的时间才会报gruop id 相干的权限非常.
非常信息:
Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: kafka-validate-group-xx
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |