kafka 参数篇

去皮卡多  论坛元老 | 2025-3-28 09:30:19 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1800|帖子 1800|积分 5400

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
kafka Rebalancing 相关的参数有哪些

kafka 部门源码 (可忽略)

ConsumerConfig

  1. public class ConsumerConfig extends AbstractConfig {
  2. ...
  3.     /** <code>max.poll.records</code> */
  4.     public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
  5.     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."
  6.         + " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior."
  7.         + " The consumer will cache the records from each fetch request and returns them incrementally from each poll.";
  8.     /** <code>max.poll.interval.ms</code> */
  9.     public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
  10.     private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
  11.    
  12.     public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
  13.     public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using "
  14.                                                           + "consumer group management. This places an upper bound on the amount of time that the consumer can be idle "
  15.                                                           + "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer "
  16.                                                           + "is considered failed and the group will rebalance in order to reassign the partitions to another member. "
  17.                                                           + "For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. "
  18.                                                           + "Instead, the consumer will stop sending heartbeats and partitions will be reassigned "
  19.                                                           + "after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.";
  20.    
  21.     /**
  22.      * <code>session.timeout.ms</code>
  23.      */
  24.     public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
  25.     private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
  26.     public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
  27.     public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using "
  28.                                                         + "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness "
  29.                                                         + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
  30.                                                         + "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
  31.                                                         + "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
  32.                                                         + "and <code>group.max.session.timeout.ms</code>.";
  33.     /**
  34.      * <code>heartbeat.interval.ms</code>
  35.      */
  36.     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
  37.     private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
  38.     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
  39.     public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
  40.                                                            + "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the "
  41.                                                            + "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. "
  42.                                                            + "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
  43.                                                            + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
  44. }
复制代码
Heartbeat

  1. /**
  2. * A helper class for managing the heartbeat to the coordinator
  3. */
  4. public final class Heartbeat {
  5.     private final int maxPollIntervalMs;
  6.     private final GroupRebalanceConfig rebalanceConfig;
  7.     private final Time time;
  8.     private final Timer heartbeatTimer;
  9.     private final Timer sessionTimer;
  10.     private final Timer pollTimer;
  11.     private final Logger log;
  12.     private final ExponentialBackoff retryBackoff;
  13.     private volatile long lastHeartbeatSend = 0L;
  14.     private volatile boolean heartbeatInFlight = false;
  15.     private volatile long heartbeatAttempts = 0L;
  16.     public Heartbeat(GroupRebalanceConfig config,
  17.                      Time time) {
  18.         if (config.heartbeatIntervalMs >= config.sessionTimeoutMs)
  19.             throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
  20.         this.rebalanceConfig = config;
  21.         this.time = time;
  22.         this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);
  23.         this.sessionTimer = time.timer(config.sessionTimeoutMs);
  24.         this.maxPollIntervalMs = config.rebalanceTimeoutMs;
  25.         this.pollTimer = time.timer(maxPollIntervalMs);
  26.         this.retryBackoff = new ExponentialBackoff(rebalanceConfig.retryBackoffMs,
  27.                 CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
  28.                 rebalanceConfig.retryBackoffMaxMs,
  29.                 CommonClientConfigs.RETRY_BACKOFF_JITTER);
  30.         final LogContext logContext = new LogContext("[Heartbeat groupID=" + config.groupId + "] ");
  31.         this.log = logContext.logger(getClass());
  32.     }
  33.     private void update(long now) {
  34.         heartbeatTimer.update(now);
  35.         sessionTimer.update(now);
  36.         pollTimer.update(now);
  37.     }
  38.     public void poll(long now) {
  39.         update(now);
  40.         pollTimer.reset(maxPollIntervalMs);
  41.     }
  42.     boolean hasInflight() {
  43.         return heartbeatInFlight;
  44.     }
  45.     void sentHeartbeat(long now) {
  46.         lastHeartbeatSend = now;
  47.         heartbeatInFlight = true;
  48.         update(now);
  49.         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
  50.         if (log.isTraceEnabled()) {
  51.             log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
  52.         }
  53.     }
  54.     void failHeartbeat() {
  55.         update(time.milliseconds());
  56.         heartbeatInFlight = false;
  57.         heartbeatTimer.reset(retryBackoff.backoff(heartbeatAttempts++));
  58.         log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());
  59.     }
  60.     void receiveHeartbeat() {
  61.         update(time.milliseconds());
  62.         heartbeatInFlight = false;
  63.         heartbeatAttempts = 0L;
  64.         sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
  65.     }
  66.     boolean shouldHeartbeat(long now) {
  67.         update(now);
  68.         return heartbeatTimer.isExpired();
  69.     }
  70.    
  71.     long lastHeartbeatSend() {
  72.         return this.lastHeartbeatSend;
  73.     }
  74.     long timeToNextHeartbeat(long now) {
  75.         update(now);
  76.         return heartbeatTimer.remainingMs();
  77.     }
  78.     boolean sessionTimeoutExpired(long now) {
  79.         update(now);
  80.         return sessionTimer.isExpired();
  81.     }
  82.     void resetTimeouts() {
  83.         update(time.milliseconds());
  84.         sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
  85.         pollTimer.reset(maxPollIntervalMs);
  86.         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
  87.     }
  88.     void resetSessionTimeout() {
  89.         update(time.milliseconds());
  90.         sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
  91.     }
  92.     boolean pollTimeoutExpired(long now) {
  93.         update(now);
  94.         return pollTimer.isExpired();
  95.     }
  96.     long lastPollTime() {
  97.         return pollTimer.currentTimeMs();
  98.     }
  99. }
