kafka与zookeeper的SSL认证教程

  金牌会员 | 2024-9-9 03:23:38 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 882|帖子 882|积分 2646

作者 乐维社区(forum.lwops.cn)许远
在构建现代的分布式系统时,确保数据传输的安全性至关紧张。Apache Kafka 和 Zookeeper 作为流行的分布式消息队列和协调服务,提供了SSL(Secure Sockets Layer)认证机制,以增强数据传输过程中的安全性。
本文将详细先容从生成SSL证书到设置服务端和客户端的全过程,确保数据在传输过程中得到充实的掩护。
一、设置Kafka账号密码:
1、起首,需要修改kafka设置文件:vim /asop/kafka/kafka_2.11-2.1.0/config/server.properties
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://10.176.31.137:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#假如没有找到ACL(访问控制列表)设置,则答应任何利用。
allow.everyone.if.no.acl.found=false
#需要开启设置超等管理员,设置visitor用户为超等管理员
super.users=User:visitor
2、其次,为server创建登录验证文件,可以根据自己爱好命名文件,如vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf ,文件内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“visitor”
password=“qaz@123”
user_visitor=“qaz@123”;
};
3、然后修改kafka安装目次vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh,在文件最上面添加变量
export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"

4、接下来为consumer和producer创建登录验证文件,可以根据爱好命名文件,如kafka_client_jaas.conf,文件内容如下(假如是步伐访问,如springboot访问,可以不设置)
vim /asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“visitor”
password=“qaz@123”;
};
5、在consumer.properties和producer.properties里分别加上如下设置:
vim /asop/kafka/kafka_2.11-2.1.0/config/consumer.properties
vim /asop/kafka/kafka_2.11-2.1.0/config/producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
6、修改kafka安装目次bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh,在文件最上面添加变量
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh
vim /asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"

7、分别启动zookeeper和kafka,至此服务端kafka用户登录验证设置完成(先关闭kafka后关闭zookeeper)
关闭服务kafka
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
启动服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
关闭服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
启动服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
8、创建及检察主题
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list 10.176.31.137:9092 --topic cmdb --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

接收消息
/asop/kafka/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 10.176.31.137:9092 --topic cmdb --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

二、zk和kafka设置ssal账号密码:

  • Zookeeper 设置 SASL
    1.1 新建 zoo_jaas.conf 文件
    zoo_jaas.conf 文件名、文件地点路径没有特殊要求,一般放置在${ZOOKEEPER_HOME}/conf目次下vim /asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“admin”
password=“admin@12”
user_kafka=“kafka@123”;
};
  1.     Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
  2.     Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名
