Kafka 是一个分布式流处理平台,它允许你发布和订阅记录流,类似于消息队列或企业消息系统。下面我将为你展示如何使用 Java 来发送(生产者)和接收(消耗者)Kafka 中的消息。
1. 添加依赖
首先,你需要将 Kafka 客户端的依赖添加到你的 Java 项目中。如果你使用 Maven,可以添加以下依赖到你的 pom.xml 文件中:
- <dependencies>
- <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>你的Kafka客户端版本号</version>
- </dependency>
- <!-- 其他依赖... -->
- </dependencies>
复制代码 确保将 你的Kafka客户端版本号 更换为当前你需要的 Kafka 客户端版本。
2. 发送消息(生产者)
以下是一个简单的 Kafka 生产者示例,用于向 Kafka 主题发送消息:
- import org.apache.kafka.clients.producer.*;
- import java.util.Properties;
- public class KafkaProducerExample {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new KafkaProducer<>(props);
- for (int i = 0; i < 100; i++) {
- ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", Integer.toString(i), "Hello Kafka " + i);
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- exception.printStackTrace();
- } else {
- System.out.printf("Record sent to partition %d with offset %d%n", metadata.partition(), metadata.offset());
- }
- }
- });
- }
- producer.close();
- }
- }
复制代码 3. 接收消息(消耗者)
以下是一个简单的 Kafka 消耗者示例,用于从 Kafka 主题接收消息:
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
- public class KafkaConsumerExample {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "test");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Collections.singletonList("my-topic"));
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- }
- }
复制代码 注意:在实际应用中,你大概需要处理更多的异常和关闭资源,以及使用更复杂的配置和分区策略。上面的示例只是为了展示根本的使用方式。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |