kafka java 小记录

打印 上一主题 下一主题

主题 1710|帖子 1710|积分 5130

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
本文章 方便日后检察使用。

创建kafka消费者(此处没啥可讲的,看表明吧。 手动版本的,更加灵活)
  1.     public KafkaConsumer<String, String> getCustomer() {
  2.         // 1. 配置属性参数
  3.         Properties properties = new Properties();
  4.         // 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口
  5.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  6.         // 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
  7.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  8.         // 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
  9.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  10.         // 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息
  11.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
  12.         // 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)
  13.         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  14.         // 设置消费者是否自动提交offset,true表示自动提交
  15.         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  16.         // 设置自动提交offset的时间间隔(单位:毫秒)
  17.         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  18.         // 设置每次poll操作返回的最大记录数
  19.         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
  20.         // 根据配置属性创建Kafka消费者实例
  21.         return new KafkaConsumer<>(properties);
  22.     }
复制代码
使用kafka开始消费
  1.     @Test
  2.     void KafkaConsumerTest() {
  3.         KafkaConsumer<String, String> consumer = kafkaNdpiCustomer.getCustomer();
  4.         // 订阅要消费的主题,这里是 "test Topic"
  5.         consumer.subscribe(Collections.singletonList("test-Topic"));
  6.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  7.         for (ConsumerRecord<String, String> record : records) {
  8.                 // 处理消息的逻辑
  9.                 System.out.printf("消息为:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  10. //                boolean flag = true;
  11. //                if (flag){
  12. //                    consumer.commitAsync();
  13. //                    break;
  14. //                }
  15.         }
  16.         consumer.close();
  17.         System.out.println("结束消费");
  18.     }
复制代码
其中  consumer.commitAsync(); 为异步提交,不选择则不会像kafka报告已经消费,可以重复使用,

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

何小豆儿在此

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表