马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
kafka Rebalancing 相关的参数有哪些
kafka 部门源码 (可忽略)
ConsumerConfig
- public class ConsumerConfig extends AbstractConfig {
- ...
- /** <code>max.poll.records</code> */
- public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
- private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."
- + " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior."
- + " The consumer will cache the records from each fetch request and returns them incrementally from each poll.";
- /** <code>max.poll.interval.ms</code> */
- public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
- private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
-
- public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
- public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using "
- + "consumer group management. This places an upper bound on the amount of time that the consumer can be idle "
- + "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer "
- + "is considered failed and the group will rebalance in order to reassign the partitions to another member. "
- + "For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. "
- + "Instead, the consumer will stop sending heartbeats and partitions will be reassigned "
- + "after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.";
-
- /**
- * <code>session.timeout.ms</code>
- */
- public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
- private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
- public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
- public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using "
- + "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness "
- + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
- + "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
- + "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
- + "and <code>group.max.session.timeout.ms</code>.";
- /**
- * <code>heartbeat.interval.ms</code>
- */
- public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
- private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
- public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
- public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
- + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the "
- + "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. "
- + "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
- + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
- }
复制代码 Heartbeat
:1. session.timeout.ms
- 作用: 定义消费者在 Group Coordinator 中会话的超时时间。假如消费者在此时间内未发送心跳,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。
源码注释
在利用Kafka消费者组管理机制时,用于检测客户端故障的会话超时时间。客户端会周期性地向Broker发送心跳信号以表明其存活状态。假如Broker在此会话超时到期前未收到心跳,则会将该客户端从消费者组中移除并触发再平衡。需要留意的是,该值必须处于Broker端设置参数group.min.session.timeout.ms(最小会话超时)和group.max.session.timeout.ms(最大会话超时)规定的允许范围内。
- 默认值: 通常为 10 秒(10000ms:)。
- 可能导致 Rebalancing 的缘故原由:
- :消费者处置惩罚使命过重,导致无法按时发送心跳。
- :网络延迟或故障,导致心跳无法及时到达 Group Coordinator。
- 优化建议: 根据消费者的处置惩罚本事和网络状态适当增大该值,但留意过大的值会延迟消费者失效的检测。
:2. heartbeat.interval.ms:
源码注释:
在利用Kafka消费者组管理机制时,与消费者协调器之间预期的心跳间隔时间。心跳机制用于维持消费者会话的活泼状态,并在新消费者到场或离开组时促进再平衡过程。该参数值必须小于session.timeout.ms(会话超时时间),通常建议设置为该值的1/3以下。可通过进一步调低此值来控制常规再平衡的预期相应时间。
- 默认值: 通常为 3 秒(3000ms:)。
- 可能导致 Rebalancing 的缘故原由:
- :心跳间隔设置过长,导致消费者在: session.timeout.ms: :内未能发送足够的心跳。
- :消费者处置惩罚使命过重或网络问题,导致心跳未能按时发送。
- 优化建议: 确保 heartbeat.interval.ms: 的值小于 session.timeout.ms: 的三分之一,并适当调解以平衡心跳频率和网络负载。
:3. max.poll.interval.ms:
- 作用: 定义消费者在两次调用 poll(): 方法之间的最大时间间隔。假如消费者在此时间内未调用 poll():,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。 对于利用指定分组id (:group.instance.id
|