Kafka的简介、架构、安装利用、生产者、消耗者、高吞吐、持久化及与Flume整 ...

打印 上一主题 下一主题

主题 1026|帖子 1026|积分 3078

Apache Kafka是一个分布式流处理平台,最初由LinkedIn公司开辟,后来成为Apache软件基金会的一个顶级项目。Kafka主要用于构建实时数据管道和流处理应用程序。Kafka广泛应用于日志聚合、实时分析、变乱源、流处理等场景。它与各种数据处理框架和数据库集成,如Apache Storm、Apache Flink、Apache Cassandra等。
最新的稳固版本是3.7.0,发布日期为2024年2月27日。您可以通过以下链接访问Apache Kafka的下载页面并获取最新版本:
官方下载页面: Apache Kafka Downloads

1. 异步通讯原理

1.1. 观察者模式

观察者模式(Observer),又叫发布-订阅模式(Publish/Subscribe)
界说对象间一种一对多的依赖关系,使得每当一个对象改变状态,则全部依赖于它的对象都会得到通知并主动更新。
一个对象(目标对象)的状态发生改变,全部的依赖对象(观察者对象)都将得到通知。

实际生活中的应用场景:

  • 日志处理与分析:Kafka最初设计目标之一就是日志收集,可以收集各种服务的日志,例如web服务器、数据库服务器等,并通过同一接口服务开放给各种消耗者,如Flink、Hadoop、Hbase、ElasticSearch等,实现分布式系统中海量日志数据的处理与分析。
  • 推荐系统:Kafka可以作为流式处理平台的数据源或数据输出,与Spark、Streaming、Storm、Flink等框架集成,实现对实时数据的处理和分析,如过滤、转换、聚合等,用于计算用户相似度,为用户推荐可能感兴趣的商品。
  • 系统监控与报警:Kafka常用于传输监控指标数据,例如CPU利用率、内存利用环境等,然后监控应用程序可以利用这些指标来进行实时可视化、警报和非常检测。
  • 数据变更捕获(CDC):Kafka的毗连器组件可以支持CDC功能,将数据库中的更改以流的形式传输到其他系统,用于数据的复制或缓存以及索引更新等。
  • 系统迁移:Kafka可以用作老系统升级到新系统过程中的消息传递中心件,降低迁移风险。
  • 变乱溯源:在微服务架构中,Kafka可以记载微服务间的变乱,如订单创建、支付完成等,实现业务逻辑的协调和同步。
  • 消息队列:Kafka作为消息队列,实现差别系统间的解耦和异步通讯,如订单系统、支付系统、库存系统等。
  • 应用解耦:通过引入Kafka,订单系统和库存系统可以淘汰耦合,订单系统完成持久化处理后将消息写入消息队列,库存系统订阅消息并进行库存操纵,进步系统的灵活性和可维护性。
  • 流量削峰:在秒杀或团购活动中,Kafka可以作为前端的消息队列,控制活动的人数,缓解短时间内高流量对应用的压力。
  • 日志处理:Kafka可以用于日志收集和处理,解决大量日志传输的问题。
  • 消息通讯:Kafka内置高效的通讯机制,可以用在纯的消息通讯场景,如实现点对点消息队列或聊天室等。
1.2. 生产者消耗者模式



  • 传统模式
生产者直接将消息传递给指定的消耗者
耦合性特别高,当生产者大概消耗者发生变化,都必要重写业务逻辑


  • 生产者消耗者模式
通过一个容器来解决生产者和消耗者的强耦合问题。生产者和消耗者相互之间不直接通讯,而通过阻塞队列来进行通讯



  • 数据传递流程
生产者消耗者模式,即N个线程进行生产,同时N个线程进行消耗,两种角色通过内存缓冲区进行通讯;
生产者负责向缓冲区内里添加数据单位;
消耗者负责从缓冲区内里取出数据单位;
一般遵循先进先出的原则
1.3. 缓冲区



  • 解耦
假设生产者和消耗者分别是两个类。假如让生产者直接调用消耗者的某个方法,那么生产者对于消耗者就会产生依赖


  • 支持并发
