Background

By default, the ZooKeeper root node can be accessed without any permission checks, so this guide adds SASL authentication and ACLs for ZooKeeper and Kafka.
Steps

- Create a zookeeper_jaas.conf file in the kafka/config directory:

```bash
vi config/zookeeper_jaas.conf
```

- Add the following to zookeeper_jaas.conf:
```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345";
};
```

Additional users can be declared with entries of the form `user_{username}="{password}"`.
- Append the following to zookeeper.properties:

```properties
# auth
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
- Add the JAAS file to zookeeper-server-start.sh; adjust -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf to your own installation path:

```bash
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf"
fi
```
- Start ZooKeeper:

```bash
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
```
- Next, configure the ACLs. Connect to ZooKeeper (adjust 192.168.6.42:2181 to your own address):

```bash
./zookeeper-shell.sh 192.168.6.42:2181
```
- Add users:

```
addauth digest admin:12345
addauth digest kafka:12345
```
- Set the ACLs (replace the ip:...:cdrwa entries with your own IP addresses):

```
setAcl / ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
setAcl /consumers ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
```
- The cdrwa permission flags mean:
  - create: allowed to create child nodes.
  - read: allowed to read the node's data and list its child nodes.
  - write: allowed to set the node's data.
  - delete: allowed to delete child nodes.
  - admin: allowed to set permissions (ACLs) on the node.
- Verify that the ACLs are configured correctly:

```
getAcl /
getAcl /consumers
```
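The same check can also be done from application code. Below is a hedged sketch using the ZooKeeper Java client (org.apache.zookeeper:zookeeper dependency); the class name `ZkAclCheck` is illustrative, and the address and credentials are the ones used in the steps above. It authenticates the same way as `addauth digest` and then reads the ACL of the root node:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class ZkAclCheck {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Connect to the ZooKeeper instance configured above (adjust the address).
        ZooKeeper zk = new ZooKeeper("192.168.6.42:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Same as "addauth digest admin:12345" in zookeeper-shell.
        zk.addAuthInfo("digest", "admin:12345".getBytes());

        // Same as "getAcl /": print the ACLs now protecting the root node.
        List<ACL> acls = zk.getACL("/", new Stat());
        acls.forEach(acl -> System.out.println(acl.getPerms() + " " + acl.getId()));

        zk.close();
    }
}
```

A client that skips the addAuthInfo call and does not connect from one of the whitelisted IPs should now be denied the corresponding operations.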
- Add the Kafka JAAS configuration:

```bash
vim config/kafka_server_jaas.conf
```

- Add the following content:
```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345";
};
```
- Edit config/server.properties:

```properties
# AUTH
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://:9092
# point zookeeper.connect at your ZooKeeper address
zookeeper.connect=192.168.6.42:2181
```
- Adjust the Kafka start script kafka-server-start.sh; change -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf to your own path:

```bash
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf"
fi
```
- Start Kafka:

```bash
./kafka-server-start.sh -daemon ../config/server.properties
```
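Before moving on to the console tools, it can be useful to verify that a SASL client can actually reach the broker, and to create the testTopic used below in case automatic topic creation is disabled. The following is a hedged sketch using the Kafka AdminClient (kafka-clients dependency); the class name is illustrative, and the address, credentials, and topic settings are the ones used elsewhere in this guide:

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.SaslConfigs;

public class SaslConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.4.235:9092");
        // Same SASL settings the console clients pass on the command line.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Inline JAAS config, equivalent to the kafka_client_jaas.conf created in the test step below.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"12345\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create the topic used by the producer/consumer tests (1 partition, 1 replica).
            admin.createTopics(List.of(new NewTopic("testTopic", 1, (short) 1))).all().get();
            // List topics to confirm the SASL connection works.
            System.out.println(admin.listTopics().names().get());
        }
    }
}
```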
- Test: go to the Kafka directory and create a kafka_client_jaas.conf file under config with the following content (the password must match user_admin in kafka_server_jaas.conf):

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345";
};
```
- Configure producer authentication by editing the producer start script, vi bin/kafka-console-producer.sh:

```bash
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
```
- Start the producer:

```bash
./kafka-console-producer.sh --broker-list 192.168.4.235:9092 --topic testTopic --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
```
- Configure consumer authentication by editing the consumer start script, vi bin/kafka-console-consumer.sh:

```bash
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
```
- Start the consumer:

```bash
./kafka-console-consumer.sh --bootstrap-server 192.168.4.235:9092 --topic testTopic --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
```

If messages sent from the producer are printed here, the setup works.
- Spring Boot configuration:

```yaml
spring:
  kafka:
    # docker http://192.168.2.202:8080
    bootstrap-servers: 192.168.6.42:9092
    # username/password authentication settings
    producer:
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
    consumer:
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
```
- Add the corresponding settings in the relevant KafkaConfig class:

```java
@Bean
public KafkaTemplate kafkaTemplate() {
    // The *_config values below are assumed to be fields of this configuration
    // class, injected from application properties (e.g. via @Value).
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
    configs.put(ProducerConfig.RETRIES_CONFIG, pro_retry_config);
    configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
    configs.put(ProducerConfig.ACKS_CONFIG, acks_config);
    configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
    configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, key_serializer_config);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, value_serializer_config);
    configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression_type_config);
    // Only add the SASL settings when authentication is enabled.
    if (Boolean.valueOf(auth_enabled)) {
        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
        configs.put(SaslConfigs.SASL_MECHANISM, sasl_mechanism);
    }
    DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs);
    return new KafkaTemplate(producerFactory);
}
```
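The snippet above only covers the producer. For consumers, the same SASL settings can be applied in a matching listener container factory. This is a hedged sketch in the same style as the bean above (imports omitted, `group_id_config` is an illustrative injected field, and String deserializers are assumed):

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, group_id_config);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Same toggle as the producer: only add SASL settings when authentication is enabled.
    if (Boolean.valueOf(auth_enabled)) {
        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
        configs.put(SaslConfigs.SASL_MECHANISM, sasl_mechanism);
    }
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configs));
    return factory;
}
```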
- Finally, add one more option to the application's start script:

```bash
-Djava.security.auth.login.config=<path to the client JAAS file>
# e.g.
-Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf
```
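As a hedged alternative to the JVM flag, the JAAS line can also be supplied per client through the `sasl.jaas.config` property, for example by adding the following next to the other SASL settings inside the `auth_enabled` block of the producer factory above (the credentials are the ones used throughout this guide):

```java
// Alternative to -Djava.security.auth.login.config: set the JAAS config
// directly on the client instead of via a JVM system property.
configs.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"admin\" password=\"12345\";");
```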
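To verify the Spring side end to end, a small smoke-test component like the hedged sketch below can be used. It assumes String key/value (de)serializers and that a `KafkaTemplate` bean is available (either auto-configured from the YAML above or the bean defined earlier); the class and group names are illustrative:

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaSaslSmokeTest implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaSaslSmokeTest(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // Send one message to the topic used in the console tests above.
        kafkaTemplate.send("testTopic", "hello from spring with SASL");
    }

    @KafkaListener(topics = "testTopic", groupId = "sasl-smoke-test")
    public void listen(String message) {
        // If SASL is configured correctly, the message sent above is printed here.
        System.out.println("received: " + message);
    }
}
```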