Deploying ZooKeeper + Kafka + Kafka-UI with Docker Compose


The images used here are Bitnami's. If you use images from another project, the environment variable names may differ and cannot be copied verbatim; check which environment variables your image supports. The underlying values they map to should be the same.
1. Deployment without authentication

1.1 Starting ZooKeeper

Create the directories that will hold the mounted ZooKeeper data:

  mkdir -p zookeeper/{conf,data,logs}
Then create a custom configuration file zoo.cfg in the conf folder:

  # Basic time unit (heartbeat interval between servers, or between client and server), in milliseconds
  tickTime=2000
  # Maximum number of ticks a follower may take for its initial connection and sync with the leader
  initLimit=10
  # Maximum number of ticks allowed between a request and an acknowledgement when a follower syncs with the leader
  syncLimit=5
  # Port on which ZooKeeper listens for client connection requests
  clientPort=2181
  # Directory for snapshot data
  dataDir=/opt/bitnami/zookeeper/data
  # Directory for transaction logs
  dataLogDir=/opt/bitnami/zookeeper/logs
Note: if dataLogDir is not configured, transaction logs are written to the data directory as well. This can seriously hurt ZooKeeper performance, because at high throughput the transaction logs and snapshot files compete for the same disk.
Change the ownership of the directories (Bitnami containers run as UID 1001):

  chown -R 1001:1001 zookeeper
The docker-compose.yaml used:

  version: "3"
  services:

    zookeeper:
      image: docker.io/bitnami/zookeeper:3.9
      container_name: zookeeper
      hostname: zookeeper
      privileged: true
      restart: always
      environment:
        ALLOW_ANONYMOUS_LOGIN: 'yes'
      volumes:
        - ./zookeeper/data:/opt/bitnami/zookeeper/data
        - ./zookeeper/logs:/opt/bitnami/zookeeper/logs
        - ./zookeeper/conf/zoo.cfg:/opt/bitnami/zookeeper/conf/zoo.cfg
      ports:
        - "12181:2181"
      deploy:
        resources:
          limits:
            cpus: '4'
            memory: 4G
          reservations:
            cpus: '0.5'
            memory: 200M
Check the startup log:
  [root@localhost deploy]# docker logs -f --tail 20 zookeeper
  2024-05-03 03:10:19,022 [myid:] - INFO  [main:o.e.j.s.AbstractConnector@333] - Started ServerConnector@4b7dc788{HTTP/1.1, (http/1.1)}{0.0.0.0:8080}
  2024-05-03 03:10:19,023 [myid:] - INFO  [main:o.e.j.s.Server@415] - Started @1256ms
  2024-05-03 03:10:19,023 [myid:] - INFO  [main:o.a.z.s.a.JettyAdminServer@201] - Started AdminServer on address 0.0.0.0, port 8080 and command URL /commands
  2024-05-03 03:10:19,030 [myid:] - INFO  [main:o.a.z.s.ServerCnxnFactory@169] - Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory
  2024-05-03 03:10:19,032 [myid:] - WARN  [main:o.a.z.s.ServerCnxnFactory@309] - maxCnxns is not configured, using default value 0.
  2024-05-03 03:10:19,034 [myid:] - INFO  [main:o.a.z.s.NIOServerCnxnFactory@652] - Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 8 worker threads, and 64 kB direct buffers.
  2024-05-03 03:10:19,036 [myid:] - INFO  [main:o.a.z.s.NIOServerCnxnFactory@660] - binding to port 0.0.0.0/0.0.0.0:2181
  2024-05-03 03:10:19,057 [myid:] - INFO  [main:o.a.z.s.w.WatchManagerFactory@42] - Using org.apache.zookeeper.server.watch.WatchManager as watch manager
  2024-05-03 03:10:19,057 [myid:] - INFO  [main:o.a.z.s.w.WatchManagerFactory@42] - Using org.apache.zookeeper.server.watch.WatchManager as watch manager
  2024-05-03 03:10:19,059 [myid:] - INFO  [main:o.a.z.s.ZKDatabase@134] - zookeeper.snapshotSizeFactor = 0.33
  2024-05-03 03:10:19,059 [myid:] - INFO  [main:o.a.z.s.ZKDatabase@154] - zookeeper.commitLogCount=500
  2024-05-03 03:10:19,067 [myid:] - INFO  [main:o.a.z.s.p.SnapStream@61] - zookeeper.snapshot.compression.method = CHECKED
  2024-05-03 03:10:19,067 [myid:] - INFO  [main:o.a.z.s.p.FileTxnSnapLog@480] - Snapshotting: 0x0 to /opt/bitnami/zookeeper/data/version-2/snapshot.0
  2024-05-03 03:10:19,071 [myid:] - INFO  [main:o.a.z.s.ZKDatabase@291] - Snapshot loaded in 13 ms, highest zxid is 0x0, digest is 1371985504
  2024-05-03 03:10:19,073 [myid:] - INFO  [main:o.a.z.s.p.FileTxnSnapLog@480] - Snapshotting: 0x0 to /opt/bitnami/zookeeper/data/version-2/snapshot.0
  2024-05-03 03:10:19,088 [myid:] - INFO  [main:o.a.z.s.ZooKeeperServer@589] - Snapshot taken in 15 ms
  2024-05-03 03:10:19,105 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::o.a.z.s.PrepRequestProcessor@138] - PrepRequestProcessor (sid:0) started, reconfigEnabled=false
  2024-05-03 03:10:19,106 [myid:] - INFO  [main:o.a.z.s.RequestThrottler@75] - zookeeper.request_throttler.shutdownTimeout = 10000 ms
  2024-05-03 03:10:19,134 [myid:] - INFO  [main:o.a.z.s.ContainerManager@83] - Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0
  2024-05-03 03:10:19,136 [myid:] - INFO  [main:o.a.z.a.ZKAuditProvider@42] - ZooKeeper audit is disabled.
ZooKeeper started normally. Because we added the environment variable ALLOW_ANONYMOUS_LOGIN=yes at startup, anyone is allowed to connect without authentication. Later, when configuring password authentication for Kafka, we will not configure it this way.
1.2 Checking ZooKeeper status

Enter the container:

  docker exec -it zookeeper bash
  I have no name!@zookeeper:/$ cd /opt/bitnami/zookeeper/bin
  I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ ls
  README.txt    zkCli.sh   zkServer-initialize.sh  zkSnapShotToolkit.cmd   zkSnapshotComparer.sh                  zkTxnLogToolkit.cmd
  zkCleanup.sh  zkEnv.cmd  zkServer.cmd            zkSnapShotToolkit.sh    zkSnapshotRecursiveSummaryToolkit.cmd  zkTxnLogToolkit.sh
  zkCli.cmd     zkEnv.sh   zkServer.sh             zkSnapshotComparer.cmd  zkSnapshotRecursiveSummaryToolkit.sh
  I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$
Check the ZooKeeper status:

  I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$ zkServer.sh status
  /opt/bitnami/java/bin/java
  ZooKeeper JMX enabled by default
  Using config: /opt/bitnami/zookeeper/bin/../conf/zoo.cfg
  Client port found: 2181. Client address: localhost. Client SSL: false.
  Mode: standalone
  I have no name!@zookeeper:/opt/bitnami/zookeeper/bin$
1.3 Starting Kafka

Create the mount directory:

  mkdir -p kafka/data
  chown -R 1001:1001 kafka
Add the kafka service to docker-compose.yaml:

    kafka:
      image: docker.io/bitnami/kafka:3.4
      container_name: kafka
      hostname: kafka
      privileged: true
      restart: always
      environment:
        KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      volumes:
        - ./kafka/data:/opt/bitnami/kafka/data
      ports:
        - "9092:9092"
      deploy:
        resources:
          limits:
            cpus: '4'
            memory: 4G
          reservations:
            cpus: '0.5'
            memory: 200M
      depends_on:
        - zookeeper
Start Kafka with docker-compose up -d kafka:

  [root@localhost kafka-with-zookeeper]# docker-compose up -d kafka
  [+] Running 2/2
  ⠿ Container zookeeper  Running                                                                                                                                     0.0s
  ⠿ Container kafka      Started
Check the Kafka logs:

  [root@localhost ~]# docker logs -f --tail 20 kafka
  [2024-05-03 04:12:03,381] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)
  [2024-05-03 04:12:03,391] INFO [ReplicaStateMachine controllerId=1001] Triggering offline replica state changes (kafka.controller.ZkReplicaStateMachine)
  [2024-05-03 04:12:03,391] DEBUG [ReplicaStateMachine controllerId=1001] Started replica state machine with initial state -> Map() (kafka.controller.ZkReplicaStateMachine)
  [2024-05-03 04:12:03,393] INFO [PartitionStateMachine controllerId=1001] Initializing partition state (kafka.controller.ZkPartitionStateMachine)
  [2024-05-03 04:12:03,394] INFO [PartitionStateMachine controllerId=1001] Triggering online partition state changes (kafka.controller.ZkPartitionStateMachine)
  [2024-05-03 04:12:03,396] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
  [2024-05-03 04:12:03,399] DEBUG [PartitionStateMachine controllerId=1001] Started partition state machine with initial state -> Map() (kafka.controller.ZkPartitionStateMachine)
  [2024-05-03 04:12:03,400] INFO [Controller id=1001] Ready to serve as the new controller with epoch 1 (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,411] INFO [Controller id=1001] Partitions undergoing preferred replica election:  (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,412] INFO [Controller id=1001] Partitions that completed preferred replica election:  (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,412] INFO [Controller id=1001] Skipping preferred replica election for partitions due to topic deletion:  (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,413] INFO [Controller id=1001] Resuming preferred replica election for partitions:  (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,416] INFO [Controller id=1001] Starting replica leader election (PREFERRED) for partitions  triggered by ZkTriggered (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,431] INFO [Controller id=1001] Starting the controller scheduler (kafka.controller.KafkaController)
  [2024-05-03 04:12:03,461] INFO [BrokerToControllerChannelManager broker=1001 name=forwarding]: Recorded new controller, from now on will use node kafka:9092 (id: 1001 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2024-05-03 04:12:03,551] INFO [BrokerToControllerChannelManager broker=1001 name=alterPartition]: Recorded new controller, from now on will use node kafka:9092 (id: 1001 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2024-05-03 04:12:08,434] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
  [2024-05-03 04:12:08,435] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
  [2024-05-03 04:17:08,439] INFO [Controller id=1001] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
  [2024-05-03 04:17:08,439] TRACE [Controller id=1001] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
Kafka started normally.
1.4 Kafka configuration file

Enter the Kafka container and look at the contents of server.properties when almost no environment variables are configured:
  I have no name!@kafka:/opt/bitnami/kafka/config$ cat server.properties
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #    http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  #
  # This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
  # See kafka.server.KafkaConfig for additional details and defaults
  #
  ############################# Server Basics #############################
  # The id of the broker. This must be set to a unique integer for each broker.
  #broker.id=0
  ############################# Socket Server Settings #############################
  # The address the socket server listens on. If not configured, the host name will be equal to the value of
  # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
  #   FORMAT:
  #     listeners = listener_name://host_name:port
  #   EXAMPLE:
  #     listeners = PLAINTEXT://your.host.name:9092
  #listeners=PLAINTEXT://:9092
  # Listener name, hostname and port the broker will advertise to clients.
  # If not set, it uses the value for "listeners".
  #advertised.listeners=PLAINTEXT://your.host.name:9092
  # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
  #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  # The number of threads that the server uses for receiving requests from the network and sending responses to the network
  num.network.threads=3
  # The number of threads that the server uses for processing requests, which may include disk I/O
  num.io.threads=8
  # The send buffer (SO_SNDBUF) used by the socket server
  socket.send.buffer.bytes=102400
  # The receive buffer (SO_RCVBUF) used by the socket server
  socket.receive.buffer.bytes=102400
  # The maximum size of a request that the socket server will accept (protection against OOM)
  socket.request.max.bytes=104857600
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/bitnami/kafka/data
  # The default number of log partitions per topic. More partitions allow greater
  # parallelism for consumption, but this will also result in more files across
  # the brokers.
  num.partitions=1
  # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
  # This value is recommended to be increased for installations with data dirs located in RAID array.
  num.recovery.threads.per.data.dir=1
  ############################# Internal Topic Settings  #############################
  # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
  offsets.topic.replication.factor=1
  transaction.state.log.replication.factor=1
  transaction.state.log.min.isr=1
  ############################# Log Flush Policy #############################
  # Messages are immediately written to the filesystem but by default we only fsync() to sync
  # the OS cache lazily. The following configurations control the flush of data to disk.
  # There are a few important trade-offs here:
  #    1. Durability: Unflushed data may be lost if you are not using replication.
  #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
  #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
  # The settings below allow one to configure the flush policy to flush data after a period of time or
  # every N messages (or both). This can be done globally and overridden on a per-topic basis.
  # The number of messages to accept before forcing a flush of data to disk
  #log.flush.interval.messages=10000
  # The maximum amount of time a message can sit in a log before we force a flush
  #log.flush.interval.ms=1000
  ############################# Log Retention Policy #############################
  # The following configurations control the disposal of log segments. The policy can
  # be set to delete segments after a period of time, or after a given size has accumulated.
  # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
  # from the end of the log.
  # The minimum age of a log file to be eligible for deletion due to age
  log.retention.hours=168
  # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
  # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
  #log.retention.bytes=1073741824
  # The maximum size of a log segment file. When this size is reached a new log segment will be created.
  #log.segment.bytes=1073741824
  # The interval at which log segments are checked to see if they can be deleted according
  # to the retention policies
  log.retention.check.interval.ms=300000
  ############################# Zookeeper #############################
  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  zookeeper.connect=zookeeper:2181
  # Timeout in ms for connecting to zookeeper
  #zookeeper.connection.timeout.ms=18000
  ############################# Group Coordinator Settings #############################
  # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
  # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
  # The default value for this is 3 seconds.
  # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
  # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
  #group.initial.rebalance.delay.ms=0
  sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
Every value in this configuration file can be set via an environment variable: Bitnami maps any variable of the form KAFKA_CFG_<PROPERTY> (property name uppercased, dots replaced by underscores) to the corresponding server.properties entry.
Basic server configuration: broker.id is the broker's node ID and must be unique within a cluster.
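As a quick illustration of that mapping, a small shell helper (hypothetical, written here only for illustration) can derive the env-var name from a property key:

```shell
# Sketch: derive the Bitnami env-var name for a server.properties key:
# uppercase the key, replace dots with underscores, prefix with KAFKA_CFG_.
to_kafka_env() {
  printf 'KAFKA_CFG_%s' "$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]' | tr '.' '_')"
}

to_kafka_env log.retention.hours   # prints KAFKA_CFG_LOG_RETENTION_HOURS
```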
As you can see, the startup script does not set listeners or advertised.listeners, and both are commented out in the configuration file. If external machines need to access this Kafka, configure them:

  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

This configures three listeners:

  • the first, plaintext on port 9092, is reachable through the service name kafka;
  • the second, on port 9093, is for internal cluster (controller) communication;
  • the third, on port 9094, is for plaintext access from external machines; in real use, replace localhost with the host machine's IP.

For a single-node Kafka that allows external access, configuring only KAFKA_CFG_LISTENERS and KAFKA_CFG_ADVERTISED_LISTENERS is enough.
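For the single-node external-access case, a minimal sketch of the environment might look like the following (shell-style assignments; the EXTERNAL listener name and the example host IP 192.168.104.147 are placeholders, not values from this deployment):

```shell
# Minimal single-node sketch. Internal clients keep using kafka:9092;
# external clients use <host-ip>:9094. Replace 192.168.104.147 with your host IP.
KAFKA_CFG_LISTENERS='PLAINTEXT://:9092,EXTERNAL://:9094'
KAFKA_CFG_ADVERTISED_LISTENERS='PLAINTEXT://kafka:9092,EXTERNAL://192.168.104.147:9094'
# A custom listener name like EXTERNAL also needs an entry in the protocol map;
# if you reuse the PLAINTEXT name instead, the map can be omitted.
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP='PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
```

Remember to also publish port 9094 in the compose ports section ("9094:9094").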

1.5 Producing and consuming from the command line

Enter the Kafka container:

  docker exec -it kafka bash
  cd /opt/bitnami/kafka/bin/
  I have no name!@kafka:/opt/bitnami/kafka/bin$ ls
  connect-distributed.sh        kafka-console-producer.sh    kafka-leader-election.sh       kafka-run-class.sh                  kafka-verifiable-producer.sh
  connect-mirror-maker.sh       kafka-consumer-groups.sh     kafka-log-dirs.sh              kafka-server-start.sh               trogdor.sh
  connect-standalone.sh         kafka-consumer-perf-test.sh  kafka-metadata-quorum.sh       kafka-server-stop.sh                windows
  kafka-acls.sh                 kafka-delegation-tokens.sh   kafka-metadata-shell.sh        kafka-storage.sh                    zookeeper-security-migration.sh
  kafka-broker-api-versions.sh  kafka-delete-records.sh      kafka-mirror-maker.sh          kafka-streams-application-reset.sh  zookeeper-server-start.sh
  kafka-cluster.sh              kafka-dump-log.sh            kafka-producer-perf-test.sh    kafka-topics.sh                     zookeeper-server-stop.sh
  kafka-configs.sh              kafka-features.sh            kafka-reassign-partitions.sh   kafka-transactions.sh               zookeeper-shell.sh
  kafka-console-consumer.sh     kafka-get-offsets.sh         kafka-replica-verification.sh  kafka-verifiable-consumer.sh
  I have no name!@kafka:/opt/bitnami/kafka/bin$
Look at the KAFKA_HEAP_OPTS setting inside these scripts:
  I have no name!@kafka:/opt/bitnami/kafka/bin$ cat kafka-console-producer.sh
  #!/bin/bash
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #    http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
      export KAFKA_HEAP_OPTS="-Xmx512M"
  fi
  exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
  I have no name!@kafka:/opt/bitnami/kafka/bin$
  I have no name!@kafka:/opt/bitnami/kafka/bin$ cat kafka-console-consumer.sh
  #!/bin/bash
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #    http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
      export KAFKA_HEAP_OPTS="-Xmx512M"
  fi
  exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
  I have no name!@kafka:/opt/bitnami/kafka/bin$
Both the producer and consumer scripts configure only one environment variable, KAFKA_HEAP_OPTS, and only set it to -Xmx512M. Remember this: when password authentication is enabled later, this value needs to be extended.
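One hedged way to do that extension, once SASL is enabled in section 2, is to export the variable before calling the console tools; since the scripts only set a default when the variable is empty, anything exported here reaches the client JVM (the JAAS file path below is the one mounted in section 2.1, and is an assumption for your setup):

```shell
# Sketch: pre-set KAFKA_HEAP_OPTS so the console scripts keep the heap size
# AND pass a JAAS login config to the client JVM.
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/bitnami/kafka/secrets/kafka_server_jaas.conf"
```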
1.5.1 Creating a topic

Create a topic named test with 10 partitions and a replication factor of 1:
  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --create --bootstrap-server kafka:9092 --partitions 10 --replication-factor 1 --topic test
  Created topic test.
  I have no name!@kafka:/opt/bitnami/kafka/bin$
The hostname kafka is used for access here; if KAFKA_CFG_ADVERTISED_LISTENERS is configured, the machine's IP can be used as well.
1.5.2 Describing a topic

View a topic's partition layout with describe:
  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test
  Topic: test     TopicId: nAakv4u8SJO022gRe-ahVQ PartitionCount: 10      ReplicationFactor: 1    Configs:
          Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 5    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 6    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 7    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 8    Leader: 1001    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 9    Leader: 1001    Replicas: 1001  Isr: 1001
  I have no name!@kafka:/opt/bitnami/kafka/bin$
1.5.3 Listing all topics

Use the list keyword:
  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --list --bootstrap-server kafka:9092
  test
  I have no name!@kafka:/opt/bitnami/kafka/bin$
1.5.4 Deleting a topic

  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic test
  I have no name!@kafka:/opt/bitnami/kafka/bin$
  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-topics.sh --list --bootstrap-server kafka:9092
  I have no name!@kafka:/opt/bitnami/kafka/bin$
1.5.5 Sending messages

First create a topic:

  kafka-topics.sh --create --bootstrap-server kafka:9092 --partitions 10 --replication-factor 1 --topic test
Send two messages:
  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-console-producer.sh --broker-list kafka:9092 --topic test
  >hello
  >hello everyone
  >
1.5.6 Consuming messages

Open another window and connect to Kafka as a consumer:
  [root@localhost ~]# docker exec -it kafka bash
  I have no name!@kafka:/$
  I have no name!@kafka:/$ cd /opt/bitnami/kafka/bin/
  I have no name!@kafka:/opt/bitnami/kafka/bin$
  I have no name!@kafka:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
  hello
  hello everyone
The two messages produced earlier are consumed normally. Producing further messages in the producer window shows them being consumed in the consumer window as expected.
2. Deployment with password authentication

This uses the SASL_PLAINTEXT mode, i.e. plaintext passwords configured directly in a configuration file. If stronger security is needed, SASL_SSL authentication can be configured with JKS certificates. See: kafka-generate-ssl.sh
2.1 Using SASL authentication

Here, the ZooKeeper without a password is paired with a password-protected Kafka.
First create a configuration file secrets/kafka_server_jaas.conf to hold Kafka's accounts and passwords:
  KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="Abc123456"
      user_admin="Abc123456"
      user_producer="Abc123456"
      user_consumer="Abc123456";
  };
  KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
      username="producer"
      password="Abc123456";
  };
This configuration selects the PLAIN mechanism via org.apache.kafka.common.security.plain.PlainLoginModule and defines the users:

  • username and password are the credentials this broker uses when initiating connections to the other brokers in the cluster;
  • entries prefixed with user_ define the accounts allowed to connect to the broker; for example, user_producer="Abc123456" defines a user named producer with password Abc123456;
  • the password of the username="admin" entry and of the user_admin entry must be identical, otherwise authentication fails.

In KafkaServer above, three users are created: admin, producer, and consumer (create as many as your business needs; user names and passwords are arbitrary). In KafkaClient one user is configured, whose username and password must match an account configured on the server side; here that user is producer.
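The third rule above can be checked mechanically. A standalone sketch (it rewrites the same KafkaServer content into a temp file purely so it can grep it) verifies that the broker's own password matches the user_admin entry:

```shell
# Recreate the KafkaServer section from above, then check that its
# password= entry matches its user_admin= entry, as PLAIN requires.
jaas=$(mktemp)
cat > "$jaas" <<'EOF'
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="Abc123456"
    user_admin="Abc123456";
};
EOF
admin_pw=$(sed -n 's/^ *password="\([^"]*\)".*/\1/p' "$jaas")
user_admin_pw=$(sed -n 's/^ *user_admin="\([^"]*\)".*/\1/p' "$jaas")
[ "$admin_pw" = "$user_admin_pw" ] && echo "admin passwords consistent"
```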
Change the ownership of the configuration directory:

  chown -R 1001:1001 secrets
Then adjust the kafka service in docker-compose to:
    kafka:
      image: docker.io/bitnami/kafka:3.4
      container_name: kafka
      hostname: kafka
      privileged: true
      restart: always
      environment:
        KAFKA_OPTS: '-Djava.security.auth.login.config=/opt/bitnami/kafka/secrets/kafka_server_jaas.conf'
        KAFKA_BROKER_ID: 0
        KAFKA_CFG_LISTENERS: SASL_PLAINTEXT://:9092
        KAFKA_CFG_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.104.147:9092
        KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
        KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN
        KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
        KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
        # Override the corresponding defaults in the configuration file
        KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
        KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
        KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2
      volumes:
        - ./kafka/data:/opt/bitnami/kafka/data
        - ./secrets:/opt/bitnami/kafka/secrets
      ports:
        - "9092:9092"
      deploy:
        resources:
          limits:
            cpus: '4'
            memory: 4G
          reservations:
            cpus: '0.5'
            memory: 200M
      depends_on:
        - zookeeper
The tail of a successful startup log:
  [2024-05-03 07:26:17,096] TRACE [Controller id=0] Leader imbalance ratio for broker 1001 is 1.0 (kafka.controller.KafkaController)
  [2024-05-03 07:26:17,102] INFO [Controller id=0] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)
  [2024-05-03 07:26:20,808] INFO [RequestSendThread controllerId=0] Controller 0 connected to 192.168.104.147:9092 (id: 0 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
  [2024-05-03 07:26:20,850] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use node 192.168.104.147:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2024-05-03 07:26:20,859] INFO [Broker id=0] Add 60 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 0 epoch 2 with correlation id 1 (state.change.logger)
  [2024-05-03 07:26:20,927] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use node 192.168.104.147:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Because the broker ID environment variable was specified, it indeed took effect: the ID changed from the default 1001 to our configured 0.
2.2 Client verification

Enter the container and test operating Kafka from the command line, using the same style of command as before:

  kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
The command above does not work: it hangs, and the broker log keeps printing the following message in a loop:

  [2024-05-03 07:53:41,938] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:36094-17) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
The error means the client is not speaking SASL at all. Create a new configuration file client.properties with the following content:

  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=PLAIN
  sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="Abc123456";
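The same file can be created in one step with a heredoc. Note that kafka-topics.sh takes this file via --command-config, while the console producer and consumer take the very same file via --producer.config and --consumer.config respectively (the file path here is arbitrary):

```shell
# Sketch: write the SASL client config once, then reuse it for every CLI tool.
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="Abc123456";
EOF
```

For example: kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --consumer.config client.properties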
Then run the topic-listing command with it:

  kafka-topics.sh --list --bootstrap-server kafka:9092 --command-config /opt/bitnami/kafka/secrets/client.properties
  __consumer_offsets
  test
  test1
The topic list now shows normally.
View details of a specific topic:

  I have no name!@kafka:/opt/bitnami/kafka/config$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test --command-config /opt/bitnami/kafka/secrets/client.properties
  Topic: test     TopicId: _rY4JuTlQ-KRWdhbmIpklQ PartitionCount: 10      ReplicationFactor: 1    Configs:
          Topic: test     Partition: 0    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 1    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 2    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 3    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 4    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 5    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 6    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 7    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 8    Leader: none    Replicas: 1001  Isr: 1001
          Topic: test     Partition: 9    Leader: none    Replicas: 1001  Isr: 1001
  I have no name!@kafka:/opt/bitnami/kafka/config$
  I have no name!@kafka:/opt/bitnami/kafka/config$ kafka-topics.sh --describe --bootstrap-server kafka:9092 --topic test1 --command-config /opt/bitnami/kafka/secrets/client.properties
  Topic: test1    TopicId: pNlhOdhjQtaemjVB3jjM0g PartitionCount: 1       ReplicationFactor: 1    Configs:
          Topic: test1    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
  I have no name!@kafka:/opt/bitnami/kafka/config$
Now the problem: producing and consuming messages both fail silently. Producing reports no error and consuming reports no error, but the consumer never receives anything, which is maddening. Note the describe output above: topic test shows Leader: none, because its replicas are still assigned to the old broker 1001, which no longer exists after the broker ID was changed to 0, so that topic cannot serve traffic (test1, created after the change, has its leader on broker 0).
Since the command line is being uncooperative, let's switch tools and add kafka-ui to take a look.
3. Using the visualization tool Kafka-UI

3.1 Basic usage of Kafka-UI

Add the Kafka-UI service to the docker-compose script above:
    kafka-ui:
      image: provectuslabs/kafka-ui
      container_name: kafka-ui
      hostname: kafka-ui
      privileged: true
      restart: always
      environment:
        - DYNAMIC_CONFIG_ENABLED=true
        - AUTH_TYPE=LOGIN_FORM
        - SPRING_SECURITY_USER_NAME=admin
        - SPRING_SECURITY_USER_PASSWORD=admin123
      #volumes:
      #  - ~/kui/config.yml:/etc/kafkaui/dynamic_config.yaml
      ports:
        - "8080:8080"
      deploy:
        resources:
          limits:
            cpus: '1'
            memory: 2G
          reservations:
            cpus: '0.5'
With this configured, after starting the kafka-ui service a healthy startup log looks like this:
[root@localhost ~]# docker logs -f --tail 20 kafka-ui
Standard Commons Logging discovery in action with spring-jcl: please remove commons-logging.jar from classpath in order to avoid potential conflicts
_   _ ___    __             _                _          _  __      __ _
| | | |_ _|  / _|___ _ _    /_\  _ __ __ _ __| |_  ___  | |/ /__ _ / _| |_____
| |_| || |  |  _/ _ | '_|  / _ \| '_ / _` / _| ' \/ -_) | ' </ _` |  _| / / _`|
\___/|___| |_| \___|_|   /_/ \_| .__\__,_\__|_||_\___| |_|\_\__,_|_| |_\_\__,|
                                 |_|
2024-05-04 02:37:44,032 WARN  [main] c.p.k.u.u.DynamicConfigOperations: Dynamic config file /etc/kafkaui/dynamic_config.yaml doesnt exist or not readable
2024-05-04 02:37:44,098 INFO  [main] c.p.k.u.KafkaUiApplication: Starting KafkaUiApplication using Java 17.0.6 with PID 1 (/kafka-ui-api.jar started by kafkaui in /)
2024-05-04 02:37:44,099 DEBUG [main] c.p.k.u.KafkaUiApplication: Running with Spring Boot v3.1.3, Spring v6.0.11
2024-05-04 02:37:44,100 INFO  [main] c.p.k.u.KafkaUiApplication: No active profile set, falling back to 1 default profile: "default"
2024-05-04 02:37:59,321 INFO  [main] c.p.k.u.c.a.BasicAuthSecurityConfig: Configuring LOGIN_FORM authentication.
2024-05-04 02:38:00,411 INFO  [main] o.s.b.a.e.w.EndpointLinksResolver: Exposing 3 endpoint(s) beneath base path '/actuator'
2024-05-04 02:38:03,523 INFO  [main] o.s.b.w.e.n.NettyWebServer: Netty started on port 8080
2024-05-04 02:38:03,615 INFO  [main] c.p.k.u.KafkaUiApplication: Started KafkaUiApplication in 22.402 seconds (process running for 25.788)
No Kafka connection was pre-configured here — only login authentication was added on top of the official demo. The login page looks like this:

The login account and password are the admin / admin123 configured in the script above.
For the environment variables supported upstream, see: misc-configuration-properties
The script also has the mounted config file commented out; we'll look at what that file actually contains shortly.
After logging in you land on the Kafka-UI console page, http://IP:8080/ui/clusters/create-new-cluster, to configure the connection to the Kafka cluster:

Fill in the required fields: the Cluster name; Read-only, usually left unchecked so the UI can be used to test sending messages to Kafka; and Bootstrap Servers, where multiple entries can be listed if you run a Kafka cluster.
If Kafka has no SASL authentication configured, just scroll to the bottom and use Validate to test whether Kafka is reachable.
Since SASL is configured here, under Authentication set Authentication Method to SASL/PLAIN and Security Protocol to SASL_PLAINTEXT, check Secured with auth?, and fill in the username and password.
Note that the username and password must belong to a user configured in Kafka's KafkaClient section, not KafkaServer — otherwise validation below fails with an error:

Validating again with the user configured in KafkaClient: producer

The connection now succeeds — in other words, these credentials are used to connect as a Kafka client.
Click the Submit button to save the configuration; you are taken straight to the Kafka-UI console.
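For context on the KafkaClient vs. KafkaServer distinction, the broker's JAAS layout roughly follows the sketch below. This is illustrative only — the Bitnami image generates the real file from its environment variables, and the passwords here simply mirror the producer credentials used in this post. KafkaServer defines broker-side users (user_<name> entries plus the inter-broker identity), while KafkaClient holds the identity used when connecting out as a client.

```conf
// kafka_jaas.conf (sketch; names and passwords are illustrative)
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="Abc123456"
   user_admin="Abc123456"
   user_producer="Abc123456";
};
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="producer"
   password="Abc123456";
};
```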

The dashboard now shows our cluster test; clicking Brokers, Topics, and Consumers below displays the cluster information normally:



If anything in the cluster connection settings is wrong, these menus may fail to display the Kafka cluster information correctly.
3.2 Kafka-UI's configuration file

The script above has a mounted file commented out, /etc/kafkaui/dynamic_config.yaml; let's look at its contents:
[root@localhost ~]# docker exec -it kafka-ui sh
/ $
/ $ cd /etc/kafkaui/
/etc/kafkaui $ ls
dynamic_config.yaml
/etc/kafkaui $
/etc/kafkaui $ cat dynamic_config.yaml
auth:
  type: LOGIN_FORM
kafka:
  clusters:
  - bootstrapServers: kafka:9092
    name: test
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required
        username="producer" password="Abc123456";
    readOnly: false
rbac:
  roles: []
webclient: {}
/etc/kafkaui $
As you can see, the file holds exactly the cluster connection settings configured above.
Upstream also documents that these settings can be supplied via environment variables.
For example, KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS maps into the config file as:
kafka:
  clusters:
    - bootstrapServers: xxx
However, when I tried converting the config file contents directly into the following environment variables:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
      - KAFKA_CLUSTERS_0_READONLY=false
      - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="Abc123456";'
the container errored out on startup. So I commented them back out and, after starting Kafka-UI, configured the cluster connection in the web UI instead — which works fine.
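One likely culprit is the quoting: in the docker-compose environment list form, the surrounding single quotes are not stripped and become part of the value, producing a malformed JAAS string. A sketch of a form that should parse (untested here — it uses the mapping style of environment, so no extra quoting is needed; values are the ones from the dynamic config file above):

```yaml
    environment:
      KAFKA_CLUSTERS_0_NAME: test
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
      # the whole value is one plain YAML scalar; no wrapping quotes
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="Abc123456";
```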
If credential validation fails while configuring the Kafka account, the feature seems a bit flaky — retry a few times and it may well succeed.
If the account itself is wrong, connecting to Kafka throws an error. For example, when I first configured the KafkaClient account, I got this exception:
2024-05-04 02:28:06,338 ERROR [parallel-1] c.p.k.u.s.StatisticsService: Failed to collect cluster local info
java.lang.IllegalStateException: Error while creating AdminClient for Cluster local
        at com.provectus.kafka.ui.service.AdminClientServiceImpl.lambda$createAdminClient$5(AdminClientServiceImpl.java:56)
        at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3783)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.Operators.error(Operators.java:198)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:135)
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4480)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:427)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:488)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:134)
        at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
        at com.provectus.kafka.ui.service.AdminClientServiceImpl.lambda$createAdminClient$2(AdminClientServiceImpl.java:53)
        at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:67)
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:127)
        ... 16 common frames omitted
Caused by: java.lang.IllegalArgumentException: Login module control flag not specified in JAAS config
        at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:110)
        at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:93)
        at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:87)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:522)
        ... 22 common frames omitted
2024-05-04 02:28:06,339 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: local
Configure the KafkaServer admin account here; if that doesn't work either, fall back to trying the KafkaClient account. As long as Kafka itself is configured correctly, this connection shouldn't give much trouble.
4. Problem notes

4.1 Consumer fails when listening on a topic: Replication factor: 3 larger than available brokers: 1.

[2024-06-05 09:51:42,266] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='__consumer_offsets', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='producer'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='segment.bytes', value='104857600')]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
This exception comes from the Kafka environment configuration: the error shows an attempt to create the __consumer_offsets topic with replicationFactor set to 3, while the cluster currently has only 1 available broker. In Kafka, the replication factor is the number of brokers each partition's data is replicated to, providing redundancy and fault tolerance.
To fix it, take one of the following approaches:
Add brokers: make sure the cluster has at least 3 active brokers, by starting more Kafka instances and joining them to the cluster.
Reduce the replication factor: if you can't add brokers for now, lower the replication factor of __consumer_offsets to at most the current broker count; with a single broker it should be 1.
Check broker health: confirm all brokers are up and none has gone offline due to a failure; Kafka's admin commands or a monitoring tool can be used to check broker status.
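Two quick ways to count live brokers, sketched below. These are run inside the kafka container; both tools ship with the Kafka distribution in the Bitnami image, and the hostnames/paths are the ones used earlier in this post.

```shell
# list broker ids registered in ZooKeeper; the number of ids = live brokers
zookeeper-shell.sh zookeeper:2181 ls /brokers/ids

# or ask a broker directly (pass --command-config when SASL is enabled)
kafka-broker-api-versions.sh --bootstrap-server kafka:9092 --command-config /opt/bitnami/kafka/secrets/client.properties
```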
My Kafka service is configured with these environment variables:
      # overrides TOPIC_REPLICATION_FACTOR in the default config; set it to the number of nodes in the cluster
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # replication factor of the transaction state log topic
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
When the error occurred, KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR was set to 3 while the cluster had only one node, hence the failure; it is now changed to 1.
After changing this setting you may need to recreate or update the __consumer_offsets topic. Here I simply deleted ZooKeeper's data, restarted ZooKeeper, and then recreated the Kafka service. After that, both producing and consuming messages work normally.
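The reset described above, as a sequence of compose commands — a sketch assuming the host directories from section 1, run from the directory holding docker-compose.yaml. Note this wipes all topic metadata, not just __consumer_offsets.

```shell
docker-compose stop kafka zookeeper
# wipe ZooKeeper state (topic metadata included)
rm -rf zookeeper/data/* zookeeper/logs/*
docker-compose up -d zookeeper
# once ZooKeeper is up, recreate the Kafka service
docker-compose up -d kafka
```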