生产者直接调用消耗者的某个方法过程中函数调用是同步的
万一消耗者处理数据很慢,生产者就会白白摧残大好韶光


  • 支持忙闲不均
缓冲区还有另一个好处。假如制造数据的速度时快时慢,缓冲区的好处就体现出来了。
当数据制造快的时间,消耗者来不及处理,未处理的数据可以临时存在缓冲区中。
等生产者的制造速度慢下来,消耗者再慢慢处理掉。
1.4. 数据单位



  • 关联到业务对象
数据单位必须关联到某种业务对象


  • 完备性
就是在传输过程中,要包管该数据单位的完备


  • 独立性
就是各个数据单位之间没有相互依赖
某个数据单位传输失败不应该影响已经完成传输的单位;也不应该影响尚未传输的单位。


  • 颗粒度
数据单位必要关联到某种业务对象。那么数据单位和业务对象应该处于的关系(一对一?一对多)
假如颗粒度过小会增加数据传输的次数
假如颗粒度过大会增加单个数据传输的时间,影响后期消耗
2. 消息系统原理

一个消息系统负责将数据从一个应用传递到别的一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
2.1. 点对点消息传递



  • 在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消耗者消耗队列中的数据。但是一条消息只能被消耗一次。
  • 当一个消耗者消耗了队列中的某条数据之后,该条数据则从消息队列中删除。 该模式纵然有多个消耗者同时消耗数据,也能包管数据处理的顺序。
  • 基于推送模子的消息系统,由消息代理记载消耗状态。
    消息代理将消息推送(push)到消耗者后,标记这条消息为已经被消耗,但是这种方式无法很好地包管消耗的处理语义。

2.2. 发布订阅消息传递



  • 在发布-订阅消息系统中,消息被持久化到一个topic中。
  • 消耗者可以订阅一个或多个topic,消耗者可以消耗该topic中全部的数据,同一条数据可以被多个消耗者消耗,数据被消耗后不会立马删除。
  • 在发布-订阅消息系统中,消息的生产者称为发布者,消耗者称为订阅者。
  • Kafka采取拉取模子(Poll),由本身控制消耗速度,以及消耗的进度,消耗者可以按照恣意的偏移量进行消耗。

3. Kafka简介

官网:http://kafka.apache.org/
Kafka是由Apache软件基金会开辟的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消耗者在网站中的全部动作流数据。
3.1. 设计目标

以时间复杂度为O(1)的方式提供消息持久化本领,纵然对TB级以上数据也能包管常数时间的访问性能。
高吞吐率。纵然在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
支持Kafka Server间的消息分区,及分布式消耗,同时包管每个partition内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
支持在线水平扩展
3.2. Kafka的长处



  • 解耦:
在项目启动之初来预测未来项目会遇到什么需求,是极其困难的。消息系统在处理过程中心插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。


  • 冗余
有些环境下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所接纳的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,必要你的处理系统明白的指出该消息已经被处理完毕,从而确保你的数据被安全的生存直到你利用完毕。


  • 扩展性
因为消息队列解耦了你的处理过程,以是增大消息入队和处理的频率是很轻易的,只要别的增加处理过程即可。不必要改变代码、不必要调节参数。扩展就像调大电力按钮一样简单。


  • 灵活性&峰值处理本领
在访问量剧增的环境下,应用仍然必要继续发挥作用,但是如许的突发流量并不常见;假如为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。利用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。


  • 可恢复性
系统的一部门组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,以是纵然一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。


  • 顺序包管
在大多利用场景下,数据处理的顺序都很告急。大部门消息队列本来就是排序的,并且能包管数据会按照特定的顺序来处理。Kafka包管一个Partition内的消息的有序性。


  • 缓冲
在任何告急的系统中,都会有必要差别的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助使命最高效率的执行———写入队列的处剖析尽可能的快速。
该缓冲有助于控制和优化数据流经过系统的速度。


  • 异步通讯
许多时间,用户不想也不必要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在必要的时间再行止理它们。
4. Kafka系统架构


