谈谈Flink消耗kafka的偏移量

打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

offset设置:

flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中生存有消耗者组的消耗位置将被忽略。
flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中生存有消耗者组的消耗位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…):从指定的时间戳(毫秒)开始消耗数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消耗的位置。如果kafka中生存有消耗者组的消耗位置将被忽略。
flinkKafkaConsumer.setStartFromGroupOffsets():默认的设置。根据代码中设置的group.id设置的消耗者组,去kafka中大概zookeeper中找到对应的消耗者offset位置消耗数据。如果没有找到对应的消耗者组的位置,那么将按照auto.offset.reset设置的策略读取offset。
消耗者offset提交设置:

设置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。5000单元毫秒,代表每5秒举行依次checkpoint
关闭checkpoint:如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的设置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)设置是否开启主动提交offset, auto.commit.interval.ms决定主动提交offset的周期。
开启checkpoint:如果开启了checkpoint,那么当checkpoint生存状态完成后,将checkpoint中生存的offset位置提交到kafka。如许保证了Kafka中生存的offset和checkpoint中生存的offset一致,可以通过设置setCommitOffsetsOnCheckpoints(boolean)来设置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中设置的kafka offset主动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
总结:Flink提供了消耗kafka数据的offset如何提交给Kafka大概zookeeper(kafka0.8之前,因为0.8之前offset是维护在zookeeper中的)的设置 ;关闭checkpoint的话,flink消耗kafka数据 offset取决于kafka客户端的设置;开启checkpoint的话,flink消耗kafka offset由jobmanager中的checkpoint维护,并同步到kafka中保持一置,注意,Flink并不依赖提交给Kafka大概zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消耗的环境。
使用checkpoint + 两阶段提交来保证仅消耗一次kafka中的数据:
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当呆板出现故障大概程序出现错误时,也没有重复的数据大概未被处理的数据出现,实现仅一次处理的语义。
checkpoint中包罗:
当前应用的状态;
当前消耗流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink毗连外部系统数据仅一次处理

Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当呆板出现故障大概程序出现错误时,也没有重复的数据大概未被处理的数据出现,实现仅一次处理的语义。
checkpoint中包罗:
当前应用的状态;
当前消耗流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink毗连外部系统数据仅一次处理;
当Flink处理完的数据需要写入外部系统时,不保证仅一次处理数据。为了提供端到端的仅一次处理数据,在将数据写入外部系统时也要保证仅一次处理数据,这些外部系统必须提供一种手段来允许程序提交大概回滚写入操纵,同时还要保证与Flink的checkpoint机制和谐使用,在分布式系统中和谐提交和回滚的常见方法就是两阶段提交协议。下面给出一个实例相识Flink如何使用两阶段提交协议来实现数据仅一次处理语义。
该实例是从kafka中读取数据,经过处理数据之后将结果再写回kafka。kafka0.11版本之后支持事务,这也是Flink与kafka交互时仅一次处理的必要条件。【注意:当Flink处理完的数据写入kafka时,即当sink为kafka时,主动封装了两阶段提交协议】。Flink支持仅一次处理数据不仅仅限于和Kafka的结合,只要sink提供了必要的两阶段和谐实现,可以对任何sink都能实现仅一次处理数据语义。
其原理如下:

上图Flink程序包罗以下组件:
一个从kafka中读取数据的source
一个窗口聚合操纵
一个将结果写往kafka的sink。
要使sink支持仅一次处理数据语义,必须以事务的方式将数据写往kafka,将两次checkpoint之间的操纵当做一个事务提交,确保出现故障时操纵可以大概被回滚。假设出现故障,在分布式多并发执行sink的应用程序中,仅仅执行单次提交或回滚事务是不敷的,因为分布式中的各个sink程序都必须对这些提交大概回滚达成共识,如许才气保证两次checkpoint之间的数据得到一个一致性的结果。Flink使用两阶段提交协议(pre-commit+commit)来实现这个题目。
Filnk checkpointing开始时就进入到pre-commit阶段,详细来说,一旦checkpoint开始,Flink的JobManager向输入流中写入一个checkpoint barrier将流中所有消息分隔成属于本次checkpoint的消息以及属于下次checkpoint的消息,barrier也会在操纵算子间流转,对于每个operator来说,该barrier会触发operator的State Backend来为当前的operator来打快照。如下图示:

Flink DataSource中存储着Kafka消耗的offset,当完成快照生存后,将chechkpoint barrier传递给下一个operator。这种方式只有在Flink内部状态的场景是可行的,内部状态指的是由Flink的State Backend管理状态,比方上面的window的状态就是内部状态管理。只有当内部状态时,pre-commit阶段无需执行额外的操纵,仅仅是写入一些定义好的状态变量即可,checkpoint乐成时Flink负责提交这些状态写入,否则就不写入当前状态。
但是,一旦operator操纵包罗外部状态,事情就不一样了。我们不能像处理内部状态一样处理外部状态,因为外部状态涉及到与外部系统的交互。这种环境下,外部系统必须要支持可以与两阶段提交协议绑定的事务才气保证仅一次处理数据。
本例中的data sink是将数据写往kafka,因为写往kafka是有外部状态的,这种环境下,pre-commit阶段下data sink 在生存状态到State Backend的同时,还必须pre-commit外部的事务。如下图:
当checkpoint barrier在所有的operator都传递一遍切对应的快照都乐成完成之后,pre-commit阶段才算完成。这个过程中所有创建的快照都被视为checkpoint的一部分,checkpoint中生存着整个应用的全局状态,固然也包罗pre-commit阶段提交的外部状态。当程序出现崩溃时,我们可以回滚状态到最新已经完成快照的时间点。
下一步就是通知所有的operator,告诉它们checkpoint已经完成,这便是两阶段提交的第二个阶段:commit阶段。这个阶段中JobManager会为应用中的每个operator发起checkpoint已经完成的回调逻辑。本例中,DataSource和Winow操纵都没有外部状态,因此在该阶段,这两个operator无需执行任何逻辑,但是Data Sink是有外部状态的,因此此时我们需要提交外部事务。如下图示:

汇总以上信息,总结得出:
一旦所有的operator完成各自的pre-commit,他们会发起一个commit操纵。
如果一个operator的pre-commit失败,所有其他的operator 的pre-commit必须被终止,而且Flink会回滚到最近乐成完成的checkpoint位置。
一旦pre-commit完成,必须要确保commit也要乐成,内部的operator和外部的系统都要对此举行保证。假设commit失败【网络故障原因】,Flink程序就会崩溃,然后根据用户重启策略执行重启逻辑,重启之后会再次commit。
因此,所有的operator必须对checkpoint最终结果达成共识,即所有的operator都必须认定命据提交要么乐成执行,要么被终止然后回滚。
参考:https://blog.51cto.com/u_16213620/7923389


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美丽的神话

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表