复制代码
1.2 设置 /asop/zk/zookeeper-3.4.13/conf/zoo.cfg 文件
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true
zookeeper.sasl.client 设置为 true,开启客户端身份验证,否则zoo_jaas.conf中设置的用户名将不起作用,客户端仍然可以无 jaas 文件毗连,只是带有 WARNNING 而已
1.3 导入依赖包
因为使用的权限验证类为:org.apache.kafka.common.security.plain.PlainLoginModule,所以需要 kafka 相干 jar 包,新建文件夹 zk_sasl_lib,如下:从kafka/lib目次下复制以下几个jar包到zookeeper的lib和新建的zk_sasl_lib目次下:
kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
slf4j-log4j12-1.7.28.jar
snappy-java-1.1.7.3.jar
mkdir /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/lib/
cp /asop/kafka/kafka_2.11-2.1.0/libs/kafka-clients-2.1.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/lz4-java-1.5.0.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-api-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/slf4j-log4j12-1.7.25.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
cp /asop/kafka/kafka_2.11-2.1.0/libs/snappy-java-1.1.7.2.jar /asop/zk/zookeeper-3.4.13/zk_sasl_lib
chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/
chmod 755 -R /asop/zk/zookeeper-3.4.13/zk_sasl_lib/
1.4 修改 zkEnv.sh 文件/asop/zk/zookeeper-3.4.13/bin/zkEnv.sh
修改前:假如没有就直接加
export SERVER_JVMFLAGS=“-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS”
修改后:
for jar in /asop/zk/zookeeper-3.4.13/zk_sasl_lib/*.jar;
do
CLASSPATH=“                                   j                         a                         r                         :                              jar:                  jar:CLASSPATH”
done
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf "
重启 Zookeeper 服务即可
关闭服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh stop /asop/zk/zookeeper-3.4.13/conf/zoo.cfg
启动服务zookeeper-3.4.13
/asop/zk/zookeeper-3.4.13/bin/zkServer.sh start /asop/zk/zookeeper-3.4.13/conf/zoo.cfg

  • Kakfa 设置 SASL
    2.1 新建 kafka_server_jaas.conf 文件
    kafka_server_jaas.conf 文件名和存放路径没有要求,一般放置在${KAFKA_HOME}/config目次下/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“visitor”
password=“qaz@123”
user_visitor=“qaz@123”;
};
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“kafka”
password=“kafka@123”;
};
  1.     KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上
复制代码
KafkaServer.user_xxx 此中 xxx 必须和 KafkaServer.username 设置的用户名一致,密码也一致
KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做预备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据
Client.username、Client.password 填写 Zookeeper 中注册的账号密码,用于 broker 与 zk 的通讯(若 zk 没有设置 SASL 可以忽略、若 zookeeper.sasl.client 为 false 也可以忽略只是带有,日志如下)
[2021-06-29 17:14:30,204] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named ‘Client’ was found in specified JAAS configuration file: ‘/Users/wjun/env/kafka/config/kafka_server_jaas.conf’. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
2.2 修改 server.properties 文件
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.168.157.198:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/asop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#假如没有找到ACL(访问控制列表)设置,则答应任何利用。
allow.everyone.if.no.acl.found=false
#需要开启设置超等管理员,设置visitor用户为超等管理员
super.users=User:visitor
此中 localhost 需要修改成 IP地点
super.users 设置超等用户,该用户不受之后的 ACL 设置影响
2.3 修改启动脚本
修改 kafka-server-start.sh 文件,使之加载到 kafka_server_jaas.conf 文件/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh
修改前:
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
export KAFKA_HEAP_OPTS=“-Xmx1G -Xms1G”
fi
修改后:
(先在首行加这一行,假如有了就不消加了)export KAFKA_OPTS=" -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
export KAFKA_HEAP_OPTS=“-Xmx1G -Xms1G -Djava.security.auth.login.config=/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf”
fi
设置zookeeper的ACL规则
/asop/zk/zookeeper-3.4.13/bin/zkCli.sh #进入zk的下令行模式
addauth digest admin:admin@12 #切换登陆用户(超等管理员是在zk的设置文件/asop/zk/zookeeper-3.4.13/conf/zoo_jaas.conf里面)
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa #(设置可以登陆的IP和用户账号密码,admin是上面的zk的设置文件里面界说的管理员,Kafka用户是/asop/kafka/kafka_2.11-2.1.0/config/kafka_server_jaas.conf文件里面的界说的kafka毗连zk的用户 (Client下面的))
addauth digest kafka:kafka@123 #再切换为kafka用户再设置一次acl
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa
注意:假如要加白名单IP大概用户要在原来的基础上加,不然会覆盖
setAcl / ip:127.0.0.1:cdrwa,auth:kafka:kafka@123:cdrwa,auth:admin:admin@12:cdrwa,ip:1.1.1.1
需要恢复权限,不设置acl的话就运行
setAcl / world:anyone:cdrwa
重启 kafka 服务即可
关闭服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-stop.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
启动服务kafka
#/asop/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /asop/kafka/kafka_2.11-2.1.0/config/server.properties
至此,完成kafka与zookeeper设置ssl认证。更多运维技巧欢迎关注乐维社区,更多运维题目也欢迎留言提问。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表