4.1. Broker

Kafka 集群包罗一个或多个服务器,服务器节点称为broker。
4.2. Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
雷同于数据库的表名大概ES的Index
物理上差别Topic的消息分开存储
逻辑上一个Topic的消息虽然生存于一个或多个broker上但用户只需指定消息的Topic即可生产或消耗数据而不必关心数据存于何处)
创建流程

删除流程

4.3. Partition




  • topic中的数据分割为一个或多个partition。
每个topic至少有一个partition,当生产者产生数据的时间,根据分配策略,选择分区,然后将消息追加到指定的分区的末端(队列)



  • 每条消息都会有一个自增的编号
标识顺序
用于标识消息的偏移量


  • 每个partition中的数据利用多个segment文件存储。
  • partition中的数据是有序的,差别partition间的数据丢失了数据的顺序。
  • 假如topic有多个partition,消耗数据时就不能包管数据的顺序。严格包管消息的消耗顺序的场景下,必要将partition数目设为1。
4.4. Leader

每个partition有多个副本,此中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

4.5. Follower



  • Follower跟随Leader,全部写请求都通过Leader路由,数据变更会广播给全部Follower,Follower与Leader保持数据同步。
  • 假如Leader失效,则从Follower中推举出一个新的Leader。
  • 当Follower挂掉、卡住大概同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
4.6. replication



  • 数据会存放到topic的partation中,但是有可能分区会损坏
  • 我们必要对分区的数据进行备份(备份多少取决于你对数据的器重程度)
  • 我们将分区的分为Leader(1)和Follower(N)
Leader负责写入和读取数据
Follower只负责备份
包管了数据的一致性


  • 备份数设置为N,表现主+备=N(参考HDFS)
  1. ## Kafka 分配 Replica 的算法如下
  2. 1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  3. 2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  4. 3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上
复制代码

4.7. producer



  • 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。
  • broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
  • 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
4.8. consumer



  • 消耗者可以从broker中读取数据。消耗者可以消耗多个topic中的数据。
  • kafka 提供了两套 consumer API:

  • high-level consumer API 提供了一个从 kafka 消耗数据的高层抽象,而 SimpleConsumer API 则必要开辟人员更多地关注细节。

4.9. Consumer Group



  • 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
  • 将多个消耗者集中到一起行止理某一个Topic的数据,可以更快的进步数据的消耗本领
  • 整个消耗者组共享一组偏移量(防止数据被重复读取),因为一个Topic有多个分区

4.10. offset偏移量



  • 可以唯一的标识一条消息
  • 偏移量决定读取数据的位置,不会有线程安全的问题,消耗者通过偏移量来决定下次读取的消息
  • 消息被消耗之后,并不被立刻删除,如许多个业务就可以重复利用kafka的消息
  • 我们某一个业务也可以通过修改偏移量到达重新读取消息的目的,偏移量由用户控制
  • 消息终极还是会被删除的,默认生命周期为1周(7*24小时)
4.11. Zookeeper

kafka 通过 zookeeper 来存储集群的 meta 信息。



5. Kafka环境搭建



  • 基于Zookeeper搭建并开启
验证ZK的可用性
【123】zkServer.sh start


  • 配置Kafka
修改配置文件

修改环境变量

将文件目次拷贝到其他机器

修改其他机器上的配置

启动集群
  1. kafka-server-start.sh /opt/lzj/kafka_2.11-0.8.2.1/config/server.properties
复制代码
常见命令
  1. //创建主题
  2. kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic serlog
  3. kafka-topics.sh --zookeeper node01:2181 --create --replicationfactor 2 --partitions 6 --topic studentlog
  4. kafka-topics.sh --zookeeper node01:2181 --delete --replicationfactor 2 --partitions 6 --topic baidu
  5. //查看所有主题
  6. kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
  7. //查看主题
  8. kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic userlog
  9. //创建生产者
  10. kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic userlog
  11. //创建消费者
  12. kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic userlog
