java集成kafka案例

打印 上一主题 下一主题

主题 1677|帖子 1677|积分 5031

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
要在 Java 项目中集成 Apache Kafka 以实现消息的生产和消耗,步调如下:
1. 引入 Maven 依靠
在您的 pom.xml 文件中添加以下依靠,以包罗 Kafka 客户端库:
  1. <dependencies>
  2.     <!-- Kafka Clients -->
  3.     <dependency>
  4.         <groupId>org.apache.kafka</groupId>
  5.         <artifactId>kafka-clients</artifactId>
  6.         <version>2.8.0</version>
  7.     </dependency>
  8.     <!-- 如果使用 Spring Boot,可添加以下依赖 -->
  9.     <dependency>
  10.         <groupId>org.springframework.kafka</groupId>
  11.         <artifactId>spring-kafka</artifactId>
  12.         <version>2.7.0</version>
  13.     </dependency>
  14. </dependencies>
复制代码
2. 配置 Kafka 生产者
首先,设置生产者的配置属性:
  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.Producer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. public class KafkaProducerExample {
  8.     public static void main(String[] args) {
  9.         // 配置属性
  10.         Properties props = new Properties();
  11.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  12.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         // 创建生产者
  15.         Producer<String, String> producer = new KafkaProducer<>(props);
  16.         // 发送消息
  17.         for (int i = 0; i < 10; i++) {
  18.             ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i);
  19.             producer.send(record);
  20.         }
  21.         // 关闭生产者
  22.         producer.close();
  23.     }
  24. }
复制代码
3. 配置 Kafka 消耗者
接下来,设置消耗者的配置属性,并订阅主题以消耗消息:
  1. import org.apache.kafka.clients.consumer.Consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import java.time.Duration;
  7. import java.util.Collections;
  8. import java.util.Properties;
  9. public class KafkaConsumerExample {
  10.     public static void main(String[] args) {
  11.         // 配置属性
  12.         Properties props = new Properties();
  13.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  14.         props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
  15.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  16.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  17.         // 创建消费者
  18.         Consumer<String, String> consumer = new KafkaConsumer<>(props);
  19.         // 订阅主题
  20.         consumer.subscribe(Collections.singletonList("your_topic"));
  21.         // 持续消费消息
  22.         try {
  23.             while (true) {
  24.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  25.                 records.forEach(record -> {
  26.                     System.out.printf("Consumed message: key = %s, value = %s, offset = %d%n",
  27.                             record.key(), record.value(), record.offset());
  28.                 });
  29.             }
  30.         } finally {
  31.             // 关闭消费者
  32.             consumer.close();
  33.         }
  34.     }
  35. }
复制代码
4. 使用 Spring Boot 集成 Kafka
如果您使用 Spring Boot,可以通过配置 KafkaTemplate(用于生产消息)和使用 @KafkaListener 注解(用于消耗消息)来简化 Kafka 的集成。
生产者配置:
  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.kafka.core.ProducerFactory;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @Configuration
  11. public class KafkaProducerConfig {
  12.     @Bean
  13.     public ProducerFactory<String, String> producerFactory() {
  14.         Map<String, Object> configProps = new HashMap<>();
  15.         configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  16.         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  18.         return new DefaultKafkaProducerFactory<>(configProps);
  19.     }
  20.     @Bean
  21.     public KafkaTemplate<String, String> kafkaTemplate() {
  22.         return new KafkaTemplate<>(producerFactory());
  23.     }
  24. }
复制代码
使用 KafkaTemplate 发送消息:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaProducerService {
  6.     @Autowired
  7.     private KafkaTemplate<String, String> kafkaTemplate;
  8.     public void sendMessage(String topic, String key, String value) {
  9.         kafkaTemplate.send(topic, key, value);
  10.     }
  11. }
复制代码
消耗者配置:
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.annotation.EnableKafka;
  6. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  7. import org.springframework.kafka.core.ConsumerFactory;
  8. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. @EnableKafka
  12. @Configuration
  13. public class KafkaConsumerConfig {
  14.     @Bean
  15.     public ConsumerFactory<String, String> consumerFactory() {
  16.         Map<String, Object> props = new HashMap<>();
  17.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  18.         props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
  19.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  20.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  21.         return new DefaultKafkaConsumerFactory<>(props);
  22.     }
  23.     @Bean
  24.     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  25.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  26.         factory.setConsumerFactory(consumerFactory());
  27.         return factory;
  28.     }
  29. }
