kafka服务介绍

打印 上一主题 下一主题

主题 713|帖子 713|积分 2143

Apache Kafka 是一个开源的分布式事件流平台,主要用于及时数据传输和流处理。它最初由 LinkedIn 开辟,并在 2011 年成为 Apache 基金会的顶级项目。Kafka 计划的目的是处理大规模的数据流,同时提供高吞吐量、低延迟和高容错性
Kafka 的工作原理可以从几个关键方面来明白:



    • 消息的生产
       

    • Producer:生产者是发送消息的客户端,向 Kafka 发送消息。生产者将消息发布到特定的主题(Topic)中,主题内部可以有多个分区(Partition)。
    • Partitioning:消息会被分配到差异的分区。分区是主题的逻辑分片,有助于进步并发处理能力。消息的分配通常基于某种计谋,好比消息键(key)大概轮询。


    • 消息的存储
       

    • Broker:Kafka 的服务器组件,负责存储和管理消息。每个 Kafka 实例都是一个 Broker。多个 Brokers 构成 Kafka 集群。
    • Log Segments:在每个分区中,消息被追加到日记(log)中。日记是一个有序的、不可变的消息序列。为了管理日记文件的大小,Kafka 会定期将日记分段成多个文件。


    • 消息的消费
       

    • Consumer:消费者从 Kafka 中读取消息。消费者订阅一个或多个主题,然后拉取(fetch)数据。
    • Consumer Group:消费者可以构成一个消费组(Consumer Group)。同一消费组中的消费者会分担读取同一个主题的差异分区的任务,从而实现负载均衡。每个消息只会被消费组中的一个消费者读取。


    • 消息的存储与复制
       

    • Replication:为了进步数据的可靠性,Kafka 利用副本(replica)。每个分区有一个主副本(leader)和若干个从副本(follower)。所有的读写请求都由主副本处理,从副本则从主副本同步数据。
    • Leader-Follower:在分区中,领导者负责所有的读写请求,从副本负责从领导者同步数据。领导者失败时,从副本会推举新的领导者,确保高可用性。


    • 和谐与管理
       

    • Zookeeper:早期 Kafka 利用 Zookeeper 来管理集群的元数据和和谐集群中的 Broker 和分区状态。Zookeeper 负责领导者的推举、配置管理和状态监控。
    • Kafka 2.8.0 及以后:Kafka 开始渐渐减少对 Zookeeper 的依赖,尝试用 Kafka 自身的协议来管理集群的元数据,这种模式称为 KRaft 模式(Kafka Raft Metadata Mode)。

数据流示例


  • 生产:生产者将消息发送到一个主题,主题有多个分区。每条消息附带一个时间戳。
  • 存储:消息被追加到分区的日记中。日记分为多个段,Kafka 按次序存储消息。
  • 消费:消费者从主题的分区中读取消息。消费者可以跟踪已处理的消息位置,以实现断点续传。
  • 复制:数据在主副本和从副本之间同步,包管数据在 Broker 失败时不会丢失。
通过这些机制,Kafka 能够实现高吞吐量、低延迟和高可靠性的消息转达和数据流处理。
安装利用

Kafka 依赖于 Java 运行环境,因此首先需要安装 Java 11 或更高版本
  1. apt install -y openjdk-11-jdk
复制代码
  1. root@huhy:~# java --version
  2. openjdk 11.0.23 2024-04-16
  3. OpenJDK Runtime Environment (build 11.0.23+9-post-Ubuntu-1ubuntu1)
  4. OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Ubuntu-1ubuntu1, mixed mode, sharing)
复制代码
官网下载;https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
  1. tar -xf kafka_2.13-3.7.1.tgz
复制代码
  1. cd kafka_2.13-3.7.1/