复制代码
6. Kafka数据检索机制



  • topic在物理层面以partition为分组,一个topic可以分成多少个partition
  • partition还可以细分为Segment,一个partition物理上由多个Segment组成
segment 的参数有两个:
log.segment.bytes:单个segment可容纳的最大数据量,默以为1GB
log.segment.ms:Kafka在commit一个未写满的segment前,所等待的时间(默以为7天)


  • LogSegment 文件由两部门组成,分别为“.index”文件和“.log”文件,分别表现为 Segment 索引文件和数据文件。
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
数值大小为64位,20位数字字符长度,没有数字用0填充

消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。

7. 数据的安全性

7.1. producer delivery guarantee




  • Producers可以选择是否为数据的写入接收ack,有以下几种ack的选项:request.required.acks
    acks=0:
    Producer 在 ISR 中的 Leader 已成功收到的数据并得到确认后发送下一条 Message。
    acks=1:
    这意味着 Producer 无需等待来自 Broker 的确认而继续发送下一批消息。
    acks=all:
    Producer 必要等待 ISR 中的全部 Follower 都确认接收到数据后才算一次发送完成,可靠性最高。
7.2. ISR机制



  • 关键词
    AR : Assigned Replicas 用来标识副本的全集
    OSR : out -sync Replicas 离开同步队列的副本
    ISR : in -sync Replicas 加入同步队列的副本
    ISR = Leader + 没有落伍太多的副本;AR = OSR+ ISR。
  • 我们备份数据就是防止数据丢失,当主节点挂掉时,可以启用备份节点
    producer–push–>leader
    leader–pull–>follower
    Follower每隔断一定时间去Leader拉取数据,来包管数据的同步
  • ISR(in-syncReplica)
    当主节点挂点,并不是去Follower选择主,而是从ISR中选择主
    判断标准
    超过10秒钟没有同步数据
    replica.lag.time.max.ms=10000
    主副节点差4000条数据
    rerplica.lag.max.messages=4000
  • 脏节点推举
    kafka接纳一种降级措施来处理:
    推举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader推举
7.3. Broker数据存储机制

无论消息是否被消耗,kafka 都会保留全部消息。有两种策略可以删除旧数据:
基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824
7.4. consumer delivery guarantee



  • 假如将 consumer 设置为 autocommit,consumer 一旦读到数据立即主动commit。假如只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。
  • 读完消息先 commit 再处理消息。
    假如 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息
    这就对应于 At most once
  • 读完消息先处理再 commit。
    假如在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。
    这就对应于 At least once。
  • 假如一定要做到 Exactly once,就必要协调 offset 和实际操纵的输出。
    经典的做法是引入两阶段提交。
  • Kafka 默认包管 At least once,并且允许通过设置 producer 异步提交来实现 At most once
7.5. 数据的消耗



  • partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。
假如auto.commit.enable=true,
当consumer fetch了一些数据但还没有完全处理掉的时间,刚好到commit interval出发了提交offset操纵,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机遇再次被处理,数据丢失。
假如auto.commit.enable=false,
假设consumer的两个fetcher各自拿了一条数据,并且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里必要着重说明的是,当手动执行commit的时间,实际上是对这个consumer进程所占据的全部partition进行commit,kafka临时还没有提供更细粒度的commit方式,也就是说,纵然t2没有处理完partition2的数据,offset也被t1提交掉了。假如这时consumer crash掉,t2正在处理的这条数据就丢失了。


  • 方法1:(将多线程问题转成单线程)
手动commit offset,并针对partition_num启同样数目的consumer进程,如许就能包管一个consumer进程占据一个partition,commit offset的时间不会影响别的partition的offset。但这个方法比较局限,因为partition和consumer进程的数目必须严格对应


  • 方法2:(参考HDFS数据写入流程)
手动commit offset,别的在consumer端再将全部fetch到的数据缓存到queue里,当把queue里全部的数据处理完之后,再批量提交offset,如许就能包管只有处理完的数据才被commit。
8. JavaAPI

