Kafka-1

打印 上一主题 下一主题

主题 988|帖子 988|积分 2964

下载地点:
Apache Kafka
Apache ZooKeeper



  • 消息分区有序,根据offset偏移量顺序消耗消息

下载并解压缩Kafka


启动 ZooKeeper

当前版本 Kafka 软件内部依然依靠 ZooKeeper 进行多节点协调调度,以是启动 Katka软件之前,需要先启动 ZooKeeper 软件。不外因为Kafka 软件本身内置了 ZooKeeper 软件以是无需额外安装 ZooKeeper 软件,直接调用脚本下令启动即可。详细操作步骤如下:


  • 进入 Kafka 解压缩文件夹的 config 目录,修改 zookeeper.properties 配置文件

  •  新建

  • 修改

  • 启动下令:
    call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
    直接把这条下令放到一个新建的zk.cmd文件当中,以后直接点击该cmd下令直接启动

启动Kafka:



  •  修改server.properties:

  • 配置cmd 启动文件

 留意:启动先zookeeper再kafka,关闭先kafka再zookeeper

下令行操作:

调用D:\kafka\local\bin\windows下的kafka-topics.bat脚本创建主题topic:
   kafka-topics.bat --bootstrap-server localhost:9092 --topic test123 --create 
  

  • kafka-topics.bat:这是 Kafka 提供的一个脚本,用于执行与 Kafka 主题相关的操作。.bat 扩展名表明这是一个 Windows 批处理文件。
  • --bootstrap-server localhost:9092:这个参数指定了 Kafka 集群中的一个或多个代理服务器(broker)的地点和端口。在这个例子中,它指向本田主机(localhost)上的 Kafka 服务,端口号为 9092。
  • --topic test123:这个参数指定了要创建的主题的名称。在这个例子中,主题名称为 test123。
  • --create:这个参数告诉 Kafka 创建指定的主题。
 创建生产者:

   kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test123
  

创建消耗者:

   kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test123
  


代码演示 

生产者代码:
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class SimpleProducer {
  4.     public static void main(String[] args) {
  5.         // 1. 配置生产者参数
  6.         Properties props = new Properties();
  7.         props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
  8.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10.         props.put("acks", "all"); // 消息确认机制
  11.         // 2. 创建生产者实例
  12.         Producer<String, String> producer = new KafkaProducer<>(props);
  13.         // 3. 发送消息
  14.         for (int i = 0; i < 10; i++) {
  15.             String message = "Message-" + i;
  16.             ProducerRecord<String, String> record =
  17.                 new ProducerRecord<>("test123", "key-" + i, message);
  18.             // 异步发送(带回调)
  19.             producer.send(record, (metadata, exception) -> {
  20.                 if (exception == null) {
  21.                     System.out.printf("消息发送成功 -> topic:%s, partition:%d, offset:%d%n",
  22.                             metadata.topic(), metadata.partition(), metadata.offset());
  23.                 } else {
  24.                     exception.printStackTrace();
  25.                 }
  26.             });
  27.         }
  28.         // 4. 关闭生产者
  29.         producer.close();
  30.     }
  31. }
复制代码
 消耗者:
  1. import org.apache.kafka.clients.consumer.*;
  2. import java.time.Duration;
  3. import java.util.Collections;
  4. import java.util.Properties;
  5. public class SimpleConsumer {
  6.     public static void main(String[] args) {
  7.         // 1. 配置消费者参数
  8.         Properties props = new Properties();
  9.         props.put("bootstrap.servers", "localhost:9092");
  10.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  11.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  12.         props.put("group.id", "test-group"); // 消费者组ID
  13.         props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费
  14.         // 2. 创建消费者实例
  15.         Consumer<String, String> consumer = new KafkaConsumer<>(props);
  16.         // 3. 订阅主题
  17.         consumer.subscribe(Collections.singletonList("test123"));
  18.         // 4. 轮询消费消息
  19.         try {
  20.             while (true) {
  21.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  22.                 for (ConsumerRecord<String, String> record : records) {
  23.                     System.out.printf("收到消息 -> topic:%s, partition:%d, offset:%d, key:%s, value:%s%n",
  24.                             record.topic(), record.partition(), record.offset(),
  25.                             record.key(), record.value());
  26.                 }
  27.             }
  28.         } finally {
  29.             consumer.close();
  30.         }
  31.     }
  32. }
复制代码
运行后生产者收到消息:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表