IT评测·应用市场-qidao123.com

标题: Kafka简单入门02——ISR机制 [打印本页]

作者: 惊雷无声    时间: 2024-7-31 11:19
标题: Kafka简单入门02——ISR机制
目次
ISR机制
ISR 关键概念
HW和LEO
Java利用Kafka通讯
Kafka 生产者示例
Kafka 消耗者示例


ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的向导者(Leader)和追随者(Follower)副本,这些副本与向导者保持数据同步。
ISR 关键概念

其中,需要留意的的概念:

HW和LEO

在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消耗有关的两个重要概念。
HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它体现已经提交的消息。只有在 HW 之前的消息才被以为是已经提交的,这些消息已经被写入分区的所有追随者副本,而且被以为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。
LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 体现分区日志中的末了一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而渐渐增加。
HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性:

Kafka的消息同步流程:
可以看出,Kafka的复制机制既不是完全的同步复制,也不是单纯异步复制。

而Kafka利用的ISR机制则有效地权衡了数据可靠性和性能之间的关系。
Java利用Kafka通讯

以下是 Kafka 生产者和消耗者的简单示例,利用 Kafka 的 Java 客户端库(Kafka Producer 和 Kafka Consumer)来创建一个基本的消息通报示例。
Kafka 生产者示例

  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class KafkaProducerExample {
  4.    public static void main(String[] args) {
  5.        String bootstrapServers = "localhost:9092"; // Kafka 服务器地址
  6.        String topic = "my-topic"; // Kafka 主题名称
  7.        Properties properties = new Properties();
  8.        properties.put("bootstrap.servers", bootstrapServers);
  9.        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10.        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11.        Producer<String, String> producer = new KafkaProducer<>(properties);
  12.        // 发送消息
  13.        producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {
  14.            if (exception == null) {
  15.                System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
  16.            } else {
  17.                System.err.println("Error sending message: " + exception.getMessage());
  18.            }
  19.        });
  20.        producer.close();
  21.    }
  22. }
复制代码
Kafka 消耗者示例

  1. import org.apache.kafka.clients.consumer.*;
  2. import java.util.Properties;
  3. import java.time.Duration;
  4. import java.util.Collections;
  5. public class KafkaConsumerExample {
  6.    public static void main(String[] args) {
  7.        String bootstrapServers = "localhost:9092"; // Kafka 服务器地址
  8.        String groupId = "my-group"; // 消费者组 ID
  9.        String topic = "my-topic"; // Kafka 主题名称
  10.        Properties properties = new Properties();
  11.        properties.put("bootstrap.servers", bootstrapServers);
  12.        properties.put("group.id", groupId);
  13.        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14.        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  15.        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  16.        consumer.subscribe(Collections.singletonList(topic));
  17.        while (true) {
  18.            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  19.            for (ConsumerRecord<String, String> record : records) {
  20.                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
  21.            }
  22.        }
  23.    }
  24. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4