【Kafka】手把手SASL,SSL教学

打印 上一主题 下一主题

主题 913|帖子 913|积分 2739

Kafka设置SASL

1.确定使用的SASL协议

Kafka支持以下SASL机制:GSSAPI 、PLAIN、 SCRAM-SHA-256、 SCRAM-SHA-512、 OAUTHBEARER。
本指南主要以SCRAM机制设置为主。
2.准备用户凭据

当使用SCRAM机制时,Kafka使用Zookeeper存储用户加密后的凭据,所以需要先使用Kafka提供的脚本举行用户的创建。
好比创建用户名为kafkaAdmin,暗码为admin用户的操纵下令如下:
  1. > bin/Kafka-configs.sh --zookeeper localhost:2182 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name KafkaAdmin
复制代码
3.设置JAAS安全验证文件

Kafka使用Java的JAAS机制举行安全验证文件的加载,起首创建文件命名为broker_jaas.config,内容如下:
  1. KafkaServer {
  2.         org.apache.kafka.common.security.scram.ScramLoginModule required
  3.         username="KafkaAdmin"
  4.         password="admin";
  5. };
复制代码
然后编辑/bin/kafka-server-start.sh文件,使用Java的-Djava.security.auth.login.config参数指定JAAS文件位置。
  1. [kafka@CTSP1 bin]$ cat kafka-server-start.sh
  2. #!/bin/bash
  3. export JMX_PORT=9876
  4. export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=10.10.92.17"
  5. #指定JAAS文件
  6. export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/config/broker_jaas.config"
  7. if [ $# -lt 1 ];
  8. then
  9.         echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
  10.         exit 1
  11. fi
  12. base_dir=$(dirname $0)
  13. if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  14.     export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
  15. fi
  16. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  17.     export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G "
  18. fi
  19. EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
  20. COMMAND=$1
  21. case $COMMAND in
  22.   -daemon)
  23.     EXTRA_ARGS="-daemon "$EXTRA_ARGS
  24.     shift
  25.     ;;
  26.   *)
  27.     ;;
  28. esac
  29. exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
复制代码
4.设置server.properties文件

修改/config/server.properties文件,启用安全认证协议。修改项如下:
  1. #指定客户端连接使用的协议以及地址,端口
  2. listeners=SASL_PLAINTEXT://127.0.0.1:9092
  3. #指定服务器之间使用的安全连接协议,可选的协议有PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
  4. security.inter.broker.protocol=SASL_PLAINTEXT
  5. #指定启用的SASL认证机制,可选项有PLAIN,GSSAPI,SCRAM。
  6. sasl.enabled.mechanisms=SCRAM-SHA-256
  7. #指定集群内部的认证机制
  8. sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
复制代码
5.设置Kafka客户端

客户端可以通过指定JAAS文件的方式使用或者在客户端创建的Producer或者Consumer的属性中添加对应的键值对使用。
设置客户端JAAS格式文件如下:
  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=SCRAM-SHA-256
  3. sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkaAdmin" password="admin";
复制代码
在bin目录中的脚本文件,可以通过–command-config参数指定JAAS文件,或者是生产者脚本的–producer.config或者消费者脚本的–consumer.config参数指定JAAS文件举行验证。
Kafka设置SSL

1.为每个Broker天生 SSL 密钥和证书

为集群的每一个broker天生对应节点的密钥对,执行下令后会要求设置暗码,这个暗码需要记下来已备后面使用。
以下下令为天生一个10年的有效期的密钥对,暗码为123456 :
  1. keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12
复制代码
注意点:Kafka默认启用了SSL的主机名验证功能,可以设置设置文件参数ssl.endpoint.identification.algorithm为空方便测试。启用验证的时间,制作密钥对需要添加SAN信息,握手过程中会对当前毗连的域名和地址举行校验,假如不是预期的域名和IP则握手失败。
  1. keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12
  2. -ext SAN=DNS:{填写域名},IP:{填写ip地址}
复制代码
2.天生CA证书