复制代码
:1. session.timeout.ms



  • 作用: 定义消费者在 Group Coordinator 中会话的超时时间。假如消费者在此时间内未发送心跳,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。
   源码注释
  在利用Kafka消费者组管理机制时,用于检测客户端故障的会话超时时间。客户端会周期性地向Broker发送心跳信号以表明其存活状态。假如Broker在此会话超时到期前未收到心跳,则会将该客户端从消费者组中移除并触发再平衡。需要留意的是,该值必须处于Broker端设置参数group.min.session.timeout.ms(最小会话超时)和group.max.session.timeout.ms(最大会话超时)规定的允许范围内。
  

  • 默认值: 通常为 10 秒(10000ms:)。
  • 可能导致 Rebalancing 的缘故原由:

    • :消费者处置惩罚使命过重,导致无法按时发送心跳。
    • :网络延迟或故障,导致心跳无法及时到达 Group Coordinator。

  • 优化建议: 根据消费者的处置惩罚本事和网络状态适当增大该值,但留意过大的值会延迟消费者失效的检测。
:2. heartbeat.interval.ms:



  • 作用: 定义消费者发送心跳的时间间隔。
   源码注释:
  在利用Kafka消费者组管理机制时,与消费者协调器之间预期的心跳间隔时间。心跳机制用于维持消费者会话的活泼状态,并在新消费者到场或离开组时促进再平衡过程。该参数值必须小于session.timeout.ms(会话超时时间),通常建议设置为该值的1/3以下。可通过进一步调低此值来控制常规再平衡的预期相应时间。
  

  • 默认值: 通常为 3 秒(3000ms:)。
  • 可能导致 Rebalancing 的缘故原由:

    • :心跳间隔设置过长,导致消费者在: session.timeout.ms: :内未能发送足够的心跳。
    • :消费者处置惩罚使命过重或网络问题,导致心跳未能按时发送。

  • 优化建议: 确保 heartbeat.interval.ms: 的值小于 session.timeout.ms: 的三分之一,并适当调解以平衡心跳频率和网络负载。
:3. max.poll.interval.ms:



  • 作用: 定义消费者在两次调用 poll(): 方法之间的最大时间间隔。假如消费者在此时间内未调用 poll():,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。 对于利用指定分组id (:group.instance.id
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

去皮卡多

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表