这篇文章将重要介绍 Kafka 中的元数据和生产者拉取元数据的过程。生产者在发送消息时,需要获取消息所属的 Topic 在 Broker 节点上的分布情况,然后将消息发送到对应的节点,因此,在生产者发送消息时,需要有当前 Topic 的元数据信息。本篇文章在 Kafka 生产者拉取元数据代码的底子上,对元数据内部的数据结构和生产者拉取元数据的过程进行介绍,并且对生产者拉取元数据的代码增长了注释来方便明确。
1. 元数据
Metadata 是 Kafka 中的元数据类,负责对元数据进行管理,Metadata类内部通过大量synchronized方法来保证多线程下修改元数据的线程安全。以下是 Metadata 内部包含的重要数据结构:
- public class Metadata implements Closeable {
- private final Logger log;
- // 重试间隔时间
- private final ExponentialBackoff refreshBackoff;
- // 元数据过期时间
- private final long metadataExpireMs;
- // 元数据版本号,每次元数据响应都会加 1
- private int updateVersion; // bumped on every metadata response
- // 有新主题(Topic)的版本号,每次有新主题加 1
- private int requestVersion; // bumped on every new topic addition
- // 上一次更新时间
- private long lastRefreshMs;
- // 上一次成功更新时间
- private long lastSuccessfulRefreshMs;
- private long attempts;
- private KafkaException fatalException;
- private Set<String> invalidTopics;
- private Set<String> unauthorizedTopics;
- // **当前保存的元数据**
- private volatile MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty();
- // 需要全部主题更新
- private boolean needFullUpdate;
- // 需要部分主题更新
- private boolean needPartialUpdate;
- private long equivalentResponseCount;
- private final ClusterResourceListeners clusterResourceListeners;
- private boolean isClosed;
- private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
- /** Addresses with which the metadata was originally bootstrapped. */
- private List<InetSocketAddress> bootstrapAddresses;
- }
复制代码 MetadataSnapshot 是当前生存的元数据。
- public class MetadataSnapshot {
- // 集群 cluset id
- private final String clusterId;
- // 节点
- private final Map<Integer, Node> nodes;
- // Topic
- private final Set<String> unauthorizedTopics;
- private final Set<String> invalidTopics;
- private final Set<String> internalTopics;
- // controller 节点
- private final Node controller;
- // Partition 和对应的 Partition 元数据
- private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
- // Topic 信息
- private final Map<String, Uuid> topicIds;
- private final Map<Uuid, String> topicNames;
- // Cluster 集群信息
- private Cluster clusterInstance;
- }
复制代码 Broker 节点
- public class Node {
- // 节点 id
- private final int id;
- // 节点 id string
- private final String idString;
- // 节点地址
- private final String host;
- // 节点端口
- private final int port;
- // 节点机架
- private final String rack;
- }
复制代码 Cluster 集群
- public final class Cluster {
- private final boolean isBootstrapConfigured;
- // 节点
- private final List<Node> nodes;
- private final Set<String> unauthorizedTopics;
- private final Set<String> invalidTopics;
- private final Set<String> internalTopics;
- // Controller 节点
- private final Node controller;
- // Partition 和对应的 PartitionInfo
- private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
- // Topic 和对应的 PartitionInfo
- private final Map<String, List<PartitionInfo>> partitionsByTopic;
- private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
- // 节点和对应的 PartitionInfo
- private final Map<Integer, List<PartitionInfo>> partitionsByNode;
- // 节点 Id 和对应的服务器节点
- private final Map<Integer, Node> nodesById;
- private final ClusterResource clusterResource;
- // Topic 的 Id 和 Name
- private final Map<String, Uuid> topicIds;
- private final Map<Uuid, String> topicNames;
- }
复制代码- public class TopicMetadata {
- // Topic id
- private final Uuid id;
- // Topic name
- private final String name;
- // 分区数量
- private final int numPartitions;
- // 分区和对应的机架
- private final Map<Integer, Set<String>> partitionRacks;
- }
复制代码- public class PartitionMetadata {
- // 版本号
- private final int version;
- // Topic id
- private final Uuid topicId;
- }
复制代码- public final class TopicPartition implements Serializable {
- // 分区编号
- private final int partition;
- // Topic
- private final String topic;
- }
复制代码- public class PartitionInfo {
- // Topic
- private final String topic;
- // 分区编号
- private final int partition;
- // Leader 所在的服务器节点
- private final Node leader;
- // 副本所在的服务器节点
- private final Node[] replicas;
- // **ISR**: 可以与 leader 保持同步的副本
- private final Node[] inSyncReplicas;
- private final Node[] offlineReplicas;
- }
复制代码 2. 发起元数据哀求
客户端调用 send() 来发送一条消息,send() 方法底层调用了 doSend() 方法。doSend() 方法中在发送消息前调用 waitOnMetadata() 哀求元数据。如下为 doSend() 方法中哀求元数据的部分。
- private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
- long nowMs = time.milliseconds();
- ClusterAndWaitTime clusterAndWaitTime;
- try {
- // **请求元数据**
- clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
- } catch (KafkaException e) {
- if (metadata.isClosed())
- throw new KafkaException("Producer closed while send in progress", e);
- throw e;
- }
- }
复制代码 waitOnMetadata() 中起首尝试从已生存的元数据中获取 cluster 信息,如果失败,则会发起元数据哀求,调用 awaitUpdate() 阻塞当火线程等待拉取元数据。
- private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
- // 尝试从元数据中获取 cluster 信息
- Cluster cluster = metadata.fetch();
- // 判断是否是无效主题
- if (cluster.invalidTopics().contains(topic))
- throw new InvalidTopicException(topic);
- // 将当前主题加入元数据
- metadata.add(topic, nowMs);
- // 尝试获取分区数量
- Integer partitionsCount = cluster.partitionCountForTopic(topic);
- // 如果存在则直接从当前保存的元数据中返回
- if (partitionsCount != null && (partition == null || partition < partitionsCount))
- return new ClusterAndWaitTime(cluster, 0);
- // 如果不存在则开始请求元数据
- long remainingWaitMs = maxWaitMs;
- long elapsed = 0;
- // Issue metadata requests until we have metadata for the topic and the requested partition,
- // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
- // is stale and the number of partitions for this topic has increased in the meantime.
- long nowNanos = time.nanoseconds();
- do {
- if (partition != null) {
- log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
- } else {
- log.trace("Requesting metadata update for topic {}.", topic);
- }
- metadata.add(topic, nowMs + elapsed);
- // 将 needFullUpdate 或 needPartialUpdate 设定为 true,返回 version
- int version = metadata.requestUpdateForTopic(topic);
- // 唤醒 sender 线程
- sender.wakeup();
- try {
- // **调用 awaitUpdate() 阻塞当前线程等待拉取元数据**
- metadata.awaitUpdate(version, remainingWaitMs);
- } catch (TimeoutException ex) {
- // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
- throw new TimeoutException(
- String.format("Topic %s not present in metadata after %d ms.",
- topic, maxWaitMs));
- }
- // 尝试从元数据中获取 cluster 信息
- cluster = metadata.fetch();
- elapsed = time.milliseconds() - nowMs;
- if (elapsed >= maxWaitMs) {
- throw new TimeoutException(partitionsCount == null ?
- String.format("Topic %s not present in metadata after %d ms.",
- topic, maxWaitMs) :
- String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
- partition, topic, partitionsCount, maxWaitMs));
- }
- metadata.maybeThrowExceptionForTopic(topic);
- remainingWaitMs = maxWaitMs - elapsed;
- // 尝试获取分区数量
- partitionsCount = cluster.partitionCountForTopic(topic);
- // 当 获取不到分区信息 或者 当前分区号大于获取到的分区数量,则继续循环
- } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
- producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
- return new ClusterAndWaitTime(cluster, elapsed);
- }
复制代码 awaitUpdate() 会阻塞主线程,等待 sender 线程返回元数据相应后 (updateVersion() > lastVersion) 才会继续执行。
- public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
- long currentTimeMs = time.milliseconds();
- long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
- // **阻塞等待拉取元数据**
- time.waitObject(this, () -> {
- // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
- maybeThrowFatalException();
- return updateVersion() > lastVersion || isClosed();
- }, deadlineMs);
- if (isClosed())
- throw new KafkaException("Requested metadata update after close");
- }
复制代码 3. sender 线程中的元数据哀求
sender 调用了 KafkaClient 接口中的 poll() 方法来进行网络哀求。NetworkClient 实现了 KafkaClient 接口,其内部调用了 metadataUpdater.maybeUpdate() 来哀求元数据,以下为 poll() 方法的具体实现。
- @Override
- public List<ClientResponse> poll(long timeout, long now) {
- ensureActive();
- if (!abortedSends.isEmpty()) {
- // If there are aborted sends because of unsupported version exceptions or disconnects,
- // handle them immediately without waiting for Selector#poll.
- List<ClientResponse> responses = new ArrayList<>();
- handleAbortedSends(responses);
- completeResponses(responses);
- return responses;
- }
- // **调用 maybeUpdate() 拉取元数据**
- long metadataTimeout = metadataUpdater.maybeUpdate(now);
- long telemetryTimeout = telemetrySender != null ? telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE;
- try {
- this.selector.poll(Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs));
- } catch (IOException e) {
- log.error("Unexpected error during I/O", e);
- }
- // process completed actions
- long updatedNow = this.time.milliseconds();
- List<ClientResponse> responses = new ArrayList<>();
- handleCompletedSends(responses, updatedNow);
- handleCompletedReceives(responses, updatedNow);
- handleDisconnections(responses, updatedNow);
- handleConnections();
- handleInitiateApiVersionRequests(updatedNow);
- handleTimedOutConnections(responses, updatedNow);
- handleTimedOutRequests(responses, updatedNow);
- completeResponses(responses);
- return responses;
- }
复制代码 maybeUpdate() 内部起首根据更新标记和前次更新元数据的时间间隔等信息计算是否需要再次更新元数据,如果需要更新,则选取一个较为空闲的节点发起元数据哀求。
- @Override
- public long maybeUpdate(long now) {
- // 根据更新标记,过期时间和退避时间,计算出下一次要更新元数据的时间。
- long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- // 是否有正在进行的元数据请求,如果有,将 waitForMetadataFetch 设定为 defaultRequestTimeoutMs(默认30s)
- long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
- // 计算元数据超时时间
- long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
- if (metadataTimeout > 0) {
- return metadataTimeout;
- }
- // Beware that the behavior of this method and the computation of timeouts for poll() are
- // highly dependent on the behavior of leastLoadedNode.
- LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
- // Rebootstrap if needed and configured.
- if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
- && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
- metadata.rebootstrap();
- leastLoadedNode = leastLoadedNode(now);
- }
- if (leastLoadedNode.node() == null) {
- log.debug("Give up sending metadata request since no node is available");
- return reconnectBackoffMs;
- }
- // **调用 private maybeUpdate(long now, Node node) 向选中的节点请求元数据**
- return maybeUpdate(now, leastLoadedNode.node());
- }
- // **判断是否需要更新元数据的条件**
- // 计算下一次要更新元数据的时间
- public synchronized long timeToNextUpdate(long nowMs) {
- // updateRequested() 检查是否标记了 needFullUpdate 或者 needPartialUpdate
- // 如果已标记,则将过期时间 timeToExpire 设定为 0。
- // 如果未标记,则根据 lastSuccessfulRefreshMs 和 metadataExpireMs 计算当前保存的元数据的过期时间 timeToExpire。
- long timeToExpire = updateRequested() ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
- // timeToAllowUpdate() 根据退避时间计算允许下一次更新的时间
- // 最终返回 过期时间 和 允许下一次更新的时间 中较大的值。
- return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
- }
- // 如果 needFullUpdate 或者 needPartialUpdate 为 true,则表示需要立即更新元数据
- public synchronized boolean updateRequested() {
- return this.needFullUpdate || this.needPartialUpdate;
- }
- public synchronized long timeToAllowUpdate(long nowMs) {
- // 计算重试的退避时间,在退避时间内不进行重试。
- long backoffForAttempts = Math.max(this.lastRefreshMs +
- this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts - 1 : 0) - nowMs, 0);
- // Periodic updates based on expiration resets the equivalent response count so exponential backoff is not used
- if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0) == 0) {
- this.equivalentResponseCount = 0;
- }
- // 计算重复返回相同响应的退避时间
- long backoffForEquivalentResponseCount = Math.max(this.lastRefreshMs +
- (this.equivalentResponseCount > 0 ? this.refreshBackoff.backoff(this.equivalentResponseCount - 1) : 0) - nowMs, 0);
- return Math.max(backoffForAttempts, backoffForEquivalentResponseCount);
- }
复制代码 private maybeUpdate(long now, Node node) 方法将元数据哀求封装成 ClientRequest (与生产者发送消息类似),并发送哀求。
- private long maybeUpdate(long now, Node node) {
- String nodeConnectionId = node.idString();
- if (canSendRequest(nodeConnectionId, now)) {
- // **将元数据请求封装成 ClientRequest,并发送请求**
- Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
- MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
- log.debug("Sending metadata request {} to node {}", metadataRequest, node);
- sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
- inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
- return defaultRequestTimeoutMs;
- }
- // If there's any connection establishment underway, wait until it completes. This prevents
- // the client from unnecessarily connecting to additional nodes while a previous connection
- // attempt has not been completed.
- if (isAnyNodeConnecting()) {
- // Strictly the timeout we should return here is "connect timeout", but as we don't
- // have such application level configuration, using reconnect backoff instead.
- return reconnectBackoffMs;
- }
- if (connectionStates.canConnect(nodeConnectionId, now)) {
- // We don't have a connection to this node right now, make one
- log.debug("Initialize connection to node {} for sending metadata request", node);
- initiateConnect(node, now);
- return reconnectBackoffMs;
- }
- // connected, but can't send more OR connecting
- // In either case, we just need to wait for a network event to let us know the selected
- // connection might be usable again.
- return Long.MAX_VALUE;
- }
复制代码 4. 处理元数据相应并更新元数据
在节点返回元数据相应之后,由 sender 线程 poll() 方法中的 handleCompletedReceives() 处理元数据。与生产者发送消息类似,起首将已经完成的哀求从 inFlightRequests 中移除,然后调用 handleSuccessfulResponse() 来处理元数据相应中的信息。
- private void handleCompletedReceives(List<ClientResponse> responses, long now) {
- for (NetworkReceive receive : this.selector.completedReceives()) {
- String source = receive.source();
- // 将已经完成的请求从 inFlightRequests 中移除
- InFlightRequest req = inFlightRequests.completeNext(source);
- AbstractResponse response = parseResponse(receive.payload(), req.header);
- if (throttleTimeSensor != null)
- throttleTimeSensor.record(response.throttleTimeMs(), now);
- if (log.isDebugEnabled()) {
- log.debug("Received {} response from node {} for request with header {}: {}",
- req.header.apiKey(), req.destination, req.header, response);
- }
- maybeThrottle(response, req.header.apiVersion(), req.destination, now);
-
- // **如果是元数据响应,则调用 handleSuccessfulResponse() 来处理元数据响应中的信息**
- if (req.isInternalRequest && response instanceof MetadataResponse)
- metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
-
- }
- }
复制代码 handleSuccessfulResponse() 中更新当前生存的元数据。
- @Override
- public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
-
- // 更新元数据
- this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
-
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |