throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", var17);
public void assign(Collection<TopicPartition> partitions):
给当前消耗者⼿动分配⼀系列主题分区。
⼿动分配分区不⽀持增量分配,如果先前有分配分区,则该操纵会覆盖之前的分配。
如果给出的主题分区是空的,则等价于调⽤unsubscribe⽅法。
⼿动分配主题分区的⽅法不使⽤消耗组管理功能。当消耗组成员变了,大概集群或主题的元数据改变了,不会触发分区分配的再平衡。
⼿动分区分配assign(Collection)不能和⾃动分区分配subscribe(Collection,ConsumerRebalanceListener)⼀起使⽤。
如果启⽤了⾃动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消耗偏移量进⾏异步提交。
public Set<TopicPartition> assignment():
获取给当前消耗者分配的分区集合。如果订阅是通过调⽤assign⽅法直接分配主题分区,则返回相同的集合。如果使⽤了主题订阅,该⽅法返回当前分配给该消耗者的主题分区集合。如果分区订阅还没开始进⾏分区分配,大概正在重新分配分区,则会返回none。
public Map<String, List<artitionInfo>> listTopics():
获取对⽤户授权的全部主题分区元数据。该⽅法会对服务器发起远程调⽤。
public List<artitionInfo> partitionsFor(String topic):
获取指定主题的分区元数据。如果当前消耗者没有关于该主题的元数据,就会对服务器发起远程调⽤。
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions):
对于给定的主题分区,列出它们第⼀个消息的偏移量。
注意,如果指定的分区不存在,该⽅法大概会永远壅闭。
该⽅法不改变分区的当前消耗者偏移量。
public void seekToEnd(Collection<TopicPartition> partitions):
将偏移量移动到每个给定分区的末了⼀个。
该⽅法耽误执⾏,只有当调⽤过poll⽅法或position⽅法之后才可以使⽤。
如果没有指定分区,则将当前消耗者分配的全部分区的消耗者偏移量移动到末了。
如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消耗偏移量移动到末了⼀个稳固的偏移量,即下⼀个要消耗的消息如今还是未提交状态的事务消息。
public void seek(TopicPartition partition, long offset)说明:
将给定主题分区的消耗偏移量移动到指定的偏移量,即当前消耗者下⼀条要消耗的消息偏移量。
若该⽅法多次调⽤,则末了⼀次的覆盖前⾯的。
如果在消耗中间随意使⽤,大概会丢失数据。
public long position(TopicPartition partition):
查抄指定主题分区的消耗偏移量
public void seekToBeginning(Collection<TopicPartition> partitions):
将给定每个分区的消耗者偏移量移动到它们的起始偏移量。该⽅法懒执⾏,只有当调⽤过poll⽅法或position⽅法之后才会执⾏。如果没有提供分区,则将全部分配给当前消耗者的分区消耗偏移量移动到起始偏移量。
使⽤kafka-topics.sh脚本时可用的设置:
选项说明–config <String: name=value>为创建的或修改的主题指定设置信息。⽀持下述设置条⽬: cleanup.policy``compression.type``delete.retention.ms``file.delete.delay.ms``flush.messages``flush.ms``follower.replication.throttled.replicas``index.interval.bytes``leader.replication.throttled.replicas``max.message.bytes``message.format.version``message.timestamp.difference.max.ms``message.timestamp.type``min.cleanable.dirty.ratio``min.compaction.lag.ms``min.insync.replicas``preallocate``retention.bytes``retention.ms``segment.bytes``segment.index.bytes``segment.jitter.ms``segment.ms``unclean.leader.election.enable–create创建⼀个新主题–delete删除⼀个主题–delete-config <String: name>删除现有主题的⼀个主题设置条⽬。这些条⽬就是在--config中给出的设置条⽬。–alter更改主题的分区数量,副天职配和/或设置条⽬。–describe列出给定主题的细节–disable-rack-aware禁⽤副天职配的机架感知。–force抑制控制台提示信息–help打印资助信息–if-exists如果指定了该选项,则在修改或删除主题的时候,只有主题存在才可以执⾏。–if-not-exists在创建主题的时候,如果指定了该选项,则只有主题不存在的时候才可以执⾏下令。–list列出全部可⽤的主题。–partitions <Integer: # of partitions>要创建或修改主题的分区数。–replica-assignment <String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2,broker_id_for_part2_replica1 :broker_id_for_part2_replica2 , …>当创建或修改主题的时候⼿动指定partition-to-broker的分配关系。–replication-factor <Integer:replication factor>要创建的主题分区副本数。1表示只有⼀个副本,也就是Leader副本。–topic <String: topic>要创建、修改或形貌的主题名称。除了创建,修改和形貌在这⾥还可以使⽤正则表达式。–topics-with-overridesif set when describing topics, only show topics that haveoverridden configs–unavailable-partitionsif set when describing topics, only show partitions whoseleader is not available–under-replicated-partitionsif set when describing topics, only show under replicatedpartitions–zookeeper <String: urls>必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。 主题中可以使⽤的参数界说(也就是上面--config <String: name=value> 的参数):
属性默认值服务器默认属性说明cleanup.policydeletelog.cleanup.policy要么是”delete“要么是”compact“; 这个字符串指明了针对旧⽇志部分的利⽤⽅式;默认⽅式(“delete”)将会丢弃旧的部分当他们的接纳时间大概尺⼨限定到达时。”compact“将会进⾏⽇志压缩compression.typenoneproducer⽤于压缩数据的压缩类型。默认是⽆压缩。正确的选项值是none、gzip、snappy。压缩最好⽤于批量处理,批量处理消息越多,压缩性能越好。delete.retention.ms86400000(24hours)log.cleaner.delete.retention.ms对于压缩⽇志保留的最⻓时间,也是客户端消耗消息的最⻓时间,通log.retention.minutes的区别在于⼀个控制未压缩数据,⼀个控制压缩后的数据。此项设置可以在topic创建时的置顶参数覆盖flush.msNonelog.flush.interval.ms此项设置⽤来置顶欺压进⾏fsync⽇志到磁盘的时间隔断;例如,如果设置为1000,那么每1000ms就必要进⾏⼀次fsync。⼀般不发起使⽤这个选项flush.messagesNonelog.flush.interval.messages此项设置指定时间隔断:欺压进⾏fsync⽇志。例如,如果这个选项设置为1,那么每条消息之后都必要进⾏fsync,如果设置为5,则每5条消息就必要进⾏⼀次fsync。⼀般来说,发起你不要设置这个值。此参数的设置,必要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过⼤,将会导致每次"fsync"的时间较⻓(IO壅闭),如果此值过⼩,将会导致"fsync"的次数较多,这也意味着整体的client请求有⼀定的耽误.物理server故障,将会导致没有fsync的消息丢失.index.interval.bytes4096log.index.interval.bytes默认设置包管了我们每4096个字节就对消息添加⼀个索引,更多的索引使得阅读的消息更加靠近,但是索引规模却会由此增⼤;⼀般不必要改变这个选项max.message.bytes1000000max.message.byteskafka追加消息的最⼤尺⼨。注意如果你增⼤这个尺⼨,你也必须增⼤你consumer的fetch 尺⼨,这样consumer才能fetch到这些最⼤尺⼨的消息。min.cleanable.dirty.ratio0.5min.cleanable.dirty.ratio此项设置控制log压缩器试图进⾏扫除⽇志的频率。默认情况下,将制止扫除压缩率超过50%的⽇志。这个⽐率制止了最⼤的空间浪费min.insync.replicas1min.insync.replicas当producer设置request.required.acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer会产⽣异常。retention.bytesNonelog.retention.bytes如果使⽤“delete”的retention 计谋,这项设置就是指在删除⽇志之前,⽇志所能达到的最⼤尺⼨。默认情况下,没有尺⼨限定⽽只偶然间限定retention.ms7 dayslog.retention.minutes如果使⽤“delete”的retention计谋,这项设置就是指删除⽇志前⽇志生存的时间。segment.bytes1Glog.segment.byteskafka中log⽇志是分成⼀块块存储的,此设置是指log⽇志划分成块的⼤⼩segment.index.bytes10MBlog.index.size.max.bytes此设置是有关offsets和⽂件位置之间映射的索引⽂件的⼤⼩;⼀般不必要修改这个设置segment.jitter.ms0log.roll.jitter.{ms,hours}The maximum jitter to subtract from logRollTimeMillis.segment.ms7 dayslog.roll.hours即使log的分块⽂件没有达到必要删除、压缩的⼤⼩,⼀旦log 的时间达到这个上限,就会欺压新建⼀个log分块⽂件unclean.leader.election.enabletrue指明了是否可以或许使不在ISR中replicas设置⽤来作为leader 2.3.1 主题操纵
说明:
除了使⽤Kafka的bin⽬录下的脚本⼯具来管理Kafka,还可以使⽤管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采⽤Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操纵。Kafka0.11.0.0之后,⼜多了⼀个AdminClient,在kafka-client包下,⼀个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。 功能与原理介绍
Kafka官⽹:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects
KafkaAdminClient包含了⼀下⼏种功能(以Kafka1.0.2版本为准):
创建主题
createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)
场景和之前大抵是类似的,只不过引用 Leader Epoch 机制后,Follower 副本 B 重启返来后,必要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有生存任何起始位移值大于2 的 Epoch 条目(<1,2>里的start offset为2),因此 B 无需实验任何日志截断操纵。这是对高水位机制的一个显着改进,即副本是否实验日志截断不再依赖于高水位举行判断。
如今,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启返来后,实验与 B 相同的逻辑判断,发现也不消实验日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。背面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会天生新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会利用这个条目资助判断后续是否实验日志截断操纵。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
规避数据不一致
B 第一个恢复过来并成为新的 leader。
之后 B 写入消息 m3(赤色的1),并将 LEO 和 HW 更新至2,此时的 LeaderEpoch 已经从 LE0 增至 LE1 了
紧接着 A 也恢复过来成为 Follower 并向 B 发送 OffsetsForLeaderEpochRequest 请求,此时 A 的 LeaderEpoch 为 LE0。B 根据 LE0 查询到LE0+1=LE1,LE1对应的 offset 为1并返回给 A,A 就截断日志并删除了消息 m2(蓝色的1)。之后 A 发送 FetchRequest 至 B 请求来同步数据,最终A和B中都有两条消息 m1 和 m3,HW 和 LEO都为2,而且 LeaderEpoch 都为 LE1,云云便办理了数据不一致的题目。
2.6.5 消息重复