中心件之--一文全解Kafka

打印 上一主题 下一主题

主题 995|帖子 995|积分 2995

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1、Kafka简介

Kafka是一款最初由Linkedin公司开发的基于Zookeeper协调的分布式的、消息发布订阅的消息体系,常用于日志收集、消息队列、实时流数据、指标监控诉警、事件驱动架构等等领域,Linkedin于2010年将其贡献给Apache基金会并成为顶级开源项目。
2、明白消息队列

消息队列(Message Queue)是一种用于在分布式体系传递消息的通讯机制,它允许应用程序通过发生消息和担当处理消息实现解耦批次的直接依靠,消息队列的焦点头脑,生产者(Producer)生产消息发送到队列,消费者(Consumer)从队列中获取消息举行处理。消息队列通常用于异步通讯、欣赏削峰、体系解耦。

2.1、消息队列的上风



  • 解耦:体系直接通过接口交互,体系A不须要直接请与体系B通讯,较低了体系直接的耦合度。
        



  •  异步通讯:体系A发送消息到体系B不须要等待相应,提高了体系的处理能力。 
  • 流量削峰: 消息队列可以缓冲流量高峰,避免体系过载。
  • 扩展性:恰当增加消费者可以提高体系处理能力。
3、Kafka的特性



  • 高吞吐量、低耽误:Kafka支持批量发送和压缩消息,每秒百万条消息,实现毫秒级消息传递和耽误。
  • 可扩展性:Kafka是一个分布式体系,集群支持程度扩展。
  • 持久性、可靠性:Kafka可以将消息生存到磁盘,并且支持备份可以设置生存时间。
  • 高可用、容错性:通过分区和副本机制实现了并行处理消息,纵然Broker宕机数据仍然可用。
  • 消息顺序生存:同一个分区内,Kafka保证消息的顺序性。
  • 多种消息传递语义:At Most Once(至多一次)、At Least Once(至少一次)、Exactly Once(刚好一次)
  • 流处理支持:提供了Kafka Streams API,支持实时流处理。
  • 安全性:支持多种安全机制(SSL/TLS、SASL认证,ACL授权,加密)。
  • 消费者组:支持多个消费者共同消费一个Topic。
4、Kafka的应用场景



  • 日志收集:通过Kafka收集各个体系日志,然后统一开放给消费端例如Hadoop、Hbase、Solr、Elasticsearch等
  • 消息体系:解耦体系、缓存消息等
  • 用户活动跟踪:Kafka经常被用来记载web用户大概app用户的各种活动,如欣赏网页、搜索记载、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,大概装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记载运营监控数据。
  • 实时流处理:Kafka 作为流处理平台的焦点组件,支持实时数据处理和分析。例如IOT中设备数据实时采集,通过传感器采集大量实时数据举行处理传递。
5、Kafka架构


Kafka根本架构比力简单清晰,涉及的焦点概念包括以下:


  • Producer:消息生产者,主要负责生产消息发布到Kafka。
  • Broker:Kafka代理,Kafka集群中单个服务器节点,负责消息存储、生产者和消费者请求处理、副本维护。
  • Consumer:消息消费者,从Kafka订阅消费消息。
  • Topic:消息主题,消息的种别大概名称,生产者将消息发送到特定的主题,消费者订阅主题举行消费。
  • Partition:分区,每个主题(Topic)下面可以包含多个分区,每个分区是有序的、不可变的消息序列。
  • Offset:偏移量,消息在分区中的唯一标识,是消息在分区中的位置。
  • Consumer Group:消费者组,是一组消费者的集合,这些消费者可以共同消费一个大概多个主题的消息。
  • Zookeeper:是Kafka管理元数据、Broker状态、消息偏移量分布式协调服务。
  • Replication:副本是分区的备份,用来提供容错性和可用性。
  • Leader:领导者副本,每个分区都有一个Leader副本用来举行数据的读写处理。
  • Follower:跟随者副本,每个分区有多个Follower副本,用来举行数据的同步。
  • ISR:In Sync Replicas,与Leader副本保持同步的副本,保证数据的一致性和可靠性。
  • Log Compaction:日志压缩,是Kafka生存数据的一种策略。
  • Kafka Stream:Kafka用于构建流处理的客户端库。
6、Spring Boot整合Kafka

6.1、引入依靠

  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4. </dependency>
复制代码
6.2、application.yml设置Kafka

  1. spring:
  2.   application:
  3.     name: kafka
  4.   kafka:
  5.     #Kafka地址,对应server.properties中配置
  6.     bootstrap-servers: localhost:9092
  7.     producer:
  8.       batch-size: 16384
  9.       #应答级别:多少个分区副本完成同步向生产者发送ack确认(0、1、-1/all)
  10.       acks: -1
  11.       #消息重发次数
  12.       retries: 3
  13.       buffer-memory: 33554432
  14.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  15.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  16.     consumer:
  17.       group-id: testGroup
  18.       #是否自动提交offset
  19.       enable-auto-commit: true
  20.       #自动提交延时
  21.       auto-commit-interval: 2000
  22.       #当Kafka中没有初始offset或offset超出范围时自动重置offset,earliest:重置为分区最小的offset,
  23.       #latest:重置为分区最新的offset,none:只要有一个分区不存在已提交offset就抛异常
  24.       auto-offset-reset: latest
  25.       #单次拉取消息最大条数
  26.       max-poll-records: 500
复制代码
6.3、创建Producer类

  1. package com.mqtt.mqttproject.kafka;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @ Author : Gridsum
  7. * @ Description :
  8. */
  9. @Component
  10. public class Producer {
  11.     @Autowired
  12.     private KafkaTemplate<String, Object> kafkaTemplate;
  13.     public void sendMessageToKafka(String message){
  14.         kafkaTemplate.send("test_topic", message);
  15.     }
  16. }
复制代码
6.4、启动Kafka

进入到config目次,修改设置文件,zookeeper.properties可以修改 数据地点

修改kafka设置文件server.properties

以Windows版本为例,下载压缩包,解压后进入bin目次下windows目次,使用管理员打开cmd,首先启动zookeeper服务


然后启动kafka服务

启动Spring Boot服务,使用接口发送消息到Kafka
  1. @Autowired
  2.     private Producer producer;
  3.     // http://127.0.0.1:8080/hello?name=lisi
  4.     @RequestMapping("/hello")
  5.     @ResponseBody
  6.     public String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) {
  7.         producer.sendMessageToKafka("这是一个Kafka测试消息");
  8.         return "Hello " + name;
  9.     }
复制代码
查察kafka消息,使用工具offset连接kafka举行查察

可以看到发送了两条消息到Kafka,已经发送乐成。
创建消费者Consumer类
  1. package com.mqtt.mqttproject.kafka;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @ Author : Gridsum
  7. * @ Description :
  8. */
  9. @Component
  10. public class Consumer {
  11.     @KafkaListener(topics = "test_topic")
  12.     public void getKafkaMessage(ConsumerRecord<String, Object> record){
  13.         System.out.println(record.topic() + "-" + record.value());
  14.     }
  15. }
复制代码

 7、Kafka工作原理


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

泉缘泉

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表