去皮卡多 发表于 2025-3-28 09:30:19

kafka 参数篇

kafka Rebalancing 相关的参数有哪些

kafka 部分源码 (可忽略)

ConsumerConfig

/**
 * Excerpt of the consumer configuration keys (and their documentation strings)
 * that relate to group rebalancing: <code>max.poll.records</code>,
 * <code>max.poll.interval.ms</code>, <code>session.timeout.ms</code> and
 * <code>heartbeat.interval.ms</code>. The <code>...</code> line marks omitted code.
 *
 * NOTE(review): three of the keys are declared twice below, first as an alias of a
 * CommonClientConfigs constant and then again as a string literal. The literal
 * declarations look like they were pasted in from CommonClientConfigs so the reader
 * can see the actual text; as written the excerpt would not compile. Confirm
 * against the upstream sources before reusing it.
 */
public class ConsumerConfig extends AbstractConfig {
...
    /** <code>max.poll.records</code>: key and documentation text, both defined here as literals. */
    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
    private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."
      + " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior."
      + " The consumer will cache the records from each fetch request and returns them incrementally from each poll.";

    /** <code>max.poll.interval.ms</code>: key and documentation text taken from CommonClientConfigs. */
    public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
    private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
   
    // NOTE(review): presumably the CommonClientConfigs definitions referenced just above, inlined for reading.
    public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
    public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using "
                                                          + "consumer group management. This places an upper bound on the amount of time that the consumer can be idle "
                                                          + "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer "
                                                          + "is considered failed and the group will rebalance in order to reassign the partitions to another member. "
                                                          + "For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. "
                                                          + "Instead, the consumer will stop sending heartbeats and partitions will be reassigned "
                                                          + "after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.";
   
    /**
     * <code>session.timeout.ms</code>: key and documentation text taken from CommonClientConfigs.
     */
    public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
    private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
    // NOTE(review): presumably the CommonClientConfigs definitions referenced just above, inlined for reading.
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using "
                                                      + "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness "
                                                      + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
                                                      + "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
                                                      + "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
                                                      + "and <code>group.max.session.timeout.ms</code>.";
    /**
     * <code>heartbeat.interval.ms</code>: key and documentation text taken from CommonClientConfigs.
     */
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
    private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;

    // NOTE(review): presumably the CommonClientConfigs definitions referenced just above, inlined for reading.
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
                                                         + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the "
                                                         + "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. "
                                                         + "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
                                                         + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";

}
Heartbeat

/**
* A helper class for managing the heartbeat to the coordinator
*/
public final class Heartbeat {
    private final int maxPollIntervalMs;
    private final GroupRebalanceConfig rebalanceConfig;
    private final Time time;
    private final Timer heartbeatTimer;
    private final Timer sessionTimer;
    private final Timer pollTimer;
    private final Logger log;
    private final ExponentialBackoff retryBackoff;

    private volatile long lastHeartbeatSend = 0L;
    private volatile boolean heartbeatInFlight = false;
    private volatile long heartbeatAttempts = 0L;

    public Heartbeat(GroupRebalanceConfig config,
                     Time time) {
      if (config.heartbeatIntervalMs >= config.sessionTimeoutMs)
            throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
      this.rebalanceConfig = config;
      this.time = time;
      this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);
      this.sessionTimer = time.timer(config.sessionTimeoutMs);
      this.maxPollIntervalMs = config.rebalanceTimeoutMs;
      this.pollTimer = time.timer(maxPollIntervalMs);
      this.retryBackoff = new ExponentialBackoff(rebalanceConfig.retryBackoffMs,
                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
                rebalanceConfig.retryBackoffMaxMs,
                CommonClientConfigs.RETRY_BACKOFF_JITTER);

      final LogContext logContext = new LogContext(" ");
      this.log = logContext.logger(getClass());
    }

    private void update(long now) {
      heartbeatTimer.update(now);
      sessionTimer.update(now);
      pollTimer.update(now);
    }

    public void poll(long now) {
      update(now);
      pollTimer.reset(maxPollIntervalMs);
    }

    boolean hasInflight() {
      return heartbeatInFlight;
    }

    void sentHeartbeat(long now) {
      lastHeartbeatSend = now;
      heartbeatInFlight = true;
      update(now);
      heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);

      if (log.isTraceEnabled()) {
            log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
      }
    }

    void failHeartbeat() {
      update(time.milliseconds());
      heartbeatInFlight = false;
      heartbeatTimer.reset(retryBackoff.backoff(heartbeatAttempts++));

      log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());
    }

    void receiveHeartbeat() {
      update(time.milliseconds());
      heartbeatInFlight = false;
      heartbeatAttempts = 0L;
      sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
    }

    boolean shouldHeartbeat(long now) {
      update(now);
      return heartbeatTimer.isExpired();
    }
   
    long lastHeartbeatSend() {
      return this.lastHeartbeatSend;
    }

    long timeToNextHeartbeat(long now) {
      update(now);
      return heartbeatTimer.remainingMs();
    }

    boolean sessionTimeoutExpired(long now) {
      update(now);
      return sessionTimer.isExpired();
    }

    void resetTimeouts() {
      update(time.milliseconds());
      sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
      pollTimer.reset(maxPollIntervalMs);
      heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
    }

    void resetSessionTimeout() {
      update(time.milliseconds());
      sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
    }

    boolean pollTimeoutExpired(long now) {
      update(now);
      return pollTimer.isExpired();
    }

    long lastPollTime() {
      return pollTimer.currentTimeMs();
    }
}

1. session.timeout.ms



[*]作用: 定义消费者在 Group Coordinator 中会话的超时时间。如果消费者在此时间内未发送心跳,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。
   源码注释:
在使用 Kafka 消费者组管理机制时,用于检测客户端故障的会话超时时间。客户端会周期性地向 Broker 发送心跳信号以表明其存活状态。如果 Broker 在此会话超时到期前未收到心跳,则会将该客户端从消费者组中移除并触发再平衡。需要注意的是,该值必须处于 Broker 端配置参数 group.min.session.timeout.ms(最小会话超时)和 group.max.session.timeout.ms(最大会话超时)规定的允许范围内。


[*]默认值: Kafka 3.0 之前为 10 秒(10000ms),自 3.0 起为 45 秒(45000ms)。
[*]可能导致 Rebalancing 的原因:

[*]消费者处理任务过重,导致无法按时发送心跳。
[*]网络延迟或故障,导致心跳无法及时到达 Group Coordinator。

[*]优化建议: 根据消费者的处理能力和网络状况适当增大该值,但注意过大的值会延迟消费者失效的检测。
2. heartbeat.interval.ms



[*]作用: 定义消费者发送心跳的时间间隔。
   源码注释:
在使用 Kafka 消费者组管理机制时,消费者与消费者协调器之间预期的心跳间隔时间。心跳机制用于维持消费者会话的活跃状态,并在有消费者加入或离开组时促进再平衡过程。该参数值必须小于 session.timeout.ms(会话超时时间),通常建议不高于该值的 1/3。可通过进一步调低此值来控制常规再平衡的预期耗时。


[*]默认值: 通常为 3 秒(3000ms)。
[*]可能导致 Rebalancing 的原因:

[*]心跳间隔设置过长,导致消费者在 session.timeout.ms 内未能发送足够的心跳。
[*]消费者处理任务过重或网络问题,导致心跳未能按时发送。

[*]优化建议: 确保 heartbeat.interval.ms 的值不超过 session.timeout.ms 的三分之一,并适当调整以平衡心跳频率和网络负载。
3. max.poll.interval.ms



[*]作用: 定义消费者在两次调用 poll() 方法之间的最大时间间隔。如果消费者在此时间内未调用 poll(),Group Coordinator 会认为该消费者已失效,触发 Rebalancing。 对于使用指定实例 id (group.instance.id
页: [1]
查看完整版本: kafka 参数篇