Kafka技术详解[3]: 生产与消费数据

十念  金牌会员 | 2024-11-7 20:35:38 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 887|帖子 887|积分 2661

目次
Kafka 生产与消费数据详解
 生产数据
 下令行操作
 工具操作
Java API
 消费数据
 下令行操作
Java API


Kafka 生产与消费数据详解

 生产数据

一旦消息主题创建完成,就可以通过Kafka客户端向Kafka服务器的主题中发送消息。Kafka生产者客户端是一套API接口,任何能够通过这些接口连接Kafka并发送数据的组件都可以称为Kafka生产者。以下是几种差别的生产数据方式:
 下令行操作


  • 打开DOS窗口,进入E:\kafka_2.12-3.6.1\bin\windows目次。
  • 在DOS窗口输入指令,进入生产者控制台。Kafka通过kafka-console-producer.bat文件进行消息生产者操作。必须传递的参数包罗:

    • --bootstrap-server: 连接Kafka服务器,默认端口为9092,参数值为localhost:9092。
    • --topic: 已经创建好的主题名称。
    指令如下:
    1. kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
    复制代码

  • 控制台生产数据时,每次输入的数据需要回车确认才能发送到Kafka服务器。
 工具操作

对于需要更直观操作的情况,可以使用专门的工具进行快速访问,比方kafkatool_64bit.exe。

  • 安装该工具后,打开工具。
  • 点击左上角按钮File -> Add New Connection...建立连接。
  • 点击Test按钮测试连接。
  • 增加连接后,按照工具中的步骤生产数据。
  • 增加乐成后,点击绿色箭头按钮进行查询,工具会显示当前数据。
Java API

通常,也可通过Java程序来生产数据。以下是在IDEA中使用Kafka Java API来生产数据的示例:

  • 创建Kafka项目。
  • 修改pom.xml文件,增加Maven依赖。
    1. <dependencies>
    2.     <dependency>
    3.         <groupId>org.apache.kafka</groupId>
    4.         <artifactId>kafka-clients</artifactId>
    5.         <version>3.6.1</version>
    6.     </dependency>
    7. </dependencies>
    复制代码
  • 创建com.lzl.kafka.test.KafkaProducerTest类,并添加main方法以及生产者代码。
    1. package com.lzl.kafka.test;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.ProducerConfig;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import java.util.HashMap;
    6. import java.util.Map;
    7. public class KafkaProducerTest {
    8.     public static void main(String[] args) {
    9.         // 配置属性集合
    10.         Map<String, Object> configMap = new HashMap<>();
    11.         // 配置属性:Kafka服务器集群地址
    12.         configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    13.         // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
    14.         configMap.put(
    15.                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    16.                 "org.apache.kafka.common.serialization.StringSerializer");
    17.         configMap.put(
    18.                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    19.                 "org.apache.kafka.common.serialization.StringSerializer");
    20.         // 创建Kafka生产者对象,建立Kafka连接
    21.         KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
    22.         // 准备数据,定义泛型
    23.         // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
    24.         ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1", "value1");
    25.         // 生产(发送)数据
    26.         producer.send(record);
    27.         // 关闭生产者连接
    28.         producer.close();
    29.     }
    30. }
    复制代码
 消费数据

一旦消息通过生产者客户端发送到了Kafka服务器,就可以通过消费者客户端对特定主题的消息进行消费。
 下令行操作


  • 打开DOS窗口,进入E:\kafka_2.12-3.6.1\bin\windows目次。
  • 输入指令,进入消费者控制台。使用kafka-console-consumer.bat文件进行消息消费者操作。必须的参数包罗:

    • --bootstrap-server: 连接Kafka服务器,默认端口为9092。
    • --topic: 主题名称。
    • --from-beginning: 标记参数,确保从第一条数据开始消费。
    指令如下:
    1. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
    复制代码

Java API

同样地,可以通过Java程序来消费数据。以下是在IDEA中使用Kafka Java API消费数据的示例:

  • 创建Maven项目并增加Kafka依赖。
  • 创建com.lzl.kafka.test.KafkaConsumerTest类,并添加main方法以及消费者代码。
    1. package com.lzl.kafka.test;
    2. import org.apache.kafka.clients.consumer.ConsumerConfig;
    3. import org.apache.kafka.clients.consumer.ConsumerRecord;
    4. import org.apache.kafka.clients.consumer.ConsumerRecords;
    5. import org.apache.kafka.clients.consumer.KafkaConsumer;
    6. import java.time.Duration;
    7. import java.util.Arrays;
    8. import java.util.Collections;
    9. import java.util.HashMap;
    10. import java.util.Map;
    11. public class KafkaConsumerTest {
    12.     public static void main(String[] args) {
    13.         // 配置属性集合
    14.         Map<String, Object> configMap = new HashMap<String, Object>();
    15.         // 配置属性:Kafka集群地址
    16.         configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    17.         // 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
    18.         configMap.put(
    19.                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    20.                 "org.apache.kafka.common.serialization.StringDeserializer");
    21.         configMap.put(
    22.                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    23.                 "org.apache.kafka.common.serialization.StringDeserializer");
    24.         // 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
    25.         configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    26.         // 配置属性: 消费者组
    27.         configMap.put("group.id", "lzl");
    28.         // 配置属性: 自动提交偏移量
    29.         configMap.put("enable.auto.commit", "true");
    30.         // 创建消费者对象
    31.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);
    32.         // 消费者订阅指定主题的数据
    33.         consumer.subscribe(Collections.singletonList("test"));
    34.         while (true) {
    35.             // 每隔100毫秒,抓取一次数据
    36.             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    37.             // 打印抓取的数据
    38.             for (ConsumerRecord<String, String> record : records) {
    39.                 System.out.println("K = " + record.key() + ", V = " + record.value());
    40.             }
    41.         }
    42.     }
    43. }
    复制代码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

十念

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表