悠扬随风 发表于 2024-6-25 03:53:24

Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车
//创建属性

        Properties properties = new Properties();

       //连接集群

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

        //反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //指定消费者组id

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称
//创建消费者
      KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics
      //创建一个数组列表变量接收topics值
      ArrayList<String> topics = new ArrayList<>();
      //指定要订阅的主题
      topics.add("customers");
      //订阅主题
      kafkaConsumer.subscribe(topics); 3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions
4.消费数据

//消费数据
      while (true){
            //if (flag== true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
      }
5.运行MyConsumer,通过生产者api发送消息

https://img-blog.csdnimg.cn/direct/18c53f239f5b47c0b727f3b3a6656d47.png
输出台上可以看到输出的都是订阅的主题/分区的信息
6.完备代码

package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.ArrayList;import java.util.Properties;public class MyConsumer {    public static void main(String[] args) {    //创建属性      Properties properties = new Properties();       //连接集群      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");      //反序列化      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());      //指定消费者组id      properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");    //创建消费者
      KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);    /*//订阅主题      //创建一个数组列表变量接收topics值
      ArrayList<String> topics = new ArrayList<>();
      //指定要订阅的主题
      topics.add("customers");
      //订阅主题
      kafkaConsumer.subscribe(topics);*/    //订阅分区      //创建一个数组列表变量吸收主题分区值      ArrayList<TopicPartition> topicPartitions = new ArrayList<>();      //指定要订阅的分区      topicPartitions.add(new TopicPartition("customers",2));      //订阅分区      kafkaConsumer.assign(topicPartitions);    //消费数据      while (true){            //if (flag== true) flag 标记位置            //break;            //}生产中退出循环的位置;            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));            //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){                System.out.println(consumerRecord);            }      }    }}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka消费者api编写教程