复制代码
Kafka 的配置文件位于 config 目次中。主要的配置文件包括:


  • server.properties:Kafka 的服务器配置文件
  • zookeeper.properties:Zookeeper 的配置文件(如果利用 Zookeeper)

    • broker.id:描述:Kafka Broker 的唯一标识符。每个 Broker 必须有一个唯一的 broker.id。
      默认值:无
      示例:broker.id=0
    • listeners:描述:Kafka Broker 监听的地点和端口。指定了 Kafka 接收客户端请求的地点。
      默认值:PLAINTEXT://:9092
      示例:listeners=PLAINTEXT://localhost:9092
    • advertised.listeners:描述:Kafka 向客户端公开的地点。客户端会通过此地点与 Broker 进行通讯。
      默认值:无
      示例:advertised.listeners=PLAINTEXT://your-hostname:9092
    • log.dirs:
      描述:Kafka 存储日记文件的目次。可以设置多个目次,Kafka 会将数据分散存储。
      默认值:/tmp/kafka-logs
      示例:log.dirs=/var/lib/kafka/logs
    • log.retention.hours:
      描述:日记文件的生存时间,单位是小时。超过这个时间的数据会被删除。
      默认值:168(7 天)
      示例:log.retention.hours=168
    • log.segment.bytes:
      描述:每个日记段的大小,单位是字节。日记文件会被分段存储。
      默认值:1073741824(1 GB)
      示例:log.segment.bytes=536870912(512 MB)
    • num.partitions:
      描述:默认创建的主题的分区数目。
      默认值:1
      示例:num.partitions=3
    • replication.factor:
      描述:主题的副本因子,表示每个分区有多少副本。进步副本因子可以增长数据的可靠性。
      默认值:无(主题创建时指定)
      示例:replication.factor=2
    • message.max.bytes:
      描述:Kafka 答应的最大消息大小,单位是字节。
      默认值:1000012(1 MB)
      示例:message.max.bytes=2097152(2 MB)
    • log.retention.bytes:
      描述:每个分区的日记文件最大生存大小,超过这个大小的日记会被删除。
      默认值:-1(不限制)
      示例:log.retention.bytes=1073741824(1 GB)
    • log.cleaner.enable:
      描述:启用日记清算器,用于压缩日记中的重复数据。
      默认值:false
      示例:log.cleaner.enable=true
    • security.inter.broker.protocol:
      描述:Kafka Broker 之间的通讯协议,支持 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
      默认值:PLAINTEXT
      示例:security.inter.broker.protocol=SSL
    • ssl.keystore.location:
      描述:SSL 密钥库的位置,用于 SSL/TLS 加密。
      默认值:无
      示例:ssl.keystore.location=/path/to/keystore.jks
    • zookeeper.connect:
      描述:Zookeeper 的连接字符串,包括 Zookeeper 的主机名和端标语。
      默认值:localhost:2181
      示例:zookeeper.connect=localhost:2181
    • zookeeper.connection.timeout.ms:
      描述:Zookeeper 连接超时设置,单位是毫秒。
      默认值:6000
      示例:zookeeper.connection.timeout.ms=10000
    • auto.create.topics.enable:
      描述:是否自动创建主题。如果设置为 true,当客户端向不存在的主题发送消息时,Kafka 会自动创建该主题。
      默认值:true
      示例:auto.create.topics.enable=false
    • delete.topic.enable:
      描述:是否答应删除主题。如果设置为 true,可以通过 Kafka 提供的脚本删除主题。
      默认值:false
      示例:delete.topic.enable=true

通常环境下,默认配置就可以开始利用。如果需要自界说配置,可以编辑这些文件
启动 Zookeeper;Kafka 需要 Zookeeper 来管理集群的元数据。Kafka 附带了一个简单的 Zookeeper 实例,开启后另起一个终端
  1. bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码
启动 Kafka Broker;在另一个终端中,启动 Kafka Broker
  1. bin/kafka-server-start.sh config/server.properties
复制代码
另起一个终端3;Kafka 利用主题来组织消息。可以利用 Kafka 提供的脚本创建主题。例如,创建一个名为 test-topic 的主题:
  1. bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
复制代码
  1. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  2. Created topic test-topic.
复制代码
生产消息:可以利用 Kafka 提供的生产者工具向主题中发送消息。打开一个终端并运行,然后在终端4中输入消息并按回车键发送消息
  1. bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
复制代码
  1. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
  2. >huhy>
复制代码
消费消息;在另一个终端3中,可以运行消费者工具来读取消息,只有最开始两个终端是不能终端,后两个有交互界面可以直接用
  1. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
  2. huhy
复制代码
获取信息如下

管理 Kafka

检察主题:
  1. bin/kafka-topics.sh --list --bootstrap-server localhost:9092
复制代码
  1. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  2. __consumer_offsetstest-topic
复制代码
检察主题详情:
  1. bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
复制代码
  1. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
  2. Topic: test-topic       TopicId: rXHPQIqJRkO5lQDOsco3NQ PartitionCount: 1       ReplicationFactor: 1    Configs:        Topic: test-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
复制代码
删除主题;
  1. bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
复制代码
验证检察
  1. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
  2. root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  3. __consumer_offsets
复制代码
停止 Kafka Broker
  1. bin/kafka-server-stop.sh
复制代码
停止 Zookeeper:
  1. bin/zookeeper-server-stop.sh
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

东湖之滨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表