Notes on Installing, Configuring, Running, and Testing Kafka on Ubuntu


Introduction to Kafka

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation. It is designed to handle large-scale data streams, providing a highly reliable, high-throughput, low-latency messaging system. Kafka can be used to build real-time data pipelines and streaming applications, letting different applications, systems, and data sources exchange data and communicate efficiently.

Kafka's core concepts include the following (a short CLI sketch follows the list):

  • Message: Kafka is a publish/subscribe messaging system that organizes messages by topic. Producers publish messages to topics, and consumers subscribe to one or more topics to receive them.
  • Topic: A topic is a category of messages; each topic can contain one or more partitions. After a message is published to a topic, it is routed to one of the topic's partitions according to a defined rule.
  • Partition: A topic can be split into multiple partitions, each an ordered, durable sequence of message records. Partitions let Kafka scale horizontally and allow multiple consumers to process messages in parallel.
  • Producer: Producers publish messages to Kafka topics.
  • Consumer: Consumers subscribe to Kafka topics and process the messages they receive.
  • Broker: A Kafka cluster consists of multiple brokers; each broker is an independent Kafka server responsible for storing data and handling messages.
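To make these concepts concrete, here is a rough sketch of how they map onto the command-line tools that ship with Kafka (installed later in this note). The topic name orders and the broker address localhost:9092 are illustrative assumptions, not part of the setup below:
  # Create a topic named "orders" with 3 partitions (assumes a broker at localhost:9092)
  bin/kafka-topics.sh --create --topic orders --partitions 3 --bootstrap-server localhost:9092
  # A producer publishes messages to the topic
  bin/kafka-console-producer.sh --topic orders --bootstrap-server localhost:9092
  # A consumer subscribes to the topic and receives the messages
  bin/kafka-console-consumer.sh --topic orders --bootstrap-server localhost:9092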
Kafka's key characteristics include:


  • Durability: Kafka persists messages to disk, so messages are not lost.
  • High throughput: Kafka handles large volumes of data while keeping latency low, which suits large-scale data processing and analytics.
  • Scalability: Kafka scales horizontally to handle more data and higher load.
  • Fault tolerance: A Kafka cluster replicates data across brokers, so data stays reliable and available even when some nodes fail.
Kafka is widely used for stream processing, real-time log processing, metrics monitoring, and similar workloads, and many companies rely on it to build real-time data pipelines and process data at scale.
Installing Kafka (with KRaft) on Ubuntu

You can install Kafka on Ubuntu with the following steps. Note that these instructions cover Kafka 3.6.0. Before installing, make sure Java 8 or newer is present.

A quick look at KRaft

Kafka 2.8 introduced KRaft (Kafka Raft) as a new way to manage Kafka's metadata, replacing the original ZooKeeper-based design. KRaft is a metadata management system built on the Raft consensus protocol, serving as a replacement for ZooKeeper so that Kafka no longer depends on it.
Kafka with KRaft uses the Raft protocol to manage and maintain Kafka's metadata, including partition assignments and cluster configuration. This simplifies deploying and operating Kafka, since you no longer need to maintain a separate ZooKeeper ensemble.
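For reference, the KRaft-related defaults in config/kraft/server.properties (the combined broker-plus-controller configuration shipped with Kafka 3.6; the same values appear in the KafkaConfig dump later in this note) look roughly like this:
  # One process acts as both broker and controller
  process.roles=broker,controller
  node.id=1
  # The Raft quorum: node 1 votes via the controller listener on port 9093
  controller.quorum.voters=1@localhost:9093
  listeners=PLAINTEXT://:9092,CONTROLLER://:9093
  controller.listener.names=CONTROLLER
  advertised.listeners=PLAINTEXT://localhost:9092
  log.dirs=/tmp/kraft-combined-logs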
Steps:

1. Install Java

Check whether Java is already installed:
  java -version
If Java is missing or needs updating, install OpenJDK with:
  sudo apt update
  sudo apt install default-jdk
2. Download Kafka

Download the Kafka release you need, e.g. 3.6.0, from the official Apache Kafka website.

   Kafka release archives are named kafka_<Scala version>-<Kafka version>.tgz. For example, in kafka_2.13-3.6.0.tgz, 3.6.0 is the Kafka version, while 2.13 indicates that this build was compiled with Scala 2.13. The release archive already contains the compiled Scala code, so you only need to follow the installation steps; there is no need to install Scala separately.
  wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
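Optionally, verify the archive against the SHA-512 checksum that Apache publishes alongside each release. The .sha512 file is not always in a format that sha512sum -c accepts, so comparing the digests by eye is the safe route:
  wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz.sha512
  # Compute the local digest and compare it with the contents of the .sha512 file
  sha512sum kafka_2.13-3.6.0.tgz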
3. Extract and move Kafka

Extract the downloaded archive:
  tar -xzf kafka_2.13-3.6.0.tgz
Move the extracted directory to its destination, e.g. the /opt directory:
  sudo mv kafka_2.13-3.6.0 /opt/kafka
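The commands in the following steps use paths relative to the Kafka root (bin/..., config/...), so switch into it first:
  cd /opt/kafka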
4. Start Kafka in KRaft mode

Generate a cluster UUID:
  KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Format the log directories for the KRaft cluster with bin/kafka-storage.sh format:
  bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
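With the default config/kraft/server.properties, the data directory (log.dirs) is /tmp/kraft-combined-logs, as the KafkaConfig dump below confirms. After formatting, you can sanity-check that the directory was initialized; the cluster.id in meta.properties should match the UUID generated above:
  cat /tmp/kraft-combined-logs/meta.properties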
Start the Kafka server:
  # Run in the foreground
  bin/kafka-server-start.sh config/kraft/server.properties
  # Or run in the background
  nohup bin/kafka-server-start.sh config/kraft/server.properties > my_kafka_run.log 2>&1 &
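Two optional follow-ups: check that the broker and controller listeners from the default configuration (ports 9092 and 9093) are open, and use the bundled stop script for a clean shutdown when you are done:
  # Verify the listener ports are open
  ss -ltn | grep -E ':(9092|9093)'
  # Stop Kafka gracefully
  bin/kafka-server-stop.sh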
Once the Kafka server has started successfully, you have a basic Kafka environment ready to use.

Sample startup output:

  [2023-11-28 07:46:27,307] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
  [2023-11-28 07:46:27,603] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
  [2023-11-28 07:46:27,761] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
  [2023-11-28 07:46:27,764] INFO [ControllerServer id=1] Starting controller (kafka.server.ControllerServer)
  [2023-11-28 07:46:27,782] INFO authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
  [2023-11-28 07:46:28,132] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
  [2023-11-28 07:46:28,165] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(CONTROLLER) (kafka.network.SocketServer)
  [2023-11-28 07:46:28,166] INFO [SharedServer id=1] Starting SharedServer (kafka.server.SharedServer)
  [2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
  [2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0 (kafka.log.UnifiedLog$)
  [2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
  [2023-11-28 07:46:28,262] INFO Initialized snapshots with IDs SortedSet() from /tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
  [2023-11-28 07:46:28,301] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
  [2023-11-28 07:46:28,490] INFO [RaftManager id=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=1226) from null (org.apache.kafka.raft.QuorumState)
  [2023-11-28 07:46:28,563] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279) from Unattached(epoch=0, voters=[1], electionTimeoutMs=1226) (org.apache.kafka.raft.QuorumState)
  [2023-11-28 07:46:28,572] INFO [RaftManager id=1] Completed transition to Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) from CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279) (org.apache.kafka.raft.QuorumState)
  [2023-11-28 07:46:28,596] INFO [kafka-1-raft-outbound-request-thread]: Starting (kafka.raft.RaftSendThread)
  [2023-11-28 07:46:28,596] INFO [kafka-1-raft-io-thread]: Starting (kafka.raft.KafkaRaftManager$RaftIoThread)
  [2023-11-28 07:46:28,617] INFO [RaftManager id=1] High watermark set to LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)]) for the first time for epoch 1 based on indexOfHw 0 and voters [ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)] (org.apache.kafka.raft.LeaderState)
  [2023-11-28 07:46:28,619] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:28,620] INFO [ControllerServer id=1] Waiting for controller quorum voters future (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,621] INFO [ControllerServer id=1] Finished waiting for controller quorum voters future (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,659] INFO [controller-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,660] INFO [controller-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,661] INFO [controller-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,662] INFO [controller-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,678] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Finished waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,686] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:28,686] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
  [2023-11-28 07:46:28,690] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
  [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
  [2023-11-28 07:46:28,698] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,699] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,706] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
  [2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Waiting for controller quorum voters future (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Finished waiting for controller quorum voters future (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,729] INFO [broker-1-to-controller-forwarding-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,731] INFO [broker-1-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,755] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
  [2023-11-28 07:46:28,760] INFO [SocketServer listenerType=BROKER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
  [2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,782] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,783] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,784] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,786] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-RemoteFetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,801] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,804] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,837] INFO [BrokerLifecycleManager id=1] Incarnation rXokDA-kRI2e0TCw3qUr4g of broker 1 in cluster ktQqKm60RwiR-s4Dts0HDg is now STARTING. (kafka.server.BrokerLifecycleManager)
  [2023-11-28 07:46:28,857] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
  [2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Finished waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,877] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)
  [2023-11-28 07:46:28,920] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:28,921] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,972] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:28,977] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:28,979] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:28,979] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,029] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,035] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:29,036] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,077] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,086] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,091] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:29,091] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,141] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,147] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:29,147] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,178] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,197] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,202] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:29,202] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,253] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,270] INFO [BrokerToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient)
  [2023-11-28 07:46:29,271] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.image.loader.MetadataLoader@382374793 (org.apache.kafka.raft.KafkaRaftClient)
  [2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@859950147 (org.apache.kafka.raft.KafkaRaftClient)
  [2023-11-28 07:46:29,281] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,288] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader is still catching up because we have not loaded a controller record as of offset 0 and high water mark is 1 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,320] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:46:29,332] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current high water mark of 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,360] INFO [BrokerLifecycleManager id=1] Successfully registered broker 1 with broker epoch 5 (kafka.server.BrokerLifecycleManager)
  [2023-11-28 07:46:29,382] INFO [BrokerLifecycleManager id=1] The broker has caught up. Transitioning from STARTING to RECOVERY. (kafka.server.BrokerLifecycleManager)
  [2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Finished waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing SnapshotGenerator with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing FeaturesPublisher with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicConfigPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,388] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicClientQuotaPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,389] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ScramPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,390] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DelegationTokenPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,392] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ControllerMetadataMetricsPublisher with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,393] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing AclPublisher controller id=1 with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,394] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing BrokerMetadataPublisher with a snapshot at offset 5 (org.apache.kafka.image.loader.MetadataLoader)
  [2023-11-28 07:46:29,394] INFO [BrokerMetadataPublisher id=1] Publishing initial metadata at offset OffsetAndEpoch(offset=5, epoch=1) with metadata.version 3.6-IV2. (kafka.server.metadata.BrokerMetadataPublisher)
  [2023-11-28 07:46:29,395] INFO [BrokerLifecycleManager id=1] The broker is in RECOVERY. (kafka.server.BrokerLifecycleManager)
  [2023-11-28 07:46:29,397] INFO Loading logs from log dirs ArraySeq(/tmp/kraft-combined-logs) (kafka.log.LogManager)
  [2023-11-28 07:46:29,402] INFO No logs found to be loaded in /tmp/kraft-combined-logs (kafka.log.LogManager)
  [2023-11-28 07:46:29,409] INFO Loaded 0 logs in 12ms (kafka.log.LogManager)
  [2023-11-28 07:46:29,410] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
  [2023-11-28 07:46:29,415] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
  [2023-11-28 07:46:29,555] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner$CleanerThread)
  [2023-11-28 07:46:29,556] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
  [2023-11-28 07:46:29,557] INFO [AddPartitionsToTxnSenderThread-1]: Starting (kafka.server.AddPartitionsToTxnManager)
  [2023-11-28 07:46:29,557] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
  [2023-11-28 07:46:29,561] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
  [2023-11-28 07:46:29,562] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
  [2023-11-28 07:46:29,563] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
  [2023-11-28 07:46:29,563] INFO [BrokerMetadataPublisher id=1] Updating metadata.version to 14 at offset OffsetAndEpoch(offset=5, epoch=1). (kafka.server.metadata.BrokerMetadataPublisher)
  [2023-11-28 07:46:29,566] INFO [TxnMarkerSenderThread-1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
  [2023-11-28 07:46:29,568] INFO [BrokerServer id=1] Finished waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,570] INFO KafkaConfig values:
          advertised.listeners = PLAINTEXT://localhost:9092
          alter.config.policy.class.name = null
          alter.log.dirs.replication.quota.window.num = 11
          alter.log.dirs.replication.quota.window.size.seconds = 1
          authorizer.class.name =
          auto.create.topics.enable = true
          auto.include.jmx.reporter = true
          auto.leader.rebalance.enable = true
          background.threads = 10
          broker.heartbeat.interval.ms = 2000
          broker.id = 1
          broker.id.generation.enable = true
          broker.rack = null
          broker.session.timeout.ms = 9000
          client.quota.callback.class = null
          compression.type = producer
          connection.failed.authentication.delay.ms = 100
          connections.max.idle.ms = 600000
          connections.max.reauth.ms = 0
          control.plane.listener.name = null
          controlled.shutdown.enable = true
          controlled.shutdown.max.retries = 3
          controlled.shutdown.retry.backoff.ms = 5000
          controller.listener.names = CONTROLLER
          controller.quorum.append.linger.ms = 25
          controller.quorum.election.backoff.max.ms = 1000
          controller.quorum.election.timeout.ms = 1000
          controller.quorum.fetch.timeout.ms = 2000
          controller.quorum.request.timeout.ms = 2000
          controller.quorum.retry.backoff.ms = 20
          controller.quorum.voters = [1@localhost:9093]
          controller.quota.window.num = 11
          controller.quota.window.size.seconds = 1
          controller.socket.timeout.ms = 30000
          create.topic.policy.class.name = null
          default.replication.factor = 1
          delegation.token.expiry.check.interval.ms = 3600000
          delegation.token.expiry.time.ms = 86400000
          delegation.token.master.key = null
          delegation.token.max.lifetime.ms = 604800000
          delegation.token.secret.key = null
          delete.records.purgatory.purge.interval.requests = 1
          delete.topic.enable = true
          early.start.listeners = null
          fetch.max.bytes = 57671680
          fetch.purgatory.purge.interval.requests = 1000
          group.consumer.assignors = [org.apache.kafka.coordinator.group.assignor.RangeAssignor]
          group.consumer.heartbeat.interval.ms = 5000
          group.consumer.max.heartbeat.interval.ms = 15000
          group.consumer.max.session.timeout.ms = 60000
          group.consumer.max.size = 2147483647
          group.consumer.min.heartbeat.interval.ms = 5000
          group.consumer.min.session.timeout.ms = 45000
          group.consumer.session.timeout.ms = 45000
          group.coordinator.new.enable = false
          group.coordinator.threads = 1
          group.initial.rebalance.delay.ms = 3000
          group.max.session.timeout.ms = 1800000
          group.max.size = 2147483647
          group.min.session.timeout.ms = 6000
          initial.broker.registration.timeout.ms = 60000
          inter.broker.listener.name = PLAINTEXT
          inter.broker.protocol.version = 3.6-IV2
          kafka.metrics.polling.interval.secs = 10
          kafka.metrics.reporters = []
          leader.imbalance.check.interval.seconds = 300
          leader.imbalance.per.broker.percentage = 10
          listener.security.protocol.map = CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
          listeners = PLAINTEXT://:9092,CONTROLLER://:9093
          log.cleaner.backoff.ms = 15000
          log.cleaner.dedupe.buffer.size = 134217728
          log.cleaner.delete.retention.ms = 86400000
          log.cleaner.enable = true
          log.cleaner.io.buffer.load.factor = 0.9
          log.cleaner.io.buffer.size = 524288
          log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
          log.cleaner.max.compaction.lag.ms = 9223372036854775807
          log.cleaner.min.cleanable.ratio = 0.5
          log.cleaner.min.compaction.lag.ms = 0
          log.cleaner.threads = 1
          log.cleanup.policy = [delete]
          log.dir = /tmp/kafka-logs
          log.dirs = /tmp/kraft-combined-logs
          log.flush.interval.messages = 9223372036854775807
          log.flush.interval.ms = null
          log.flush.offset.checkpoint.interval.ms = 60000
          log.flush.scheduler.interval.ms = 9223372036854775807
          log.flush.start.offset.checkpoint.interval.ms = 60000
          log.index.interval.bytes = 4096
          log.index.size.max.bytes = 10485760
          log.local.retention.bytes = -2
          log.local.retention.ms = -2
          log.message.downconversion.enable = true
          log.message.format.version = 3.0-IV1
          log.message.timestamp.after.max.ms = 9223372036854775807
          log.message.timestamp.before.max.ms = 9223372036854775807
          log.message.timestamp.difference.max.ms = 9223372036854775807
          log.message.timestamp.type = CreateTime
          log.preallocate = false
          log.retention.bytes = -1
          log.retention.check.interval.ms = 300000
          log.retention.hours = 168
          log.retention.minutes = null
          log.retention.ms = null
          log.roll.hours = 168
          log.roll.jitter.hours = 0
          log.roll.jitter.ms = null
          log.roll.ms = null
          log.segment.bytes = 1073741824
          log.segment.delete.delay.ms = 60000
          max.connection.creation.rate = 2147483647
          max.connections = 2147483647
          max.connections.per.ip = 2147483647
          max.connections.per.ip.overrides =
          max.incremental.fetch.session.cache.slots = 1000
          message.max.bytes = 1048588
          metadata.log.dir = null
          metadata.log.max.record.bytes.between.snapshots = 20971520
          metadata.log.max.snapshot.interval.ms = 3600000
          metadata.log.segment.bytes = 1073741824
          metadata.log.segment.min.bytes = 8388608
          metadata.log.segment.ms = 604800000
          metadata.max.idle.interval.ms = 500
          metadata.max.retention.bytes = 104857600
          metadata.max.retention.ms = 604800000
          metric.reporters = []
          metrics.num.samples = 2
          metrics.recording.level = INFO
          metrics.sample.window.ms = 30000
          min.insync.replicas = 1
          node.id = 1
          num.io.threads = 8
          num.network.threads = 3
          num.partitions = 1
          num.recovery.threads.per.data.dir = 1
          num.replica.alter.log.dirs.threads = null
          num.replica.fetchers = 1
          offset.metadata.max.bytes = 4096
          offsets.commit.required.acks = -1
          offsets.commit.timeout.ms = 5000
          offsets.load.buffer.size = 5242880
          offsets.retention.check.interval.ms = 600000
          offsets.retention.minutes = 10080
          offsets.topic.compression.codec = 0
          offsets.topic.num.partitions = 50
          offsets.topic.replication.factor = 1
          offsets.topic.segment.bytes = 104857600
          password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
          password.encoder.iterations = 4096
          password.encoder.key.length = 128
          password.encoder.keyfactory.algorithm = null
          password.encoder.old.secret = null
          password.encoder.secret = null
          principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
          process.roles = [broker, controller]
          producer.id.expiration.check.interval.ms = 600000
          producer.id.expiration.ms = 86400000
          producer.purgatory.purge.interval.requests = 1000
          queued.max.request.bytes = -1
          queued.max.requests = 500
          quota.window.num = 11
          quota.window.size.seconds = 1
          remote.log.index.file.cache.total.size.bytes = 1073741824
          remote.log.manager.task.interval.ms = 30000
          remote.log.manager.task.retry.backoff.max.ms = 30000
          remote.log.manager.task.retry.backoff.ms = 500
          remote.log.manager.task.retry.jitter = 0.2
          remote.log.manager.thread.pool.size = 10
          remote.log.metadata.custom.metadata.max.bytes = 128
          remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
          remote.log.metadata.manager.class.path = null
          remote.log.metadata.manager.impl.prefix = rlmm.config.
          remote.log.metadata.manager.listener.name = null
          remote.log.reader.max.pending.tasks = 100
          remote.log.reader.threads = 10
          remote.log.storage.manager.class.name = null
          remote.log.storage.manager.class.path = null
          remote.log.storage.manager.impl.prefix = rsm.config.
          remote.log.storage.system.enable = false
          replica.fetch.backoff.ms = 1000
          replica.fetch.max.bytes = 1048576
          replica.fetch.min.bytes = 1
          replica.fetch.response.max.bytes = 10485760
          replica.fetch.wait.max.ms = 500
          replica.high.watermark.checkpoint.interval.ms = 5000
          replica.lag.time.max.ms = 30000
          replica.selector.class = null
          replica.socket.receive.buffer.bytes = 65536
          replica.socket.timeout.ms = 30000
          replication.quota.window.num = 11
          replication.quota.window.size.seconds = 1
          request.timeout.ms = 30000
          reserved.broker.max.id = 1000
          sasl.client.callback.handler.class = null
          sasl.enabled.mechanisms = [GSSAPI]
          sasl.jaas.config = null
          sasl.kerberos.kinit.cmd = /usr/bin/kinit
          sasl.kerberos.min.time.before.relogin = 60000
          sasl.kerberos.principal.to.local.rules = [DEFAULT]
          sasl.kerberos.service.name = null
          sasl.kerberos.ticket.renew.jitter = 0.05
          sasl.kerberos.ticket.renew.window.factor = 0.8
          sasl.login.callback.handler.class = null
          sasl.login.class = null
          sasl.login.connect.timeout.ms = null
          sasl.login.read.timeout.ms = null
          sasl.login.refresh.buffer.seconds = 300
          sasl.login.refresh.min.period.seconds = 60
          sasl.login.refresh.window.factor = 0.8
          sasl.login.refresh.window.jitter = 0.05
          sasl.login.retry.backoff.max.ms = 10000
          sasl.login.retry.backoff.ms = 100
          sasl.mechanism.controller.protocol = GSSAPI
          sasl.mechanism.inter.broker.protocol = GSSAPI
          sasl.oauthbearer.clock.skew.seconds = 30
          sasl.oauthbearer.expected.audience = null
          sasl.oauthbearer.expected.issuer = null
          sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
          sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
          sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
          sasl.oauthbearer.jwks.endpoint.url = null
          sasl.oauthbearer.scope.claim.name = scope
          sasl.oauthbearer.sub.claim.name = sub
          sasl.oauthbearer.token.endpoint.url = null
          sasl.server.callback.handler.class = null
          sasl.server.max.receive.size = 524288
          security.inter.broker.protocol = PLAINTEXT
          security.providers = null
          server.max.startup.time.ms = 9223372036854775807
          socket.connection.setup.timeout.max.ms = 30000
          socket.connection.setup.timeout.ms = 10000
          socket.listen.backlog.size = 50
          socket.receive.buffer.bytes = 102400
          socket.request.max.bytes = 104857600
          socket.send.buffer.bytes = 102400
          ssl.cipher.suites = []
          ssl.client.auth = none
          ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
          ssl.endpoint.identification.algorithm = https
          ssl.engine.factory.class = null
          ssl.key.password = null
          ssl.keymanager.algorithm = SunX509
          ssl.keystore.certificate.chain = null
          ssl.keystore.key = null
          ssl.keystore.location = null
          ssl.keystore.password = null
          ssl.keystore.type = JKS
          ssl.principal.mapping.rules = DEFAULT
          ssl.protocol = TLSv1.3
          ssl.provider = null
          ssl.secure.random.implementation = null
          ssl.trustmanager.algorithm = PKIX
          ssl.truststore.certificates = null
          ssl.truststore.location = null
          ssl.truststore.password = null
          ssl.truststore.type = JKS
          transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
          transaction.max.timeout.ms = 900000
          transaction.partition.verification.enable = true
          transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
          transaction.state.log.load.buffer.size = 5242880
          transaction.state.log.min.isr = 1
          transaction.state.log.num.partitions = 50
          transaction.state.log.replication.factor = 1
          transaction.state.log.segment.bytes = 104857600
          transactional.id.expiration.ms = 604800000
          unclean.leader.election.enable = false
          unstable.api.versions.enable = false
          zookeeper.clientCnxnSocket = null
          zookeeper.connect = null
          zookeeper.connection.timeout.ms = null
          zookeeper.max.in.flight.requests = 10
          zookeeper.metadata.migration.enable = false
          zookeeper.session.timeout.ms = 18000
          zookeeper.set.acl = false
          zookeeper.ssl.cipher.suites = null
          zookeeper.ssl.client.enable = false
          zookeeper.ssl.crl.enable = false
          zookeeper.ssl.enabled.protocols = null
          zookeeper.ssl.endpoint.identification.algorithm = HTTPS
          zookeeper.ssl.keystore.location = null
          zookeeper.ssl.keystore.password = null
          zookeeper.ssl.keystore.type = null
          zookeeper.ssl.ocsp.enable = false
          zookeeper.ssl.protocol = TLSv1.2
          zookeeper.ssl.truststore.location = null
          zookeeper.ssl.truststore.password = null
          zookeeper.ssl.truststore.type = null
  (kafka.server.KafkaConfig)
  [2023-11-28 07:46:29,577] INFO [BrokerServer id=1] Waiting for the broker to be unfenced (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,612] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
  [2023-11-28 07:46:29,661] INFO [BrokerServer id=1] Finished waiting for the broker to be unfenced (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,662] INFO authorizerStart completed for endpoint PLAINTEXT. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
  [2023-11-28 07:46:29,663] INFO [SocketServer listenerType=BROKER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
  [2023-11-28 07:46:29,663] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
  [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
  [2023-11-28 07:46:29,665] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
  [2023-11-28 07:46:29,665] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
  [2023-11-28 07:46:29,665] INFO Kafka startTimeMs: 1701157589664 (org.apache.kafka.common.utils.AppInfoParser)
  [2023-11-28 07:46:29,666] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
  [2023-11-28 07:53:16,542] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  [2023-11-28 07:53:16,543] INFO [BrokerLifecycleManager id=1] Unable to send a heartbeat because the RPC got timed out before it could be sent. (kafka.server.BrokerLifecycleManager)
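If you started Kafka in the background with nohup, the same output lands in my_kafka_run.log; a quick way to confirm a successful start is to look for the final startup line shown above:
  grep "Kafka Server started" my_kafka_run.log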
5. Test Kafka

Test Kafka by creating a topic and sending/receiving a few messages. For example, create a topic named test-topic:
  bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
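To confirm the topic was created as requested, you can describe it; this prints the partition count, the leader, and the replica assignment:
  bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092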
Start a console producer and send messages to the topic:
  bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
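The console producer reads from stdin: each line you type is sent as one message, and Ctrl-C exits. A sample session might look like this (the message text is arbitrary):
  > hello kafka
  > this is a test message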
In another terminal window, start a consumer to receive the messages:
  bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
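Once the messages typed into the producer show up in the consumer, the setup works end to end. You can optionally delete the test topic afterwards (delete.topic.enable defaults to true, as the config dump above shows):
  bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092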

These steps install and start Kafka on Ubuntu and run a simple end-to-end test to confirm that it is working.
