泉缘泉 发表于 2025-2-17 23:07:16

中心件之--一文全解Kafka

1、Kafka简介

Kafka是一款最初由Linkedin公司开发的基于Zookeeper协调的分布式的、消息发布订阅的消息体系,常用于日志收集、消息队列、实时流数据、指标监控诉警、事件驱动架构等等领域,Linkedin于2010年将其贡献给Apache基金会并成为顶级开源项目。
2、明白消息队列

消息队列(Message Queue)是一种用于在分布式体系传递消息的通讯机制,它允许应用程序通过发生消息和担当处理消息实现解耦批次的直接依靠,消息队列的焦点头脑,生产者(Producer)生产消息发送到队列,消费者(Consumer)从队列中获取消息举行处理。消息队列通常用于异步通讯、欣赏削峰、体系解耦。
https://i-blog.csdnimg.cn/direct/af17658015584e2293e21eac47cbc09f.png
2.1、消息队列的上风



[*]解耦:体系直接通过接口交互,体系A不须要直接请与体系B通讯,较低了体系直接的耦合度。
        https://i-blog.csdnimg.cn/direct/a1c9bf5716d043989a7f550762e291eb.png


[*] 异步通讯:体系A发送消息到体系B不须要等待相应,提高了体系的处理能力。 
[*]流量削峰: 消息队列可以缓冲流量高峰,避免体系过载。
[*]扩展性:恰当增加消费者可以提高体系处理能力。
3、Kafka的特性



[*]高吞吐量、低耽误:Kafka支持批量发送和压缩消息,每秒百万条消息,实现毫秒级消息传递和耽误。
[*]可扩展性:Kafka是一个分布式体系,集群支持程度扩展。
[*]持久性、可靠性:Kafka可以将消息生存到磁盘,并且支持备份可以设置生存时间。
[*]高可用、容错性:通过分区和副本机制实现了并行处理消息,纵然Broker宕机数据仍然可用。
[*]消息顺序生存:同一个分区内,Kafka保证消息的顺序性。
[*]多种消息传递语义:At Most Once(至多一次)、At Least Once(至少一次)、Exactly Once(刚好一次)
[*]流处理支持:提供了Kafka Streams API,支持实时流处理。
[*]安全性:支持多种安全机制(SSL/TLS、SASL认证,ACL授权,加密)。
[*]消费者组:支持多个消费者共同消费一个Topic。
4、Kafka的应用场景



[*]日志收集:通过Kafka收集各个体系日志,然后统一开放给消费端例如Hadoop、Hbase、Solr、Elasticsearch等
[*]消息体系:解耦体系、缓存消息等
[*]用户活动跟踪:Kafka经常被用来记载web用户大概app用户的各种活动,如欣赏网页、搜索记载、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,大概装载到hadoop、数据仓库中做离线分析和挖掘。
[*]运营指标:Kafka也经常用来记载运营监控数据。
[*]实时流处理:Kafka 作为流处理平台的焦点组件,支持实时数据处理和分析。例如IOT中设备数据实时采集,通过传感器采集大量实时数据举行处理传递。
5、Kafka架构

https://i-blog.csdnimg.cn/direct/e7e0c6aa289549148a3835f7913d6431.png
Kafka根本架构比力简单清晰,涉及的焦点概念包括以下:


[*]Producer:消息生产者,主要负责生产消息发布到Kafka。
[*]Broker:Kafka代理,Kafka集群中单个服务器节点,负责消息存储、生产者和消费者请求处理、副本维护。
[*]Consumer:消息消费者,从Kafka订阅消费消息。
[*]Topic:消息主题,消息的种别大概名称,生产者将消息发送到特定的主题,消费者订阅主题举行消费。
[*]Partition:分区,每个主题(Topic)下面可以包含多个分区,每个分区是有序的、不可变的消息序列。
[*]Offset:偏移量,消息在分区中的唯一标识,是消息在分区中的位置。
[*]Consumer Group:消费者组,是一组消费者的集合,这些消费者可以共同消费一个大概多个主题的消息。
[*]Zookeeper:是Kafka管理元数据、Broker状态、消息偏移量分布式协调服务。
[*]Replication:副本是分区的备份,用来提供容错性和可用性。
[*]Leader:领导者副本,每个分区都有一个Leader副本用来举行数据的读写处理。
[*]Follower:跟随者副本,每个分区有多个Follower副本,用来举行数据的同步。
[*]ISR:In Sync Replicas,与Leader副本保持同步的副本,保证数据的一致性和可靠性。
[*]Log Compaction:日志压缩,是Kafka生存数据的一种策略。
[*]Kafka Stream:Kafka用于构建流处理的客户端库。
6、Spring Boot整合Kafka

6.1、引入依靠

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency> 6.2、application.yml设置Kafka

spring:
application:
    name: kafka
kafka:
    #Kafka地址,对应server.properties中配置
    bootstrap-servers: localhost:9092
    producer:
      batch-size: 16384
      #应答级别:多少个分区副本完成同步向生产者发送ack确认(0、1、-1/all)
      acks: -1
      #消息重发次数
      retries: 3
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      #是否自动提交offset
      enable-auto-commit: true
      #自动提交延时
      auto-commit-interval: 2000
      #当Kafka中没有初始offset或offset超出范围时自动重置offset,earliest:重置为分区最小的offset,
      #latest:重置为分区最新的offset,none:只要有一个分区不存在已提交offset就抛异常
      auto-offset-reset: latest
      #单次拉取消息最大条数
      max-poll-records: 500 6.3、创建Producer类

package com.mqtt.mqttproject.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
* @ Author : Gridsum
* @ Description :
*/
@Component
public class Producer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessageToKafka(String message){
      kafkaTemplate.send("test_topic", message);
    }
} 6.4、启动Kafka

进入到config目次,修改设置文件,zookeeper.properties可以修改 数据地点
https://i-blog.csdnimg.cn/direct/8ea39023e8a1486880093ab06dca4677.png
修改kafka设置文件server.properties
https://i-blog.csdnimg.cn/direct/142a018133744dc89fb6519400b2a2cc.png
以Windows版本为例,下载压缩包,解压后进入bin目次下windows目次,使用管理员打开cmd,首先启动zookeeper服务
https://i-blog.csdnimg.cn/direct/eb0cb34744bf433289c62846a4ff4756.png
https://i-blog.csdnimg.cn/direct/41c18a6cfdf64d0485dc196d6bf8ec93.png
然后启动kafka服务
https://i-blog.csdnimg.cn/direct/039a44a2bb624621b27032bdd273511a.png
启动Spring Boot服务,使用接口发送消息到Kafka
@Autowired
    private Producer producer;

    // http://127.0.0.1:8080/hello?name=lisi
    @RequestMapping("/hello")
    @ResponseBody
    public String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) {
      producer.sendMessageToKafka("这是一个Kafka测试消息");
      return "Hello " + name;
    } 查察kafka消息,使用工具offset连接kafka举行查察
https://i-blog.csdnimg.cn/direct/8419c996a3454e53adbb50cae39458f8.png
可以看到发送了两条消息到Kafka,已经发送乐成。
创建消费者Consumer类
package com.mqtt.mqttproject.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* @ Author : Gridsum
* @ Description :
*/
@Component
public class Consumer {

    @KafkaListener(topics = "test_topic")
    public void getKafkaMessage(ConsumerRecord<String, Object> record){
      System.out.println(record.topic() + "-" + record.value());
    }
} https://i-blog.csdnimg.cn/direct/d1a7b87b0a684cdcb938d1dab6065397.png
 7、Kafka工作原理


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 中心件之--一文全解Kafka