kafka用java收发消息

打印 上一主题 下一主题

主题 863|帖子 863|积分 2589

Kafka 是一个分布式流处理平台,它允许你发布和订阅记录流,类似于消息队列或企业消息系统。下面我将为你展示如何使用 Java 来发送(生产者)和接收(消耗者)Kafka 中的消息。
1. 添加依赖

首先,你需要将 Kafka 客户端的依赖添加到你的 Java 项目中。如果你使用 Maven,可以添加以下依赖到你的 pom.xml 文件中:
  1. <dependencies>
  2.     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  3.     <dependency>
  4.         <groupId>org.apache.kafka</groupId>
  5.         <artifactId>kafka-clients</artifactId>
  6.         <version>你的Kafka客户端版本号</version>
  7.     </dependency>
  8.     <!-- 其他依赖... -->
  9. </dependencies>
复制代码
确保将 你的Kafka客户端版本号 更换为当前你需要的 Kafka 客户端版本。
2. 发送消息(生产者)

以下是一个简单的 Kafka 生产者示例,用于向 Kafka 主题发送消息:
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class KafkaProducerExample {
  4.     public static void main(String[] args) {
  5.         Properties props = new Properties();
  6.         props.put("bootstrap.servers", "localhost:9092");
  7.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         Producer<String, String> producer = new KafkaProducer<>(props);
  10.         for (int i = 0; i < 100; i++) {
  11.             ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", Integer.toString(i), "Hello Kafka " + i);
  12.             producer.send(record, new Callback() {
  13.                 @Override
  14.                 public void onCompletion(RecordMetadata metadata, Exception exception) {
  15.                     if (exception != null) {
  16.                         exception.printStackTrace();
  17.                     } else {
  18.                         System.out.printf("Record sent to partition %d with offset %d%n", metadata.partition(), metadata.offset());
  19.                     }
  20.                 }
  21.             });
  22.         }
  23.         producer.close();
  24.     }
  25. }
复制代码
3. 接收消息(消耗者)

以下是一个简单的 Kafka 消耗者示例,用于从 Kafka 主题接收消息:
  1. import org.apache.kafka.clients.consumer.*;
  2. import org.apache.kafka.common.TopicPartition;
  3. import java.time.Duration;
  4. import java.util.Arrays;
  5. import java.util.Collections;
  6. import java.util.Properties;
  7. public class KafkaConsumerExample {
  8.     public static void main(String[] args) {
  9.         Properties props = new Properties();
  10.         props.put("bootstrap.servers", "localhost:9092");
  11.         props.put("group.id", "test");
  12.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  13.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14.         Consumer<String, String> consumer = new KafkaConsumer<>(props);
  15.         consumer.subscribe(Collections.singletonList("my-topic"));
  16.         while (true) {
  17.             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  18.             for (ConsumerRecord<String, String> record : records) {
  19.                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  20.             }
  21.         }
  22.     }
  23. }
复制代码
注意:在实际应用中,你大概需要处理更多的异常和关闭资源,以及使用更复杂的配置和分区策略。上面的示例只是为了展示根本的使用方式。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表