kafka 参数篇
Kafka Rebalancing 相关的参数有哪些?Kafka 部分源码(可忽略)
ConsumerConfig
/**
 * Excerpt of the consumer configuration keys and their documentation strings that the
 * surrounding article discusses in the context of group rebalancing:
 * {@code max.poll.records}, {@code max.poll.interval.ms}, {@code session.timeout.ms}
 * and {@code heartbeat.interval.ms}.
 *
 * NOTE(review): this is a reading excerpt, not a compilable class. Members are elided
 * (the "..." line), and for three of the keys the constant is declared twice: once as an
 * alias of a CommonClientConfigs constant and once with a literal value. The literal
 * declarations look like the referenced CommonClientConfigs definitions pasted inline for
 * convenience - confirm against the real sources before reusing any of this.
 */
public class ConsumerConfig extends AbstractConfig {
// Other members are elided at this point in the excerpt.
...
/** <code>max.poll.records</code> */
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
// The doc text states that this key caps what a single poll() returns and does not
// affect the underlying fetch behavior.
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."
+ " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior."
+ " The consumer will cache the records from each fetch request and returns them incrementally from each poll.";
/** <code>max.poll.interval.ms</code> */
public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
// NOTE(review): the two declarations below repeat the names declared just above, this time
// with literal values and public visibility for the DOC string. Presumably they are the
// CommonClientConfigs definitions that the aliases above point to - verify.
public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using "
+ "consumer group management. This places an upper bound on the amount of time that the consumer can be idle "
+ "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer "
+ "is considered failed and the group will rebalance in order to reassign the partitions to another member. "
+ "For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. "
+ "Instead, the consumer will stop sending heartbeats and partitions will be reassigned "
+ "after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.";
/**
* <code>session.timeout.ms</code>
*/
public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
// NOTE(review): repeated declaration with literal values; presumably the inlined
// CommonClientConfigs definition of the same key - verify.
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using "
+ "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness "
+ "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
+ "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
+ "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
+ "and <code>group.max.session.timeout.ms</code>.";
/**
* <code>heartbeat.interval.ms</code>
*/
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
// NOTE(review): repeated declaration with literal values; presumably the inlined
// CommonClientConfigs definition of the same key - verify.
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
+ "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the "
+ "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. "
+ "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
+ "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
}
Heartbeat
/**
 * Bookkeeping helper for the heartbeat exchanged with the coordinator.
 *
 * <p>It keeps three timers (heartbeat interval, session timeout and poll interval) together
 * with the in-flight flag, the time of the last send and the count of consecutive failed
 * attempts. Every query or state change first brings all three timers to the supplied (or
 * current) time, so the timers are always evaluated against the same instant.
 */
public final class Heartbeat {
    private final int maxPollIntervalMs;
    private final GroupRebalanceConfig rebalanceConfig;
    private final Time time;
    private final Timer heartbeatTimer;
    private final Timer sessionTimer;
    private final Timer pollTimer;
    private final Logger log;
    private final ExponentialBackoff retryBackoff;

    private volatile long lastHeartbeatSend = 0L;
    private volatile boolean heartbeatInFlight = false;
    private volatile long heartbeatAttempts = 0L;

    /**
     * Creates the heartbeat state from the group rebalance configuration.
     *
     * @param config supplies the heartbeat interval, session timeout, rebalance timeout
     *               (used as the poll interval) and the retry backoff bounds
     * @param time   clock used to create the timers and to read the current time
     * @throws IllegalArgumentException if the heartbeat interval is not strictly lower than
     *                                  the session timeout
     */
    public Heartbeat(GroupRebalanceConfig config, Time time) {
        if (config.heartbeatIntervalMs >= config.sessionTimeoutMs) {
            throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
        }

        final int pollIntervalMs = config.rebalanceTimeoutMs;

        this.rebalanceConfig = config;
        this.time = time;
        this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);
        this.sessionTimer = time.timer(config.sessionTimeoutMs);
        this.maxPollIntervalMs = pollIntervalMs;
        this.pollTimer = time.timer(pollIntervalMs);
        this.retryBackoff = new ExponentialBackoff(
                config.retryBackoffMs,
                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
                config.retryBackoffMaxMs,
                CommonClientConfigs.RETRY_BACKOFF_JITTER);
        this.log = new LogContext(" ").logger(getClass());
    }

    /** Brings the heartbeat, session and poll timers to the given time, in that order. */
    private void advanceTimers(long nowMs) {
        heartbeatTimer.update(nowMs);
        sessionTimer.update(nowMs);
        pollTimer.update(nowMs);
    }

    /**
     * Records a poll: advances all timers to {@code now} and restarts the poll timer with the
     * full poll interval.
     */
    public void poll(long now) {
        advanceTimers(now);
        pollTimer.reset(maxPollIntervalMs);
    }

    /** @return whether a heartbeat has been sent and neither received nor failed yet */
    boolean hasInflight() {
        return this.heartbeatInFlight;
    }

    /**
     * Marks a heartbeat as sent at {@code now}: stores the send time, raises the in-flight
     * flag and restarts the heartbeat timer with the configured heartbeat interval.
     */
    void sentHeartbeat(long now) {
        this.lastHeartbeatSend = now;
        this.heartbeatInFlight = true;
        advanceTimers(now);
        heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);

        if (!log.isTraceEnabled()) {
            return;
        }
        log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
    }

    /**
     * Marks the in-flight heartbeat as failed: clears the in-flight flag and restarts the
     * heartbeat timer with the backoff computed for the current attempt count, which is then
     * incremented.
     */
    void failHeartbeat() {
        advanceTimers(time.milliseconds());
        this.heartbeatInFlight = false;

        final long attempt = this.heartbeatAttempts;
        this.heartbeatAttempts = attempt + 1;
        heartbeatTimer.reset(retryBackoff.backoff(attempt));

        log.trace("Heartbeat failed, reset the timer to {}ms remaining", heartbeatTimer.remainingMs());
    }

    /**
     * Marks the in-flight heartbeat as answered: clears the in-flight flag, zeroes the failed
     * attempt count and restarts the session timer with the full session timeout.
     */
    void receiveHeartbeat() {
        advanceTimers(time.milliseconds());
        this.heartbeatInFlight = false;
        this.heartbeatAttempts = 0L;
        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
    }

    /** @return {@code true} once the heartbeat timer has expired as of {@code now} */
    boolean shouldHeartbeat(long now) {
        advanceTimers(now);
        final boolean heartbeatDue = heartbeatTimer.isExpired();
        return heartbeatDue;
    }

    /** @return the {@code now} value passed to the most recent {@link #sentHeartbeat(long)}, or 0 if none */
    long lastHeartbeatSend() {
        return lastHeartbeatSend;
    }

    /** @return milliseconds left on the heartbeat timer as of {@code now} */
    long timeToNextHeartbeat(long now) {
        advanceTimers(now);
        final long remainingMs = heartbeatTimer.remainingMs();
        return remainingMs;
    }

    /** @return {@code true} once the session timer has expired as of {@code now} */
    boolean sessionTimeoutExpired(long now) {
        advanceTimers(now);
        final boolean sessionExpired = sessionTimer.isExpired();
        return sessionExpired;
    }

    /**
     * Restarts the session, poll and heartbeat timers from the current time using their
     * configured durations.
     */
    void resetTimeouts() {
        advanceTimers(time.milliseconds());
        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
        pollTimer.reset(maxPollIntervalMs);
        heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
    }

    /** Restarts only the session timer from the current time with the full session timeout. */
    void resetSessionTimeout() {
        final long nowMs = time.milliseconds();
        advanceTimers(nowMs);
        sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
    }

    /** @return {@code true} once the poll timer has expired as of {@code now} */
    boolean pollTimeoutExpired(long now) {
        advanceTimers(now);
        final boolean pollExpired = pollTimer.isExpired();
        return pollExpired;
    }

    /** @return the poll timer's current time in milliseconds (no timer is advanced by this call) */
    long lastPollTime() {
        return pollTimer.currentTimeMs();
    }
}
1. session.timeout.ms
[*]作用: 定义消费者在 Group Coordinator 中会话的超时时间。如果消费者在此时间内未发送心跳,Group Coordinator 会认为该消费者已失效,触发 Rebalancing。
源码注释:
在使用 Kafka 消费者组管理机制时,用于检测客户端故障的会话超时时间。客户端会周期性地向 Broker 发送心跳信号以表明其存活状态。如果 Broker 在此会话超时到期前未收到心跳,则会将该客户端从消费者组中移除并触发再平衡。需要注意的是,该值必须处于 Broker 端配置参数 group.min.session.timeout.ms(最小会话超时)和 group.max.session.timeout.ms(最大会话超时)规定的允许范围内。
[*]默认值: 旧版本客户端通常为 10 秒(10000ms);自 Kafka 3.0 起默认值为 45 秒(45000ms)。
[*]可能导致 Rebalancing 的原因:
[*]消费者处理任务过重,导致无法按时发送心跳。
[*]网络延迟或故障,导致心跳无法及时到达 Group Coordinator。
[*]优化建议: 根据消费者的处理能力和网络状况适当增大该值,但注意过大的值会延迟对消费者失效的检测。
2. heartbeat.interval.ms
[*]作用: 定义消费者发送心跳的时间间隔。
源码注释:
在使用 Kafka 消费者组管理机制时,与消费者协调器之间预期的心跳间隔时间。心跳机制用于维持消费者会话的活跃状态,并在新消费者加入或离开组时促进再平衡过程。该参数值必须小于 session.timeout.ms(会话超时时间),通常建议设置为不高于该值的 1/3。可通过进一步调低此值来控制常规再平衡的预期耗时。
[*]默认值: 通常为 3 秒(3000ms)。
[*]可能导致 Rebalancing 的原因:
[*]心跳间隔设置过长,导致消费者在 session.timeout.ms 内未能发送足够的心跳。
[*]消费者处理任务过重或网络问题,导致心跳未能按时发送。
[*]优化建议: 确保 heartbeat.interval.ms 的值不高于 session.timeout.ms 的三分之一,并适当调整以平衡心跳频率和网络负载。
3. max.poll.interval.ms
[*]作用: 定义消费者在两次调用 poll() 方法之间的最大时间间隔。如果消费者在此时间内未调用 poll(),Group Coordinator 会认为该消费者已失效,触发 Rebalancing。对于使用指定分组 id(group.instance.id
页:
[1]