8.1. 生产者



  • 创建一线程重复的向kafka输入数据
  1. 创建生产者线程类
  2. public class Hello01Producer extends Thread {
  3. //创建Kafka的生产者
  4. private Producer<String, String> producer;
  5. /**
  6. * 创建构造器
  7. */
  8. public Hello01Producer(String pname) {
  9. //设置线程的名字
  10. super.setName(pname);
  11. //创建配置文件列表
  12. Properties properties = new Properties();
  13. // kafka地址,多个地址用逗号分割
  14. properties.put("metadata.broker.list",
  15. "192.168.58.161:9092,192.168.58.162:9092,192.168.58.163:9092");
  16. //设置写出数据的格式
  17. properties.put("serializer.class", StringEncoder.class.getName());
  18. //写出的应答方式
  19. properties.put("acks", 1);
  20. //批量写出
  21. properties.put("batch.size", 16384);
  22. //创建生产者对象
  23. producer = new Producer<String, String>(new
  24. kafka.producer.ProducerConfig(properties));
  25. }
  26. @Override
  27. public void run() {
  28. //初始化一个计数器
  29. int count = 0;
复制代码
  1. System.out.println("Hello01Producer.run--开始发送数据");
  2. //迭代發送消息
  3. while (count < 100000) {
  4. String key = String.valueOf(++count);
  5. String value = Thread.currentThread().getName() + "--" + count;
  6. //封装消息对象
  7. KeyedMessage<String, String> message = new KeyedMessage<>("userlog",
  8. key, value);
  9. //发送消息到服务器
  10. producer.send(message);
  11. //打印消息
  12. System.out.println("Producer.run--" + key + "--" + value);
  13. //每个1秒发送1条
  14. try {
  15. Thread.sleep(100);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. public static void main(String[] args) {
  21. Hello01Producer producer = new Hello01Producer("学堂");
  22. producer.start();
  23. }
  24. }
复制代码
8.2. 消耗者



  • 创建一线程重复的向kafka消耗数据
  1. //创建消费者对象
  2. private ConsumerConnector consumer;
  3. /**
  4. * 创建构造器
  5. */
  6. public Hello01Consumer(String cname) {
  7. super.setName(cname);
  8. //读取配置文件
  9. Properties properties = new Properties();
  10. //ZK地址
  11. properties.put("zookeeper.connect",
  12. "192.168.58.161:2181,192.168.58.162:2181,192.168.58.163:2181");
  13. //消费者所在组的名称
  14. properties.put("group.id", "shsxt-bigdata");
  15. //ZK超时时间
  16. properties.put("zookeeper.session.timeout.ms", "400");
  17. //当消费者第一次消费时,从最低的偏移量开始消费
  18. properties.put("auto.offset.reset", "smallest");
  19. //自动提交偏移量
  20. properties.put("auto.commit.enable", "true");
  21. //消费者自动提交偏移量的时间间隔
  22. properties.put("auto.commit.interval.ms", "1000");
  23. //创建消费者对象
  24. consumer = Consumer.createJavaConsumerConnector(new
  25. ConsumerConfig(properties));
  26. }
  27. @Override
  28. public void run() {
  29. // 描述读取哪个topic,需要几个线程读
  30. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  31. topicCountMap.put("userlog", 1);
  32. //消费者给句配置信息开始读取消息流
  33. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
  34. consumer.createMessageStreams(topicCountMap);
  35. // 每个线程对应于一个KafkaStream
  36. List<KafkaStream<byte[], byte[]>> list = consumerMap.get("userlog");
  37. // 获取kafkastream流
  38. KafkaStream stream0 = list.get(0);
  39. ConsumerIterator<byte[], byte[]> it = stream0.iterator();
  40. //开始迭代并获取数据
  41. while (it.hasNext()) {
  42. }
复制代码
8.3. 重复消耗和数据的丢失



  • 有可能一个消耗者取出了一条数据(offset=88),但是还没有处理完成,但是消耗者被关闭了
    假如下次还能从88重新处理就属于完美环境;
    假如下次数据从86开始,就属于数据的重复消耗;
    假如下次数据从89开始,就是与数据的丢失;

9. Kafka优化

9.1. Partition 数目



  • 一般来说,每个partition能处理的吞吐为几MB/s(仍必要基于根据本地环境测试后获取准确指标),增加更多的partitions意味着:
    更高的并行度与吞吐
    可以扩展更多的(同一个consumer group中的)consumers
    假如集群中有较多的brokers,则可更大程度上利用闲置的brokers
    但是会造成Zookeeper的更多推举
    也会在Kafka中打开更多的文件
  • 调整准则
    一般来说,假如集群较小(小于6个brokers),则配置2 x broker数的partition数。在这里主要思量的是之后的扩展。假如集群扩展了一倍(例如12个),则不用担心会有partition不足的现象发生;
    一般来说,假如集群较大(大于12个),则配置1 x broker 数的partition数。因为这里不必要再思量集群的扩展环境,与broker数雷同的partition数已经充足应付常规场景。若有必要,则再手动调整;
    思量最高峰吞吐必要的并行consumer数,调整partition的数目。假如应用场景必要有20个(同一个consumer group中的)consumer并行消耗,则据此设置为20个partition;
    思量producer所需的吞吐,调整partition数目(假如producer的吞吐非常高,或是在接下来两年内都比较高,则增加partition的数目)
9.2. Replication factor



  • 此参数决定的是records复制的数目,发起至少 设置为2,一般是3,最高设置为4。
  • 更高的replication factor(假设数目为N)意味着:
    系统更稳固(允许N-1个broker宕机)
    更多的副本(假如acks=all,则会造成较高的延时)
    系统磁盘的利用率会更高(一般假如RF为3,则相对于RF为2时,会占据更多50% 的磁盘空间)
  • 调整准则:
    – 以3为起始(固然至少必要有3个brokers,同时也不发起一个Kafka 集群中节点数少于3个节点)
    – 假如replication 性能成为了瓶颈或是一个issue,则发起利用一个性能更好的broker,而不是降低RF的数目
    – 永久不要在生产环境中设置RF为1
9.3. 批量写入

为了大幅度进步producer写入吞吐量,必要定期批量写文件

10. Flume+Kafka集成



  • 搭建Flume并编写配置文件
  1. vim /opt/lzj/flume-1.6.0/options/f2k.conf
复制代码
  1. #flume-ng agent -n a1 -f /opt/lzj/flume-1.6.0/options/f2k.conf -
  2. flume.root.logger=INFO,console
  3. # Name the components on this agent
  4. a1.sources = r1
  5. a1.sinks = k1
  6. a1.channels = c1
  7. # Describe/configure the source
  8. a1.sources.r1.type = exec
  9. a1.sources.r1.command = tail -F /var/bdp/baidu.ping
  10. # Describe the sink
  11. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  12. a1.sinks.k1.topic = baidu
  13. a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
  14. a1.sinks.k1.requiredAcks = 1
  15. a1.sinks.k1.batchSize = 10
  16. a1.sinks.k1.channel = c1
  17. # Use a channel which buffers events in memory
  18. a1.channels.c1.type = memory
  19. a1.channels.c1.capacity = 1000000
  20. a1.channels.c1.transactionCapacity = 10000
  21. # Bind the source and sink to the channel
  22. a1.sources.r1.channels = c1
  23. a1.sinks.k1.channel = c1
复制代码


  • 启动Zookeeper,Kafka,然后创建Topic(baidu),开启消耗者
  1. 【123】zkServer.sh start
  2. 【123】kafka-server-start.sh /opt/lzj/kafka_2.11/config/server.properties
  3. 【1】kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 3 --topic baidu
  4. 【1】kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic baidu
复制代码


  • 开启Flume
  1. 【1】flume-ng agent -n a1 -f /opt/lzj/apache-flume-1.6.0-bin/options/f2k.conf -Dflume.root.logger=INFO,console
复制代码


  • 开始ping百度的脚本
  1. ping www.baidu.com >> /var/bdp/baidu.ping 2>&1 &
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表