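The images used here are Bitnami's. If you use images from another project, the environment variable names may differ, so do not copy these settings verbatim; check which environment variables that image supports. The values, however, are consumed by the underlying software and should be the same.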
1. Deployment without password authentication
1.1 Start ZooKeeper
Create the directories that will hold the mounted ZooKeeper data:
- mkdir -p zookeeper/{conf,data,logs}
Then create a custom configuration file, zoo.cfg, in the conf directory:
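- # Length of one tick in milliseconds: the basic time unit for heartbeats between servers and between clients and servers
- tickTime=2000
- # Maximum number of ticks a follower may take to connect to and sync with the leader at startup
- initLimit=10
- # Maximum number of ticks that may pass between a request and its acknowledgement between leader and follower
- syncLimit=5
- # Port on which ZooKeeper listens for client connections
- clientPort=2181
- # Directory for data files (snapshots)
- dataDir=/opt/bitnami/zookeeper/data
- # Directory for transaction log files
- dataLogDir=/opt/bitnami/zookeeper/logs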
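Note: if dataLogDir is not set, the transaction log is also written to the data directory. This can seriously hurt ZooKeeper's performance, because under high throughput it produces a large volume of transaction logs and snapshots.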
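initLimit and syncLimit in zoo.cfg are counted in ticks, so the real timeouts are tickTime multiplied by each limit. The following is a minimal sketch that reads those three keys from a zoo.cfg-style file and prints the resulting timeouts in milliseconds. The function names zk_cfg_get and zk_timeouts_ms are made up for this illustration, and the sketch assumes plain key=value lines.

```shell
#!/usr/bin/env bash
# Read the value of one key from a zoo.cfg-style file (key=value lines).
# Comment lines start with '#' and therefore never match the key pattern.
zk_cfg_get() {
  local file="$1" key="$2"
  sed -n "s/^[[:space:]]*${key}[[:space:]]*=[[:space:]]*//p" "$file" | tail -n 1
}

# Print the effective init and sync timeouts in milliseconds.
zk_timeouts_ms() {
  local file="$1" tick init sync
  tick="$(zk_cfg_get "$file" tickTime)"
  init="$(zk_cfg_get "$file" initLimit)"
  sync="$(zk_cfg_get "$file" syncLimit)"
  echo "init=$((tick * init)) sync=$((tick * sync))"
}

# Usage (path taken from the directory layout above):
#   zk_timeouts_ms zookeeper/conf/zoo.cfg
```

With the values from the file above (tickTime=2000, initLimit=10, syncLimit=5), a follower gets 20 seconds for the initial sync and 10 seconds per request round trip.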
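Hand the directory over to UID/GID 1001, the non-root user the Bitnami container runs as:
- chown -R 1001:1001 zookeeper
Use the following docker-compose.yaml: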
- version: "3"
- services:
-   zookeeper:
-     image: docker.io/bitnami/zookeeper:3.9
-     container_name: zookeeper
-     hostname: zookeeper
-     privileged: true
-     restart: always
-     environment:
-       # Allows unauthenticated clients; this is the variable named in the text after the startup log
-       - ALLOW_ANONYMOUS_LOGIN=yes
-     volumes:
-       - ./zookeeper/data:/opt/bitnami/zookeeper/data
-       - ./zookeeper/logs:/opt/bitnami/zookeeper/logs
-       - ./zookeeper/conf/zoo.cfg:/opt/bitnami/zookeeper/conf/zoo.cfg
-     ports:
-       - "12181:2181"
-     deploy:
-       resources:
-         limits:
-           cpus: '4'
-           memory: 4G
-         reservations:
-           cpus: '0.5'
-           memory: 200M
The startup log looks like this:
- [root@localhost deploy]# docker logs -f --tail 20 zookeeper
- 2024-05-03 03:10:19,022 [myid:] - INFO [main:o.e.j.s.AbstractConnector@333] - Started ServerConnector@4b7dc788{HTTP/1.1, (http/1.1)}{}
- 2024-05-03 03:10:19,023 [myid:] - INFO [main:o.e.j.s.Server@415] - Started @1256ms
- 2024-05-03 03:10:19,023 [myid:] - INFO [main:o.a.z.s.a.JettyAdminServer@201] - Started AdminServer on address, port 8080 and command URL /commands
- 2024-05-03 03:10:19,030 [myid:] - INFO [main:o.a.z.s.ServerCnxnFactory@169] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
- 2024-05-03 03:10:19,032 [myid:] - WARN [main:o.a.z.s.ServerCnxnFactory@309] - maxCnxns is not configured, using default value 0.
- 2024-05-03 03:10:19,034 [myid:] - INFO [main:o.a.z.s.NIOServerCnxnFactory@652] - Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 8 worker threads, and 64 kB direct buffers.
- 2024-05-03 03:10:19,036 [myid:] - INFO [main:o.a.z.s.NIOServerCnxnFactory@660] - binding to port
- 2024-05-03 03:10:19,057 [myid:] - INFO [main:o.a.z.s.w.WatchManagerFactory@42] - Using org.apache.zookeeper.server.watch.WatchManager as watch manager
- 2024-05-03 03:10:19,057 [myid:] - INFO [main:o.a.z.s.w.WatchManagerFactory@42] - Using org.apache.zookeeper.server.watch.WatchManager as watch manager
- 2024-05-03 03:10:19,059 [myid:] - INFO [main:o.a.z.s.ZKDatabase@134] - zookeeper.snapshotSizeFactor = 0.33
- 2024-05-03 03:10:19,059 [myid:] - INFO [main:o.a.z.s.ZKDatabase@154] - zookeeper.commitLogCount=500
- 2024-05-03 03:10:19,067 [myid:] - INFO [main:o.a.z.s.p.SnapStream@61] - zookeeper.snapshot.compression.method = CHECKED
- 2024-05-03 03:10:19,067 [myid:] - INFO [main:o.a.z.s.p.FileTxnSnapLog@480] - Snapshotting: 0x0 to /opt/bitnami/zookeeper/data/version-2/snapshot.0
- 2024-05-03 03:10:19,071 [myid:] - INFO [main:o.a.z.s.ZKDatabase@291] - Snapshot loaded in 13 ms, highest zxid is 0x0, digest is 1371985504
- 2024-05-03 03:10:19,073 [myid:] - INFO [main:o.a.z.s.p.FileTxnSnapLog@480] - Snapshotting: 0x0 to /opt/bitnami/zookeeper/data/version-2/snapshot.0
- 2024-05-03 03:10:19,088 [myid:] - INFO [main:o.a.z.s.ZooKeeperServer@589] - Snapshot taken in 15 ms
- 2024-05-03 03:10:19,105 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::o.a.z.s.PrepRequestProcessor@138] - PrepRequestProcessor (sid:0) started, reconfigEnabled=false
- 2024-05-03 03:10:19,106 [myid:] - INFO [main:o.a.z.s.RequestThrottler@75] - zookeeper.request_throttler.shutdownTimeout = 10000 ms
- 2024-05-03 03:10:19,134 [myid:] - INFO [main:o.a.z.s.ContainerManager@83] - Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0
- 2024-05-03 03:10:19,136 [myid:] - INFO [main:o.a.z.a.ZKAuditProvider@42] - ZooKeeper audit is disabled.
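ZooKeeper started normally. The last log line reports that audit logging is disabled, which is a separate setting from authentication. Clients can connect without credentials because the ALLOW_ANONYMOUS_LOGIN=yes environment variable was set at startup, which lets anyone connect. It will not be configured this way later, when password authentication is set up for Kafka.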
1.2 Check ZooKeeper status
- docker exec -it zookeeper bash
- I have no name!@zookeeper:/$ cd /opt/bitnami/zookeeper/bin
- I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ ls
- README.txt zkCli.sh zkServer-initialize.sh zkSnapShotToolkit.cmd zkSnapshotComparer.sh zkTxnLogToolkit.cmd
- zkCleanup.sh zkEnv.cmd zkServer.cmd zkSnapShotToolkit.sh zkSnapshotRecursiveSummaryToolkit.cmd zkTxnLogToolkit.sh
- zkCli.cmd zkEnv.sh zkServer.sh zkSnapshotComparer.cmd zkSnapshotRecursiveSummaryToolkit.sh
- I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$
Check the ZooKeeper status:
- I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ zkServer.sh status
- /opt/bitnami/java/bin/java
- ZooKeeper JMX enabled by default
- Using config: /opt/bitnami/zookeeper/bin/../conf/zoo.cfg
- Client port found: 2181. Client address: localhost. Client SSL: false.
- Mode: standalone
- I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$
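For scripted checks, the Mode line is the useful part of the status output above. Here is a small sketch that extracts it from text read on standard input; the helper name zk_mode is invented for this example, and it assumes the output format shown above.

```shell
#!/usr/bin/env bash
# Extract the value of the "Mode:" line from `zkServer.sh status` output (stdin).
# Prints nothing if no such line is present.
zk_mode() {
  sed -n 's/^Mode:[[:space:]]*//p'
}

# Usage from the host, using the container name and path from this post:
#   docker exec zookeeper /opt/bitnami/zookeeper/bin/zkServer.sh status | zk_mode
```

A single-node setup like this one should report standalone; in a cluster the value would be leader or follower.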
1.3 Start Kafka
- mkdir -p kafka/data
- chown -R 1001:1001 kafka
docker-compose.yaml:
-   kafka:
-     image: docker.io/bitnami/kafka:3.4
-     container_name: kafka
-     hostname: kafka
-     privileged: true
-     restart: always
-     environment:
-     volumes:
-       - ./kafka/data:/opt/bitnami/kafka/data
-     ports:
-       - "9092:9092"
-     deploy:
-       resources:
-         limits:
-           cpus: '4'
-           memory: 4G
-         reservations:
-           cpus: '0.5'
-           memory: 200M
-     depends_on:
-       - zookeeper
Start Kafka: docker-compose up -d kafka
- [root@localhost kafka-with-zookeeper]# docker-compose up -d kafka
- [+] Running 2/2
- ⠿ Container zookeeper Running 0.0s
- ⠿ Container kafka Started
Check the Kafka log:
- [root@localhost ~]# docker logs -f --tail 20 kafka
- [2024-05-03 04:12:03,381] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)
- [2024-05-03 04:12:03,391] INFO [ReplicaStateMachine controllerId=1001] Triggering offline replica state changes (kafka.controller.ZkReplicaStateMachine)
- [2024-05-03 04:12:03,391] DEBUG [ReplicaStateMachine controllerId=1001] Started replica state machine with initial state -> Map() (kafka.controller.ZkReplicaStateMachine)
- [2024-05-03 04:12:03,393] INFO [PartitionStateMachine controllerId=1001] Initializing partition state (kafka.controller.ZkPartitionStateMachine)
- [2024-05-03 04:12:03,394] INFO [PartitionStateMachine controllerId=1001] Triggering online partition state changes (kafka.controller.ZkPartitionStateMachine)
- [2024-05-03 04:12:03,396] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
- [2024-05-03 04:12:03,399] DEBUG [PartitionStateMachine controllerId=1001] Started partition state machine with initial state -> Map() (kafka.controller.ZkPartitionStateMachine)
- [2024-05-03 04:12:03,400] INFO [Controller id=1001] Ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,411] INFO [Controller id=1001] Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,412] INFO [Controller id=1001] Partitions that completed preferred replica election: (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,412] INFO [Controller id=1001] Skipping preferred replica election for partitions due to topic deletion: (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,413] INFO [Controller id=1001] Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,416] INFO [Controller id=1001] Starting replica leader election (PREFERRED) for partitions triggered by ZkTriggered (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,431] INFO [Controller id=1001] Starting the controller scheduler (kafka.controller.KafkaController)
- [2024-05-03 04:12:03,461] INFO [BrokerToControllerChannelManager broker=1001 name=forwarding]: Recorded new controller, from now on will use node kafka:9092 (id: 1001 rack: null) (kafka.server.BrokerToControllerRequestThread)
- [2024-05-03 04:12:03,551] INFO [BrokerToControllerChannelManager broker=1001 name=alterPartition]: Recorded new controller, from now on will use node kafka:9092 (id: 1001 rack: null) (kafka.server.BrokerToControllerRequestThread)
- [2024-05-03 04:12:08,434] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
- [2024-05-03 04:12:08,435] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
- [2024-05-03 04:17:08,439] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
- [2024-05-03 04:17:08,439] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
Kafka started normally.
1.4 Kafka configuration file
Enter the Kafka container and look at the contents of server.properties when none of the environment variables above are configured:
- I have no name!@kafka:/opt/bitnami/kafka/config$ cat server.properties
- ############################# Server Basics #############################
- # The id of the broker. This must be set to a unique integer for each broker.
- #broker.id=0
- ############################# Socket Server Settings #############################
- # The address the socket server listens on. If not configured, the host name will be equal to the value of
- # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
- # listeners = listener_name://host_name:port
- # listeners = PLAINTEXT://your.host.name:9092
- #listeners=PLAINTEXT://:9092
- # Listener name, hostname and port the broker will advertise to clients.
- # If not set, it uses the value for "listeners".
- #advertised.listeners=PLAINTEXT://your.host.name:9092
- # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
- # The number of threads that the server uses for receiving requests from the network and sending responses to the network
- num.network.threads=3
- # The number of threads that the server uses for processing requests, which may include disk I/O
- num.io.threads=8
- # The send buffer (SO_SNDBUF) used by the socket server
- socket.send.buffer.bytes=102400
- # The receive buffer (SO_RCVBUF) used by the socket server
- socket.receive.buffer.bytes=102400
- # The maximum size of a request that the socket server will accept (protection against OOM)
- socket.request.max.bytes=104857600
- ############################# Log Basics #############################
- # A comma separated list of directories under which to store log files
- log.dirs=/bitnami/kafka/data
- # The default number of log partitions per topic. More partitions allow greater
- # parallelism for consumption, but this will also result in more files across
- # the brokers.
- num.partitions=1
- # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
- # This value is recommended to be increased for installations with data dirs located in RAID array.
- num.recovery.threads.per.data.dir=1
- ############################# Internal Topic Settings #############################
- # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
- # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- ############################# Log Flush Policy #############################
- # Messages are immediately written to the filesystem but by default we only fsync() to sync
- # the OS cache lazily. The following configurations control the flush of data to disk.
- # There are a few important trade-offs here:
- # 1. Durability: Unflushed data may be lost if you are not using replication.
- # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
- # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
- # The settings below allow one to configure the flush policy to flush data after a period of time or
- # every N messages (or both). This can be done globally and overridden on a per-topic basis.
- # The number of messages to accept before forcing a flush of data to disk
- #log.flush.interval.messages=10000
- # The maximum amount of time a message can sit in a log before we force a flush
- #log.flush.interval.ms=1000
- ############################# Log Retention Policy #############################
- # The following configurations control the disposal of log segments. The policy can
- # be set to delete segments after a period of time, or after a given size has accumulated.
- # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
- # from the end of the log.
- # The minimum age of a log file to be eligible for deletion due to age
- log.retention.hours=168
- # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
- # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
- #log.retention.bytes=1073741824
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- #log.segment.bytes=1073741824
- # The interval at which log segments are checked to see if they can be deleted according
- # to the retention policies
- log.retention.check.interval.ms=300000
- ############################# Zookeeper #############################
- # Zookeeper connection string (see zookeeper docs for details).
- # This is a comma separated host:port pairs, each corresponding to a zk
- # server. e.g. ",,".
- # You can also append an optional chroot string to the urls to specify the
- # root directory for all kafka znodes.
- zookeeper.connect=zookeeper:2181
- # Timeout in ms for connecting to zookeeper
- #zookeeper.connection.timeout.ms=18000
- ############################# Group Coordinator Settings #############################
- # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
- # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
- # The default value for this is 3 seconds.
- # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
- # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
- #group.initial.rebalance.delay.ms=0
- sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
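Every value in this configuration file can be set through an environment variable.
Basic server configuration: broker.id is the ID of the server node and must be unique within a cluster.
As shown, the compose script does not set listeners or advertised.listeners, and both entries are commented out in the configuration file. If machines outside the host need to reach this Kafka instance, both must be configured:
- The first group is the plaintext port 9092, reachable through the service name kafka;
- The third group is plaintext access for external machines on port 9094; in real use, replace localhost with the IP of the current machine;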
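To make the link between server.properties keys and environment variables concrete: as far as I understand the Bitnami image's convention, a key is exposed as KAFKA_CFG_ followed by the key in upper case with dots turned into underscores. The sketch below applies that rule. The function name kafka_cfg_env_name is invented for this example, and other images may use a different prefix.

```shell
#!/usr/bin/env bash
# Map a server.properties key to the environment variable name expected by the
# Bitnami Kafka image (assumed convention: KAFKA_CFG_ prefix, dots -> underscores,
# lower case -> upper case).
kafka_cfg_env_name() {
  local key="$1"
  printf 'KAFKA_CFG_%s\n' "$(printf '%s' "$key" | tr '.' '_' | tr '[:lower:]' '[:upper:]')"
}

# Usage:
#   kafka_cfg_env_name listeners
#   kafka_cfg_env_name advertised.listeners
```

The same rule yields the names that appear elsewhere in this post, such as KAFKA_CFG_ADVERTISED_LISTENERS.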
1.4 Producing and consuming with the Kafka command-line tools
Enter the Kafka container:
- docker exec -it kafka bash
- cd /opt/bitnami/kafka/bin/
- I have no name!@kafka:/opt/bitnami/kafka/bin$ ls
- connect-distributed.sh kafka-console-producer.sh kafka-leader-election.sh kafka-run-class.sh kafka-verifiable-producer.sh
- connect-mirror-maker.sh kafka-consumer-groups.sh kafka-log-dirs.sh kafka-server-start.sh trogdor.sh
- connect-standalone.sh kafka-consumer-perf-test.sh kafka-metadata-quorum.sh kafka-server-stop.sh windows
- kafka-acls.sh kafka-delegation-tokens.sh kafka-metadata-shell.sh kafka-storage.sh zookeeper-security-migration.sh
- kafka-broker-api-versions.sh kafka-delete-records.sh kafka-mirror-maker.sh kafka-streams-application-reset.sh zookeeper-server-start.sh
- kafka-cluster.sh kafka-dump-log.sh kafka-producer-perf-test.sh kafka-topics.sh zookeeper-server-stop.sh
- kafka-configs.sh kafka-features.sh kafka-reassign-partitions.sh kafka-transactions.sh zookeeper-shell.sh
- kafka-console-consumer.sh kafka-get-offsets.sh kafka-replica-verification.sh kafka-verifiable-consumer.sh
- I have no name!@kafka:/opt/bitnami/kafka/bin$
Look at how the scripts set KAFKA_HEAP_OPTS:
- I have no name!@kafka:/opt/bitnami/kafka/bin$ cat kafka-console-producer.sh
- #!/bin/bash
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- export KAFKA_HEAP_OPTS="-Xmx512M"
- fi
- exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
- I have no name!@kafka:/opt/bitnami/kafka/bin$
- I have no name!@kafka:/opt/bitnami/kafka/bin$ cat kafka-console-consumer.sh
- #!/bin/bash
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- export KAFKA_HEAP_OPTS="-Xmx512M"
- fi
- exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
- I have no name!@kafka:/opt/bitnami/kafka/bin$
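The producer and consumer scripts configure only one environment variable, KAFKA_HEAP_OPTS, and its value is just -Xmx512M. Keep this in mind: this value has to be extended later for password authentication.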
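The `"x$KAFKA_HEAP_OPTS" = "x"` test in those scripts only applies the default when the variable is empty or unset, so whatever is exported beforehand wins. The sketch below reproduces that defaulting with parameter expansion and shows one way the value might be extended with a JAAS login-config property. Whether this is exactly the extension intended here is my assumption; the function names are invented and the path argument is a placeholder.

```shell
#!/usr/bin/env bash
# Same defaulting as the stock scripts: use -Xmx512M only when
# KAFKA_HEAP_OPTS is empty or unset.
default_heap_opts() {
  printf '%s\n' "${KAFKA_HEAP_OPTS:--Xmx512M}"
}

# Append a JAAS login-config system property to the current heap options.
# $1 is a placeholder path; point it at your own JAAS file.
heap_opts_with_jaas() {
  printf '%s -Djava.security.auth.login.config=%s\n' "$(default_heap_opts)" "$1"
}

# Usage inside the container, before calling a console tool:
#   export KAFKA_HEAP_OPTS="$(heap_opts_with_jaas /path/to/jaas.conf)"
```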
1.4.1 Create a topic
Create a topic named test with 10 partitions and a replication factor of 1:
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --create --bootstrap-server kafka:9092 --partitions 10 --replication-factor 1 --topic test
- Created topic test.
- I have no name!@kafka:/opt/bitnami/kafka/bin$
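The broker is addressed here by the service name kafka. If KAFKA_CFG_ADVERTISED_LISTENERS is configured, the current machine's IP can be used as well.
1.4.2 Describe a topic
Use --describe to view a topic's partition layout: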
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test
- Topic: test TopicId: nAakv4u8SJO022gRe-ahVQ PartitionCount: 10 ReplicationFactor: 1 Configs:
- Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 5 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 6 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 7 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 8 Leader: 1001 Replicas: 1001 Isr: 1001
- Topic: test Partition: 9 Leader: 1001 Replicas: 1001 Isr: 1001
- I have no name!@kafka:/opt/bitnami/kafka/bin$
1.4.3 List all topics
Use --list:
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --list --bootstrap-server kafka:9092
- test
- I have no name!@kafka:/opt/bitnami/kafka/bin$
1.4.4 Delete a topic
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic test
- I have no name!@kafka:/opt/bitnami/kafka/bin$
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --list --bootstrap-server kafka:9092
- I have no name!@kafka:/opt/bitnami/kafka/bin$
1.4.4 Send messages
First create a topic:
- kafka-topics.sh --create --bootstrap-server kafka:9092 --partitions 10 --replication-factor 1 --topic test
Send two messages:
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-console-producer.sh --broker-list kafka:9092 --topic test
- >hello
- >hello everyone
- >
1.4.5 Consume messages
Open another terminal window and connect to Kafka as a consumer:
- [root@localhost ~]# docker exec -it kafka bash
- I have no name!@kafka:/$
- I have no name!@kafka:/$ cd /opt/bitnami/kafka/bin/
- I have no name!@kafka:/opt/bitnami/kafka/bin$
- I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
- hello
- hello everyone
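The two messages produced a moment ago were consumed normally.
2. Deployment with password authentication
This section uses SASL_PLAINTEXT, meaning plaintext passwords written directly into the configuration files. For stronger security, SASL_SSL with JKS certificates can be used instead; see kafka-generate-ssl.sh.
2.1 Using SASL authentication
An unauthenticated ZooKeeper is paired here with a password-protected Kafka.
First create a configuration file, secrets/kafka_server_jaas.conf, that holds the Kafka accounts and passwords: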
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="Abc123456"
- user_admin="Abc123456"
- user_producer="Abc123456"
- user_consumer="Abc123456";
- };
- KafkaClient {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="producer"
- password="Abc123456";
- };
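This configuration selects the PLAIN mechanism through org.apache.kafka.common.security.plain.PlainLoginModule and defines the users.
- username and password are the credentials this broker uses to open connections to the other brokers in the cluster
- Entries prefixed with "user_" followed by a user name define the users allowed to connect to the broker; for example, user_producer="Abc123456" defines a user producer with password Abc123456
- The password given for username admin must match the one in user_admin, otherwise authentication fails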
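The matching-password rule is easy to break when the file is edited by hand. As a rough sketch, the KafkaServer section can be generated so that the broker's own account and its user_ entry always share one password. The function name make_kafka_server_jaas and its argument layout are invented for this illustration; user names must not contain a colon.

```shell
#!/usr/bin/env bash
# Print a KafkaServer JAAS section for the PLAIN mechanism.
# $1 = inter-broker user, $2 = its password,
# remaining arguments = additional "user:password" pairs.
make_kafka_server_jaas() {
  local admin="$1" admin_pw="$2" pair
  shift 2
  printf 'KafkaServer {\n'
  printf '  org.apache.kafka.common.security.plain.PlainLoginModule required\n'
  printf '  username="%s"\n  password="%s"\n' "$admin" "$admin_pw"
  # The broker's own account is repeated as a user_<name> entry with the
  # same password, so the two can never drift apart.
  printf '  user_%s="%s"' "$admin" "$admin_pw"
  for pair in "$@"; do
    # Text before the first ':' is the user name, the rest is the password.
    printf '\n  user_%s="%s"' "${pair%%:*}" "${pair#*:}"
  done
  printf ';\n};\n'
}

# Usage, with the accounts from this post:
#   make_kafka_server_jaas admin Abc123456 producer:Abc123456 consumer:Abc123456
```

This only covers the KafkaServer section; the KafkaClient section from the file above would still be appended separately.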
- chown -R 1001:1001 secrets
Then change the kafka service in docker-compose to:
-   kafka:
-     image: docker.io/bitnami/kafka:3.4
-     container_name: kafka
-     hostname: kafka
-     privileged: true
-     restart: always
-     environment:
-       KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/bitnami/kafka/secrets/kafka_server_jaas.conf'
-       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-       # Override values from the default configuration file
-     volumes:
-       - ./kafka/data:/opt/bitnami/kafka/data
-       - ./secrets:/opt/bitnami/kafka/secrets
-     ports:
-       - "9092:9092"
-     deploy:
-       resources:
-         limits:
-           cpus: '4'
-           memory: 4G
-         reservations:
-           cpus: '0.5'
-           memory: 200M
-     depends_on:
-       - zookeeper
On a successful start, the log ends with:
- [2024-05-03 07:26:17,096] TRACE [Controller id=0] Leader imbalance ratio for broker 1001 is 1.0 (kafka.controller.KafkaController)
- [2024-05-03 07:26:17,102] INFO [Controller id=0] Starting replica leader election (PREFERRED) for partitions triggered by AutoTriggered (kafka.controller.KafkaController)
- [2024-05-03 07:26:20,808] INFO [RequestSendThread controllerId=0] Controller 0 connected to (id: 0 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
- [2024-05-03 07:26:20,850] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use node (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
- [2024-05-03 07:26:20,859] INFO [Broker id=0] Add 60 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 0 epoch 2 with correlation id 1 (state.change.logger)
- [2024-05-03 07:26:20,927] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use node (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
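Because the broker ID environment variable was specified, it took effect here: the ID changed from the default 1001 to the 0 we set.
2.2 Client verification
Enter the container and test the Kafka command-line tools.
Start with the same kind of command used before: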
- kafka-console-producer.sh --bootstrap-server --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
This command does not work: it hangs, and the broker log keeps printing the following message in a loop:
- [2024-05-03 07:53:41,938] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with / (channelId= (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
The error repeats indefinitely. Next, add a new configuration file, client.properties, with the following content:
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=PLAIN
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="Abc123456";
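The file above can also be written by a small helper, which keeps the three SASL settings in one place when several accounts are needed. This is only a sketch: write_client_properties is a made-up name, and the caller supplies the output path, user, and password.

```shell
#!/usr/bin/env bash
# Write a Kafka client properties file for SASL_PLAINTEXT with the PLAIN mechanism.
# $1 = output path, $2 = user name, $3 = password.
write_client_properties() {
  cat > "$1" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$2" password="$3";
EOF
}

# Usage, with the path and account used in this post:
#   write_client_properties /opt/bitnami/kafka/secrets/client.properties producer Abc123456
```

kafka-topics.sh reads such a file through --command-config, while the console producer and consumer take it through --producer.config and --consumer.config respectively. The default producer.properties passed in the failing command above carries no SASL settings, which is consistent with the handshake error in the broker log.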
Then run the topic-listing command:
- kafka-topics.sh --list --bootstrap-server kafka:9092 --command-config /opt/bitnami/kafka/secrets/client.properties
- __consumer_offsets
- test
- test1
The topic list is displayed normally.
Describe individual topics:
- I have no name!@kafka:/opt/bitnami/kafka/config$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test --command-config /opt/bitnami/kafka/secrets/client.properties
- Topic: test TopicId: _rY4JuTlQ-KRWdhbmIpklQ PartitionCount: 10 ReplicationFactor: 1 Configs:
- Topic: test Partition: 0 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 1 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 2 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 3 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 4 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 5 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 6 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 7 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 8 Leader: none Replicas: 1001 Isr: 1001
- Topic: test Partition: 9 Leader: none Replicas: 1001 Isr: 1001
- I have no name!@kafka:/opt/bitnami/kafka/config$
- I have no name!@kafka:/opt/bitnami/kafka/config$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test1 --command-config /opt/bitnami/kafka/secrets/client.properties
- Topic: test1 TopicId: pNlhOdhjQtaemjVB3jjM0g PartitionCount: 1 ReplicationFactor: 1 Configs:
- Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- I have no name!@kafka:/opt/bitnami/kafka/config$
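Now the trouble starts: producing and consuming both fail silently. The producer reports no error, the consumer reports no error, but the consumer never receives any messages, which is frustrating.
Since the command line is not cooperating, try another tool. I had hoped to keep things simple, but let's add kafka-ui and take a look.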
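One detail in the describe output above is worth checking before switching tools: every partition of test shows Leader: none with replicas on broker 1001, while test1 is led by broker 0. A plausible reading, which is my assumption and not something verified here, is that test was created before the broker ID changed from 1001 to 0, so its partitions have no live leader and cannot serve producers or consumers. The sketch below counts such partitions from describe output on standard input; the function name is invented for this example.

```shell
#!/usr/bin/env bash
# Count partitions reported without a leader in `kafka-topics.sh --describe`
# output read from stdin. grep exits non-zero when the count is 0, so the
# trailing `|| true` keeps the function's exit status clean.
count_leaderless_partitions() {
  grep -c 'Leader: none' || true
}

# Usage inside the Kafka container:
#   kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test \
#     --command-config /opt/bitnami/kafka/secrets/client.properties \
#     | count_leaderless_partitions
```

A non-zero count for a topic means messages for those partitions have nowhere to go until a broker holding a replica comes back or the topic is recreated.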
3. Using the visual tool Kafka-ui
3.1 Basic usage of Kafka-ui
Add the Kafka-ui service to the docker-compose file above:
-   kafka-ui:
-     image: provectuslabs/kafka-ui
-     container_name: kafka-ui
-     hostname: kafka-ui
-     privileged: true
-     restart: always
-     environment:
-     #volumes:
-     #  - ~/kui/config.yml:/etc/kafkaui/dynamic_config.yaml
-     ports:
-       - "8080:8080"
-     deploy:
-       resources:
-         limits:
-           cpus: '1'
-           memory: 2G
-         reservations:
-           cpus: '0.5'
With the above in place, start the kafka-ui service. A normal startup log looks like this:
- [root@localhost ~]# docker logs -f --tail 20 kafka-ui
- Standard Commons Logging discovery in action with spring-jcl: please remove commons-logging.jar from classpath in order to avoid potential conflicts
- _ _ ___ __ _ _ _ __ __ _
- | | | |_ _| / _|___ _ _ /_\ _ __ __ _ __| |_ ___ | |/ /__ _ / _| |_____
- | |_| || | | _/ _ | '_| / _ \| '_ / _` / _| ' \/ -_) | ' </ _` | _| / / _`|
- \___/|___| |_| \___|_| /_/ \_| .__\__,_\__|_||_\___| |_|\_\__,_|_| |_\_\__,|
- |_|
- 2024-05-04 02:37:44,032 WARN [main] c.p.k.u.u.DynamicConfigOperations: Dynamic config file /etc/kafkaui/dynamic_config.yaml doesnt exist or not readable
- 2024-05-04 02:37:44,098 INFO [main] c.p.k.u.KafkaUiApplication: Starting KafkaUiApplication using Java 17.0.6 with PID 1 (/kafka-ui-api.jar started by kafkaui in /)
- 2024-05-04 02:37:44,099 DEBUG [main] c.p.k.u.KafkaUiApplication: Running with Spring Boot v3.1.3, Spring v6.0.11
- 2024-05-04 02:37:44,100 INFO [main] c.p.k.u.KafkaUiApplication: No active profile set, falling back to 1 default profile: "default"
- 2024-05-04 02:37:59,321 INFO [main] c.p.k.u.c.a.BasicAuthSecurityConfig: Configuring LOGIN_FORM authentication.
- 2024-05-04 02:38:00,411 INFO [main] o.s.b.a.e.w.EndpointLinksResolver: Exposing 3 endpoint(s) beneath base path '/actuator'
- 2024-05-04 02:38:03,523 INFO [main] o.s.b.w.e.n.NettyWebServer: Netty started on port 8080
- 2024-05-04 02:38:03,615 INFO [main] c.p.k.u.KafkaUiApplication: Started KafkaUiApplication in 22.402 seconds (process running for 25.788)
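No Kafka connection is preconfigured here; only login authentication was added on top of the official demo. The login page looks like this:

The login user name and password are the ones configured in the script above: admin and admin123.
After a successful login, Kafka-ui automatically opens the console page http://IP:8080/ui/clusters/create-new-cluster, where the Kafka cluster connection is configured.

Fill in the required fields, such as the cluster name (Cluster name). Read-only is normally left unchecked, so that the UI can be used to test sending messages to Kafka. Then comes Bootstrap Servers; for a Kafka cluster, several can be listed.
If Kafka has no SASL authentication configured, scroll to the bottom and use Validate to test whether the connection works.
Because SASL authentication is configured here, under Authentication set Authentication Method to SASL/PLAIN and Security Protocol to SASL_PLAINTEXT, then check "Secured with auth?" and enter the user name and password.
Note that the user name and password must belong to a user from Kafka's KafkaClient section, not the KafkaServer section; otherwise the validation below fails:

Validate again with the user configured in KafkaClient, producer:
The connection succeeds, which means the user name and password entered here are used to connect as a Kafka client.
Click Submit to save the configuration; the Kafka-UI console page then opens automatically.

The dashboard shows our cluster, test. Clicking Brokers, Topics, or Consumers below it displays the cluster information normally:

If anything was misconfigured when connecting to the cluster, these menus may not display the Kafka cluster information correctly.
3.2 Kafka-ui configuration file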
- [root@localhost ~]# docker exec -it kafka-ui sh
- / $
- / $ cd /etc/kafkaui/
- /etc/kafkaui $ ls
- dynamic_config.yaml
- /etc/kafkaui $
- /etc/kafkaui $ cat dynamic_config.yaml
- auth:
-   type: LOGIN_FORM
- kafka:
-   clusters:
-   - bootstrapServers: kafka:9092
-     name: test
-     properties:
-       security.protocol: SASL_PLAINTEXT
-       sasl.mechanism: PLAIN
-       sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required
-         username="producer" password="Abc123456";
-     readOnly: false
- rbac:
-   roles: []
- webclient: {}
- /etc/kafkaui $
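The file contains exactly the Kafka cluster connection settings entered above:
- kafka:
-   clusters:
-     - bootstrapServers: xxx
I then tried converting the contents of this configuration file directly into environment variables like these: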
- - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
- - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="Abc123456";'
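But startup then failed with an error. So I commented them out again and, after starting Kafka-ui, configured the Kafka cluster connection on the web page instead.
If validation fails while configuring the Kafka account, it seems to be somewhat unstable; trying a few more times may make it work.
If the account is configured incorrectly, connecting to Kafka raises an error. For example, I initially configured the KafkaClient account and got the following exception: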
- 2024-05-04 02:28:06,338 ERROR [parallel-1] c.p.k.u.s.StatisticsService: Failed to collect cluster local info
- java.lang.IllegalStateException: Error while creating AdminClient for Cluster local
- at com.provectus.kafka.ui.service.AdminClientServiceImpl.lambda$createAdminClient$5(AdminClientServiceImpl.java:56)
- at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3783)
- at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
- at reactor.core.publisher.Operators.error(Operators.java:198)
- at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:135)
- at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
- at reactor.core.publisher.Mono.subscribe(Mono.java:4480)
- at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
- at reactor.core.publisher.Operators.complete(Operators.java:137)
- at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
- at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
- at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:427)
- at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
- at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
- at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
- at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
- at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
- at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
- at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
- at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
- at java.base/java.lang.Thread.run(Thread.java:833)
- Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
- at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
- at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:488)
- at org.apache.kafka.clients.admin.Admin.create(Admin.java:134)
- at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
- at com.provectus.kafka.ui.service.AdminClientServiceImpl.lambda$createAdminClient$2(AdminClientServiceImpl.java:53)
- at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:67)
- at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:127)
- ... 16 common frames omitted
- Caused by: java.lang.IllegalArgumentException: Login module control flag not specified in JAAS config
- at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:110)
- at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
- at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:93)
- at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:87)
- at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
- at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
- at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
- at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:522)
- ... 22 common frames omitted
- 2024-05-04 02:28:06,339 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: local
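Configure the KafkaServer admin account here; if the KafkaServer admin account does not work either, try the KafkaClient account again. As long as Kafka itself is configured correctly, the connection should not be much of a problem.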
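The innermost cause in the trace above, "Login module control flag not specified in JAAS config", is a parse error rather than a rejected credential. One possible explanation, which is an assumption I have not verified against this setup: in Compose's list-style `- KEY='value'` entries the single quotes are passed through as part of the value, so the JAAS parser never sees `required` as a separate token. The sketch below checks whether a value is wrapped in literal quote characters; the function name is invented for this illustration.

```shell
#!/usr/bin/env bash
# Report whether a string is wrapped in literal single or double quotes,
# as can happen when quotes from a Compose list entry end up in the value.
jaas_value_quote_status() {
  case "$1" in
    \'*\'|\"*\") echo "wrapped-in-literal-quotes" ;;
    *) echo "plain" ;;
  esac
}

# Usage from the host, using the container and variable names from this post:
#   jaas_value_quote_status "$(docker exec kafka-ui printenv KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG)"
```

If the check reports literal quotes, dropping the outer single quotes from the list entry, or switching to the mapping form of environment, would be the first thing to try.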
4. Troubleshooting notes
4.1 Exception when a consumer subscribes to a topic: Replication factor: 3 larger than available brokers: 1.
- [2024-06-05 09:51:42,266] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='__consumer_offsets', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='producer'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='segment.bytes', value='104857600')]) (kafka.server.ZkAdminManager)
- org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
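This exception comes from the Kafka environment configuration. The error message shows an attempt to create a topic named __consumer_offsets with a replication factor (replicationFactor) of 3 while only 1 broker is currently available in the cluster. In Kafka, the replication factor is the number of brokers each partition's data is copied to, providing redundancy and fault tolerance.
My Kafka service was configured with these environment variables:
- # Override values from the default configuration file; set TOPIC_REPLICATION_FACTOR to the number of nodes in the cluster
- # Controls the replication factor of the transaction-related log topic
At the time of the error, KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR was set to 3 while the cluster had only one node, hence the error; it is now changed to 1.
After changing this setting, the __consumer_offsets topic may need to be recreated or updated. Here I simply deleted the ZooKeeper data, restarted ZooKeeper, and then recreated the Kafka service. After that, producers could send messages and consumers could consume them normally.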
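The rule behind this error is simply that a replication factor cannot exceed the number of available brokers. A minimal pre-flight check, assuming the broker count is known by the caller, could look like this; check_replication_factor is a made-up helper name.

```shell
#!/usr/bin/env bash
# Compare a requested replication factor with the number of available brokers.
# Prints "ok" and succeeds when it fits; otherwise prints a message and fails.
check_replication_factor() {
  local rf="$1" brokers="$2"
  if [ "$rf" -le "$brokers" ]; then
    echo "ok"
  else
    echo "replication factor $rf exceeds $brokers available broker(s)"
    return 1
  fi
}

# Usage, for the single-node setup in this post:
#   check_replication_factor 1 1
#   check_replication_factor 3 1
```

Running such a check against the intended KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR value before starting the stack would have flagged the 3-versus-1 mismatch up front.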