CA证书用于给不同的通信端提供权威授权的认证,当某个节点接收到SSL请求时,先验证是否证书是否为CA签名,假如CA签名则信托该请求。一般公司内部,可以设置一台机器作为CA认证服务中央。
由于OpenSSL的bug,x509 模块不会将请求的扩展字段从 CSR 复制到终极证书中。所以制作ca需要依赖指定config的方式。
ca.cnf文件内容如下:
  1. HOME            = .
  2. RANDFILE        = $ENV::HOME/.rnd
  3. ####################################################################
  4. [ ca ]
  5. default_ca    = CA_default      # The default ca section
  6. [ CA_default ]
  7. base_dir      = .
  8. certificate   = $base_dir/ca-cert   # The CA certifcate
  9. private_key   = $base_dir/cakey.pem    # The CA private key
  10. new_certs_dir = $base_dir              # Location for new certs after signing
  11. database      = $base_dir/index.txt    # Database index file
  12. serial        = $base_dir/serial.txt   # The current serial number
  13. default_days     = 1000         # How long to certify for
  14. default_crl_days = 30           # How long before next CRL
  15. default_md       = sha256       # Use public key default MD
  16. preserve         = no           # Keep passed DN ordering
  17. x509_extensions = ca_extensions # The extensions to add to the cert
  18. email_in_dn     = no            # Don't concat the email in the DN
  19. copy_extensions = copy          # Required to copy SANs from CSR to cert
  20. ####################################################################
  21. [ req ]
  22. default_bits       = 4096
  23. default_keyfile    = cakey.pem
  24. distinguished_name = ca_distinguished_name
  25. x509_extensions    = ca_extensions
  26. string_mask        = utf8only
  27. ####################################################################
  28. [ ca_distinguished_name ]
  29. countryName         = Country Name (2 letter code)
  30. countryName_default = DE
  31. stateOrProvinceName         = State or Province Name (full name)
  32. stateOrProvinceName_default = Test Province
  33. localityName                = Locality Name (eg, city)
  34. localityName_default        = Test Town
  35. organizationName            = Organization Name (eg, company)
  36. organizationName_default    = Test Company
  37. organizationalUnitName         = Organizational Unit (eg, division)
  38. organizationalUnitName_default = Test Unit
  39. commonName         = Common Name (e.g. server FQDN or YOUR name)
  40. commonName_default = Test Name
  41. emailAddress         = Email Address
  42. emailAddress_default = test@test.com
  43. ####################################################################
  44. [ ca_extensions ]
  45. subjectKeyIdentifier   = hash
  46. authorityKeyIdentifier = keyid:always, issuer
  47. basicConstraints       = critical, CA:true
  48. keyUsage               = keyCertSign, cRLSign
  49. ####################################################################
  50. [ signing_policy ]
  51. countryName            = optional
  52. stateOrProvinceName    = optional
  53. localityName           = optional
  54. organizationName       = optional
  55. organizationalUnitName = optional
  56. commonName             = supplied
  57. emailAddress           = optional
  58. ####################################################################
  59. [ signing_req ]
  60. subjectKeyIdentifier   = hash
  61. authorityKeyIdentifier = keyid,issuer
  62. basicConstraints       = CA:FALSE
  63. keyUsage               = digitalSignature, keyEncipherment
复制代码
然后创建数据库和序列号文件,这些文件将用于跟踪使用该 CA 签订的证书。这两个文件都是简单的文本文件,与您的 CA 密钥位于同一目录中。
  1. echo 01 > serial.txt
  2. touch index.txt
复制代码
  1. #需要主机认证的时候
  2. openssl req -x509 -config ca.cnf -newkey rsa:4096 -sha256 -nodes -out ca-cert -outform PEM
  3. #不需要主机认证
  4. openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
复制代码
3.创建信托库

将天生的CA添加到服务端和客户端的信托库,以便服务端和客户端可以信托这个CA:
  1. keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
复制代码
  1. keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
复制代码
4.签名证书

用步调2天生的CA签名步调1天生的证书。起首需要导出请求文件。
  1. keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
复制代码
cert-file: 服务器的未签名证书
然后用CA举行签名认证。
  1. #不需要主机认证的时候
  2. openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
  3. #需要主机验证的时候
  4. openssl ca -config ca.cnf -policy signing_policy -extensions signing_req -out cert-signed -infiles cert-file
复制代码
末了,你需要导入CA的证书和已签名的证书到密钥仓库:
  1. keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
复制代码
  1. keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
复制代码
备注:假如服务端对客户端的毗连有认证需求,可以使用相同的步调天生客户端的keystore文件,然后使用CA证书举行认证。
5.服务端设置

假如需要启用SSL,天生对应的证书文件后,server.properties如下设置:
  1. #如果broker之间不需要启用SSL,则同时需要配置SSL和PLAINTEXT
  2. listeners=PLAINTEXT://host.name:port,SSL://host.name:port
  3. #如果broker之间也使用SSL,则需要设置
  4. #security.inter.broker.protocol=SSL
  5. #listeners=SSL://host.name:port
  6. ssl.keystore.location=/var/private/ssl/server.keystore.jks
  7. ssl.keystore.password=123456
  8. ssl.key.password=123456
  9. ssl.truststore.location=/var/private/ssl/server.truststore.jks
  10. ssl.truststore.password=123456
  11. #关闭主机名验证时设置以下参数
  12. ssl.endpoint.identification.algorithm=
复制代码
6.客户端设置文件

但从 Kafka 2.0.0 开始,默认为客户端毗连以及代理间毗连启用服务器主机名验证,可以通过设置ssl.endpoint.identification.algorithm为空字符串来关闭主机名验证。
  1. security.protocol=SSL
  2. ssl.truststore.location=/var/private/ssl/client.truststore.jks
  3. ssl.truststore.password=123456
  4. #关闭主机名验证时设置以下参数
  5. ssl.endpoint.identification.algorithm=
复制代码
7.测试方法

  1. kafka-console-producer.sh --bootstrap-server localhost:9095 --topic test --producer.config client-ssl.properties
  2. kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic test --consumer.config client-ssl.properties
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

干翻全岛蛙蛙

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表