Kafka简介
Kafka 是分布式发布-订阅消息体系。Kafka 是一个分布式的,可划分的,冗余备份的持久性的日志服务。它重要用于处理活跃的流式数据。
Kafka 的重要特点:
- 同时为发布和订阅提供高吞吐量。据了解,Kafka 每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
- 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,比方 ETL,以及实时应用步伐。通过将数据持久化到硬盘防止数据丢失。
- 分布式体系,易于向外扩展。所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展呆板。
- 消息被处理的状态是在 consumer 端维护,而不是由 broker 端维护。当失败时能自动均衡。
- 支持online和offline的场景。
Kafka 的整体架构非常简单,是显式分布式架构,producer、broker 和 consumer 都可以有多个。producer,consumer 实现Kafka 注册的接口,数据从 producer 发送到 broker,broker 承担一个中间缓存和分发的作用。broker 分发注册到体系中的 consumer。broker 的作用类似于缓存,即活跃的数据和离线处理体系之间的缓存。客户端和服务器端的通讯,是基于简单,高性能,且与编程语言无关的 TCP 协议。
- topic:特指 Kafka 处理的消息源的不同分类。
- partition:topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id。
- message:消息,是通讯的基本单元,每个 producer 可以向一个 topic 发布一些消息。
- producers:消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers。
- consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
- broker:缓存署理,Kafka 集群中的一台或多台服务器统称为 broker。
Kafka 安装
在安装 Kafka 之前需要先安装 zookeepper,随后点击 Kafka 的 安装地点,进行一下步调安装:
- ## 解压命令:
- tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
-
- ## 改名命令:
- mv kafka_2.12-2.1.0/ kafka_2.12
-
- ## 进入解压后的目录,修改server.properties文件:
- vim /usr/local/kafka_2.12/config/server.properties
-
- ## 修改配置:
- broker.id=0
- port=9092
- host.name=192.168.11.51
- advertised.host.name=192.168.11.51
- log.dirs=/usr/local/kafka_2.12/kafka-logs
- num.partitions=2
- zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
-
- ## 建立日志文件夹:
- mkdir /usr/local/kafka_2.12/kafka-logs
-
- ##启动kafka:
- /usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
复制代码 常用命令:
- ## 简单操作:
- #(1)创建topic主题命令:(创建名为test的topic, 1个分区分别存放数据,数据备份总共1份)
- kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1 --replication-factor
- ## --zookeeper 为zk服务列表
- ## --create 命令后 --topic 为创建topic 并指定 topic name
- ## --partitions 为指定分区数量
- ## --replication-factor 为指定副本集数量
- #(2)查看topic列表命令:
- kafka-topics.sh --zookeeper 192.168.11.111:2181 --list
- #(3)kafka命令发送数据:(然后我们就可以编写数据发送出去了)
- kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1
- #(4)kafka命令接受数据:(然后我们就可以看到消费的信息了)
- kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning
- #(5)删除topic命令:
- kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1
- #(6)kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)
- kafka-consumer-groups.sh --bootstrap-server 192.168.11.111:9092 --describe --group 消费组名称
复制代码 Kafka 使用
SpringBoot 联合使用 Kafka 需要使用到以下依赖:
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
复制代码 1.1 生产端
生产端的 yml 文件配置:
- spring:
- kafka:
- # kafka 集群
- bootstrap-servers: 192.168.212.128:9092
- producer:
- # producer 发送消息失败时的重试次数
- retries: 0
- # 批量发送数据的配置
- batch-size: 16384
- # 设置kafka 生产者内存缓存区的大小(32 M)
- buffer-memory: 33554432
- # kafka 消息的序列化配置
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- # kafka 发送消息的 ack 可靠性投递的配置项
- # acks=0 : 生产者投递消息后,不会等待任何来自服务器的响应
- # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应
- # acks=-1 : 表示分区 leader 必须等待消息被成功写入到所有的节点中,才认为 producer 请求成功,这种方案提供最高的消息持久性保证,但理论上吞吐量是最低的
- acks: 1
复制代码 简易生产者Demo代码:
- @Component
- public class KafkaProducerService {
- final static Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
- // key 是 String 类型,value 是 Object 类型
- @Resource
- private KafkaTemplate<String, Object> kafkaTemplate;
- /**
- * 发送消息
- * @param topic topic 主题
- * @param object 消息主体
- */
- public void sendMessage(String topic, Object object)
- {
- // future 模式
- ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
- // 添加回调函数
- future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
- // 发送成功
- @Override
- public void onSuccess(SendResult<String, Object> result) {
- logger.info("发送消息成功:" + result.toString());
- }
- @Override
- public void onFailure(Throwable ex) {
- logger.info("发送消息失败:" + ex.getMessage());
- }
- });
- }
- }
复制代码 1.2 消费端
消费端的 yml 文件配置:
- spring:
- kafka:
- # kafka 集群
- bootstrap-servers: 192.168.212.128:9092
- consumer:
- # 消息的签收机制:手动签收
- enable-auto-commit: false
- # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理
- # latest(默认值),在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
- # earliest,在偏移量无效的情况下,消费者将从起始位置读取分区的记录
- auto-offset-reset: earliest
- # kafka 消息的序列化配置
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- listener:
- # 手动签收
- ack-mode: manual
- # 并行数
- concurrency: 5
复制代码 简易生产者Demo代码:
- @Component
- public class KafkaConsumer {
- final static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
- /**
- * 消费消息
- * groupId 定义消费者组,下面脚本查看该消费组的消费进度
- * kafka-consumer-groups.sh --bootstrap-server 192.168.11.111:9092 --describe --group 消费组名称
- * topics 定义监听哪个 topic
- * @param record 发送过来的记录
- * @param acknowledgment 手动 ack
- * @param consumer 本消费者对应的一系列信息
- */
- @KafkaListener(groupId = "group01", topics = "topic1")
- public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer)
- {
- logger.info("消费端接收消息:{}", record.value().toString());
- // 手动签收
- acknowledgment.acknowledge();
- }
- }
复制代码 1.3 运行测试
运行测试的时候,在服务器可以使用以下命令检察对应消费组的消费进度,输出的结果如下图:
- kafka-consumer-groups.sh --bootstrap-server 192.168.11.111:9092 --describe --group 消费组名称
复制代码
其中各个字段的意思分别是:
- topic:主题
- partition:当前分区的索引
- current-offset:当前的进度,消费,ack 后的消息数
- log-end-offset:日志的进度,比如10条消息发送后,无需消费,ack,该数值就是 10
- LAG:耽误,可以理解为还没消费的条数,比如消息发送了10条,这10条都没有 ack,则耽误数为10条
- consumer-id:注册消费者 Id,假如消费者不是运行状态,则为 -
- host:消费者地点
- client-id:消费者名
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |