下载地点:
Apache Kafka
Apache ZooKeeper
下载并解压缩Kafka
启动 ZooKeeper
当前版本 Kafka 软件内部依然依靠 ZooKeeper 进行多节点协调调度,以是启动 Katka软件之前,需要先启动 ZooKeeper 软件。不外因为Kafka 软件本身内置了 ZooKeeper 软件以是无需额外安装 ZooKeeper 软件,直接调用脚本下令启动即可。详细操作步骤如下:
- 进入 Kafka 解压缩文件夹的 config 目录,修改 zookeeper.properties 配置文件
- 新建
- 修改
- 启动下令:
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
直接把这条下令放到一个新建的zk.cmd文件当中,以后直接点击该cmd下令直接启动
启动Kafka:
- 修改server.properties:
- 配置cmd 启动文件
留意:启动先zookeeper再kafka,关闭先kafka再zookeeper
下令行操作:
调用D:\kafka\local\bin\windows下的kafka-topics.bat脚本创建主题topic:
kafka-topics.bat --bootstrap-server localhost:9092 --topic test123 --create
- kafka-topics.bat:这是 Kafka 提供的一个脚本,用于执行与 Kafka 主题相关的操作。.bat 扩展名表明这是一个 Windows 批处理文件。
- --bootstrap-server localhost:9092:这个参数指定了 Kafka 集群中的一个或多个代理服务器(broker)的地点和端口。在这个例子中,它指向本田主机(localhost)上的 Kafka 服务,端口号为 9092。
- --topic test123:这个参数指定了要创建的主题的名称。在这个例子中,主题名称为 test123。
- --create:这个参数告诉 Kafka 创建指定的主题。
创建生产者:
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test123
创建消耗者:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test123
代码演示
生产者代码:
- import org.apache.kafka.clients.producer.*;
- import java.util.Properties;
- public class SimpleProducer {
- public static void main(String[] args) {
- // 1. 配置生产者参数
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("acks", "all"); // 消息确认机制
- // 2. 创建生产者实例
- Producer<String, String> producer = new KafkaProducer<>(props);
- // 3. 发送消息
- for (int i = 0; i < 10; i++) {
- String message = "Message-" + i;
- ProducerRecord<String, String> record =
- new ProducerRecord<>("test123", "key-" + i, message);
- // 异步发送(带回调)
- producer.send(record, (metadata, exception) -> {
- if (exception == null) {
- System.out.printf("消息发送成功 -> topic:%s, partition:%d, offset:%d%n",
- metadata.topic(), metadata.partition(), metadata.offset());
- } else {
- exception.printStackTrace();
- }
- });
- }
- // 4. 关闭生产者
- producer.close();
- }
- }
复制代码 消耗者:
- import org.apache.kafka.clients.consumer.*;
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
- public class SimpleConsumer {
- public static void main(String[] args) {
- // 1. 配置消费者参数
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("group.id", "test-group"); // 消费者组ID
- props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费
- // 2. 创建消费者实例
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- // 3. 订阅主题
- consumer.subscribe(Collections.singletonList("test123"));
- // 4. 轮询消费消息
- try {
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("收到消息 -> topic:%s, partition:%d, offset:%d, key:%s, value:%s%n",
- record.topic(), record.partition(), record.offset(),
- record.key(), record.value());
- }
- }
- } finally {
- consumer.close();
- }
- }
- }
复制代码 运行后生产者收到消息:

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |