zookeeper、kakfa添加用户加密

打印 上一主题 下一主题

主题 908|帖子 908|积分 2724

配景

zookeeper无权限访问到根目录
步调


  • 在kafka/config 目录中创建
    1. vi config/zookeeper_jaas.conf
    复制代码
  • 在zookeeper_jaas.conf中添加
    1. Server {
    2. org.apache.kafka.common.security.plain.PlainLoginModule required
    3. username="admin"
    4. password="12345"
    5. user_admin="12345";
    6. };
    7. #user_{username}="{password}"
    复制代码
  • zookeeper.properties最后添加配置
    1. #auth
    2. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    3. requireClientAuthScheme=sasl
    4. jaasLoginRenew=3600000
    复制代码
  • 在zookeeper-server-start.sh中添加配置
    根据不同的目录位置进行修改-Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf
    1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    2.     export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf"
    3. fi
    复制代码
  • 启动zookeeper
    1. ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
    复制代码
  • 下面开始设置ACL配置

    • 登录zookeeper192.168.6.42:2181IP地点根据自己的进行调整
      1. ./zookeeper-shell.sh 192.168.6.42:2181
      复制代码
    • 添加用户
      1. addauth digest admin:12345
      2. addauth digest kafka:12345
      复制代码
    • 设置ACLip:192.168.6.42:cdrwa根据自己的ip地点进行修改
      1. setAcl / ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
      2. setAcl /consumers ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
      复制代码
      1. cdrwa:
      2.         create: 你可以创建子节点。
      3.         read: 你可以获取节点数据以及当前节点的子节点列表。
      4.         write: 你可以为节点设置数据。
      5.         delete: 你可以删除子节点。
      6.         admin: 可以为节点设置权限
      复制代码
    • 查看是否配置精确
      1. getAcl /
      2. getAcl /consumers
      复制代码

  • 添加kafka的配置
    1. vim config/kafka_server_jaas.conf
    复制代码
  • 添加内容
    1. KafkaServer {
    2.   org.apache.kafka.common.security.plain.PlainLoginModule required
    3.   username="admin"
    4.   password="12345"
    5.   user_admin="12345";
    6. };
    7. Client {
    8.   org.apache.kafka.common.security.plain.PlainLoginModule required
    9.   username="admin"
    10.   password="12345";
    11. };
    复制代码
  • 修改config/server.properties
    1. # AUTH
    2. security.inter.broker.protocol=SASL_PLAINTEXT
    3. sasl.mechanism.inter.broker.protocol=PLAIN
    4. sasl.enabled.mechanisms=PLAIN
    5. authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    6. allow.everyone.if.no.acl.found=true
    7. listeners=SASL_PLAINTEXT://0.0.0.0:9092
    8. advertised.listeners=SASL_PLAINTEXT://:9092
    9. #将zookeeper.connect改成zookeeper的地址
    10. zookeeper.connect=192.168.6.42:2181
    复制代码
  • 调整kafka的启动脚本 kafka-server-start.sh-Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf根据自己的地点进行配置
    1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    2.     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf"
    3. fi
    复制代码
  • 启动kafka
    1. ./kafka-server-start.sh -daemon ../config/server.properti
    复制代码
  • 测试
    进入kafka目录,在config目录下创建kafka_client_jaas.conf文件,并写入如下内容。
    1. KafkaClient {  
    2. org.apache.kafka.common.security.plain.PlainLoginModule required  
    3.     username="admin"  
    4.     password="admin";  
    5. };
    复制代码
  • 配置提供者认证,修改提供者启动脚本,vi bin/kafka-console-producer.sh
    1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    2.     export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf"
    3. fi
    4. exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"                        
    复制代码
  • 启动消费者
    1. ./kafka-console-producer.sh --broker-list 192.168.4.235:9092 --topic testTopic --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
    复制代码
  • 配置消费者认证,修改提供者启动脚本,vi bin/kafka-console-consumer.sh
    1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    2.     export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf"
    3. fi
    4. exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
    复制代码
  • 启动消费者
    1. ./kafka-console-consumer.sh --bootstrap-server 192.168.4.235:9092 --topic testTopic --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
    复制代码
    有消息输出即为成功
  • springboot的配置
    1. spring:
    2.   kafka:
    3.     # docker http://192.168.2.202:8080
    4.     bootstrap-servers: http://192.168.6.42:9092
    5.     # 配置用户名密码
    6.     producer:
    7.       properties:
    8.         sasl:
    9.             mechanism: PLAIN
    10.         security:
    11.             protocol: SASL_PLAINTEXT
    12.     consumer:
    13.       properties:
    14.         sasl:
    15.             mechanism: PLAIN
    16.         security:
    17.             protocol: SASL_PLAINTEXT
    复制代码
  • 在相干的kafkaConfig中增长相干配置
    1.    @Bean
    2.     public KafkaTemplate kafkaTemplate() {
    3.         Map<String, Object> configs = new HashMap<>();
    4.         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
    5.         configs.put(ProducerConfig.RETRIES_CONFIG, pro_retry_config);
    6.         configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
    7.         configs.put(ProducerConfig.ACKS_CONFIG, acks_config);
    8.         configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
    9.         configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
    10.         configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,key_serializer_config);
    11.         configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,value_serializer_config);
    12.         configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression_type_config);
    13.         if (Boolean.valueOf(auth_enabled)) {
    14.             configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
    15.             configs.put(SaslConfigs.SASL_MECHANISM, sasl_mechanism);
    16.         }
    17.         DefaultKafkaProducerFactory  producerFactory = new DefaultKafkaProducerFactory(configs);
    18.         return new KafkaTemplate(producerFactory);
    19.     }
    复制代码
  • 最后在启动脚本中新增一个配置
  1. -Djava.security.auth.login.config=客户端登录文件所在的位置
  2. #eg:
  3. -Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf
  4.    
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大号在练葵花宝典

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表