Kafka消费者api编写教程

打印 上一主题 下一主题

主题 810|帖子 810|积分 2430

1.基本属性配置

输入new Properties().var 回车
  1. //创建属性
  2.         Properties properties = new Properties();
  3.        //连接集群
  4.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
  5.         //反序列化
  6.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  7.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  8.         //指定消费者组id
  9.         properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
复制代码

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

  1. //创建消费者
  2.         KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
复制代码

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

  1.         //创建一个数组列表变量接收topics值
  2.         ArrayList<String> topics = new ArrayList<>();
  3.         //指定要订阅的主题
  4.         topics.add("customers");
  5.         //订阅主题
  6.         kafkaConsumer.subscribe(topics);
复制代码
3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

  1. //消费数据
  2.         while (true){
  3.             //if (flag  == true) flag 标志位置
  4.             //break;
  5.             //}生产中退出循环的位置;
  6.             ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  7.             //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
  8.             for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
  9.                 System.out.println(consumerRecord);
  10.             }
  11.         }
复制代码
5.运行MyConsumer,通过生产者api发送消息


输出台上可以看到输出的都是订阅的主题/分区的信息

6.完备代码

  1. package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.ArrayList;import java.util.Properties;public class MyConsumer {    public static void main(String[] args) {    //创建属性        Properties properties = new Properties();       //连接集群        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");        //反序列化        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());        //指定消费者组id        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");    //创建消费者
  2.         KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);    /*//订阅主题        //创建一个数组列表变量接收topics值
  3.         ArrayList<String> topics = new ArrayList<>();
  4.         //指定要订阅的主题
  5.         topics.add("customers");
  6.         //订阅主题
  7.         kafkaConsumer.subscribe(topics);*/    //订阅分区        //创建一个数组列表变量吸收主题分区值        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();        //指定要订阅的分区        topicPartitions.add(new TopicPartition("customers",2));        //订阅分区        kafkaConsumer.assign(topicPartitions);    //消费数据        while (true){            //if (flag  == true) flag 标记位置            //break;            //}生产中退出循环的位置;            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));            //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){                System.out.println(consumerRecord);            }        }    }}
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表