复制代码
使用 @KafkaListener 消耗消息:
在 Spring Boot 中,@KafkaListener 注解用于监听指定的 Kafka 主题,并在收到消息时触发相应的方法。以下是一个基本示例:
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class KafkaConsumerService {
  5.     @KafkaListener(topics = "your_topic", groupId = "your_group_id")
  6.     public void listen(String message) {
  7.         System.out.println("Received message: " + message);
  8.         // 在此处添加处理逻辑
  9.     }
  10. }
复制代码
在上述代码中:


  • topics:指定要监听的 Kafka 主题。
  • groupId:指定消耗者组 ID。
listen 方法:当有新消息发布到指定主题时,该方法会被调用,message 参数包罗消息的内容。
批量消耗消息
如果希望一次处理多条消息,可以启用批量监听。首先,必要配置一个支持批量消耗的 KafkaListenerContainerFactory:
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.kafka.annotation.EnableKafka;
  4. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  5. import org.springframework.kafka.core.ConsumerFactory;
  6. @EnableKafka
  7. @Configuration
  8. public class KafkaConsumerConfig {
  9.     @Bean
  10.     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
  11.             ConsumerFactory<String, String> consumerFactory) {
  12.         ConcurrentKafkaListenerContainerFactory<String, String> factory =
  13.                 new ConcurrentKafkaListenerContainerFactory<>();
  14.         factory.setConsumerFactory(consumerFactory);
  15.         factory.setBatchListener(true); // 启用批量监听
  16.         return factory;
  17.     }
  18. }
复制代码
然后,在消耗者服务中使用 @KafkaListener 注解,并指定使用上述配置的工厂:
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. import java.util.List;
  4. @Service
  5. public class KafkaBatchConsumerService {
  6.     @KafkaListener(
  7.         topics = "your_topic",
  8.         groupId = "your_group_id",
  9.         containerFactory = "kafkaListenerContainerFactory"
  10.     )
  11.     public void listen(List<String> messages) {
  12.         System.out.println("Received batch messages: " + messages);
  13.         // 在此处添加批量处理逻辑
  14.     }
  15. }
复制代码
在上述代码中:


  • containerFactory:指定使用支持批量消耗的工厂。
listen 方法的参数范例为 List<String>,用于接收一批消息。
控制消耗者的启动和停止
在某些情况下,大概必要在运行时控制 Kafka 消耗者的启动和停止。可以通过 KafkaListenerEndpointRegistry 来实现:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.listener.KafkaListenerEndpointRegistry;
  3. import org.springframework.kafka.listener.MessageListenerContainer;
  4. import org.springframework.scheduling.annotation.Scheduled;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class KafkaListenerManager {
  8.     @Autowired
  9.     private KafkaListenerEndpointRegistry registry;
  10.     // 启动监听器
  11.     public void startListener(String listenerId) {
  12.         MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
  13.         if (listenerContainer != null && !listenerContainer.isRunning()) {
  14.             listenerContainer.start();
  15.         }
  16.     }
  17.     // 停止监听器
  18.     public void stopListener(String listenerId) {
  19.         MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
  20.         if (listenerContainer != null && listenerContainer.isRunning()) {
  21.             listenerContainer.stop();
  22.         }
  23.     }
  24. }
复制代码
在上述代码中:


  • startListener 方法用于启动指定的监听器。
  • stopListener 方法用于停止指定的监听器。
  • listenerId 对应于 @KafkaListener 注解中的 id 属性。
通过这种方式,可以在应用运行时根据必要动态地控制 Kafka 消耗者的举动。
参考资料


  • Spring for Apache Kafka: @KafkaListener 的使用示例(消耗)
  • SpringBoot + Kafka 使用@KafkaListener注解批量消耗
  • 在Spring Boot 中动态管理Kafka Listener
通过上述配置和代码示例,可以在 Spring Boot 项目中有用地集成 Kafka,实现消息的生产和消耗功能。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表