Pulsar 入门实战(1)--Pulsar 消息通报

打印 上一主题 下一主题

主题 861|帖子 861|积分 2583

本文主要介绍 Pulsar 消息通报的相干概念,对应的 pulsar 版本为 3.3.x。
1、概述

Pulsar 基于发布-订阅模式构建。在这种模式中,生产者将消息发布到主题;消费者订阅这些主题,处理传入的消息,并在处理完成后向 broker 发送确认。

当创建订阅时,即使消费者断开连接,Pulsar 也会保留所有消息。只有当消费者确认所有这些消息都已成功处理后,保留的消息才会被丢弃。
如果消息消费失败并且盼望重新消费该消息,可以启用消息重投递机制,让 broker 重新发送该消息。
2、消息

消息是 Pulsar 的基本“单位”。它们是生产者发布到主题的内容,也是消费者从主题中消费的内容。下表列出了消息的组成部分。
组件说明
Value / data payload消息的内容
Key消息的 key
Properties消息的属性,用户界说的键值对
Producer name生产者的名称,如果没有指定,将主动天生
Topic name主题名称
Schema version消息所使用模式的版本号
Sequence ID消息的序列 ID
Message ID消息 ID
Publish time消息发布的时间戳
Event time由应用程序附加到消息上的可选时间戳。例如,应用程序可以附加消息处理的时间戳。默认为 0。 
消息的默认最大巨细为 5 MB。可以修改如下配置来调解消息的最大巨细:
A、broker.conf
  1. # The max size of a message (in bytes).
  2. maxMessageSize=5242880
复制代码
B、bookkeeper.conf
  1. # The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
  2. nettyMaxFrameSizeBytes=5253120
复制代码
2.1、Acknowledgment(消息确认)

消费者在成功消费一条消息后,会向 broker 发送一条消息确认。消息会被永久存储,直到所有订阅已确认该消息后才会被删除。确认(ack)是Pulsar判断消息可以从系统中删除的一种方式。如果您想存储已被消费者确认的消息,必要配置消息保留策略。
对于批量消息,可以启用批量索引确认以制止向消费者重新发送已确认的消息。
消息可以通过以下两种方式举行确认:

  • 单独确认 消费者对每条消息举行确认,向 broker 发送确认请求。
  • 累计确认 消费者只确认它接收到的末了一条消息。在流中,直到(包括)提供的那条消息为止,所有消息都不会重新投递给该消费者。
单独确认 API:
  1. consumer.acknowledge(msg);
复制代码
累计确认 API:
  1. consumer.acknowledgeCumulative(msg);
复制代码
留意:累计确认不能用于共享(Shared)或键共享(Key_Shared)订阅范例,因为共享或键共享订阅范例涉及多个消费者访问类似的订阅。在共享订阅范例中,消息是单独确认的。
2.2、Negative acknowledgment(否定确认)

否定确认机制允许向 broker 发送通知,指示消费者未处理某条消息。当消费者未能成功消费一条消息并必要重新消费时,消费者会向 broker 发送一个否定确认(nack),触发 broker 将这条消息重新投递给消费者。
根据订阅范例不同,消息可以以单独或累积方式举行否定确认。
在 Exclusive 和 Failover 订阅范例中,消费者可用累计方式举行否定确认。
在 Shared 和 Key_Shared 订阅范例中,消费者可可用单独方式举行否定确认。
在有序订阅范例(如 Exclusive、Failover 和 Key_Shared)上使用否定确认大概会导致失败的消息按非原始顺序发送给消费者。
如果你打算对某条消息使用否定确认,请确保在确认超时之前举行否定确认。
否定确认 API:
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer()
  2.                 .topic(topic)
  3.                 .subscriptionName("sub-negative-ack")
  4.                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  5.                 .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) // the default value is 1 min
  6.                 .subscribe();
  7. Message<byte[]> message = consumer.receive();
  8. // call the API to send negative acknowledgment
  9. consumer.negativeAcknowledge(message);
  10. message = consumer.receive();
  11. consumer.acknowledge(message);
复制代码
要以不同的延迟重新投递消息,你可以通过设置消息重投次数来使用重投递退避机制。
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer()
  2.         .topic(topic)
  3.         .subscriptionName("sub-negative-ack")
  4.         .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  5.         .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
  6.             .minDelayMs(1000)
  7.             .maxDelayMs(60 * 1000)
  8.             .multiplier(2)
  9.             .build())
  10.         .subscribe();
复制代码
消息重投时延如下:
重投消息数重投时延
11 秒
22 秒
34 秒
48 秒
516 秒
632 秒
760 秒
860 秒
留意:如果启用了批处理,批处理中的所有消息都会重新投递给消费者。
2.3、Acknowledgment timeout(确认超时)

留意:默认情况下,确认超时是禁用的,这意味着发送给消费者的消息不会被重新投递,除非消费者崩溃。
确认超机会制允许你设置一个时间,用于客户端跟踪未确认的消息。在到达确认超时时间(ackTimeout)后,客户端会向 broker 发送重新投递未确认消息的请求,从而使 broker 将未确认的消息重新发送给消费者。
你可以配置确认超机会制,在 ackTimeout 之后重新投递消息,定时使命会在每个 ackTimeoutTickTime 周期检查确认超时的消息。
你还可以使用重投递回退机制,通过设置消息重投的次数,以不同的延迟时间重新投递消息。
重投递回退机制 API:
  1. consumer.ackTimeout(10, TimeUnit.SECOND)
  2.         .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
  3.             .minDelayMs(1000)
  4.             .maxDelayMs(60 * 1000)
  5.             .multiplier(2)
  6.             .build());
复制代码
消息重投时延如下:
重投消息数重投时延
110 + 1 秒
210 + 2 秒
310 + 4 秒
410 + 8 秒
510 + 16 秒
610 + 32 秒
710 + 60 秒
810 + 60 秒
留意:
如果启用了批处理,一个批次中的所有消息都会重新投递给消费者。
与确认超时相比,否定确认是首选。首先,设置超时值很困难。其次,当消息处理时间超过确认超时时,broker 会重新发送消息,但这些消息大概不必要被重新消费。
确认超时 API:
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer()
  2.                 .topic(topic)
  3.                 .ackTimeout(2, TimeUnit.SECONDS) // the default value is 0
  4.                 .ackTimeoutTickTime(1, TimeUnit.SECONDS) //定时检查确认超时消息的时间间隔
  5.                 .subscriptionName("sub")
  6.                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  7.                 .subscribe();
  8. Message<byte[]> message = consumer.receive();
  9. // wait at least 2 seconds
  10. message = consumer.receive();
  11. consumer.acknowledge(message);
复制代码
2.4、Retry letter topic(重试信主题)

重试信主题允许你存储未能被消费的消息,并在稍后重新实验消费它们。通过这种方法,你可以自界说消息重新投递的间隔。原始主题上的消费者也会主动订阅重试消息主题。一旦到达最大重试次数,未被消费的消息将被移动到一个死信主题举行手动处理。重试信主题的功能是由消费者实现的。

使用重试信主题与使用延迟消息通报的意图不同,尽管它们都旨在稍后消费消息。重试信主题通过消息重新投递来处理失败,以确保关键数据不会丢失,而延迟消息通报则旨在在指定的延迟时间通报消息。
默认情况下,重试是禁用的。你可以将 enableRetry 设置为 true,以在消费者上启用重试功能。
可用使用以下 API 来从重试信主题消费消息;当到达 maxRedeliverCount 的值时,未被消费的消息将会被移动到死信主题。
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2.                 .topic("my-topic")
  3.                 .subscriptionName("my-subscription")
  4.                 .subscriptionType(SubscriptionType.Shared)
  5.                 .enableRetry(true)
  6.                 .deadLetterPolicy(DeadLetterPolicy.builder()
  7.                         .maxRedeliverCount(maxRedeliveryCount)
  8.                         .build())
  9.                 .subscribe();
复制代码
默认的重试信主题格式如下:
  1. <topicname>-<subscriptionname>-RETRY
复制代码
通过代码指定重试信主题:
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2.         .topic("my-topic")
  3.         .subscriptionName("my-subscription")
  4.         .subscriptionType(SubscriptionType.Shared)
  5.         .enableRetry(true)
  6.         .deadLetterPolicy(DeadLetterPolicy.builder()
  7.                 .maxRedeliverCount(maxRedeliveryCount)
  8.                 .retryLetterTopic("my-retry-letter-topic-name")
  9.                 .build())
  10.         .subscribe();
复制代码
重试信主题中的消息包含一些特殊属性,这些属性是由客户端主动创建的。
属性描述
REAL_TOPIC实际的主题
ORIGIN_MESSAGE_ID消息原始 message ID
RECONSUMETIMES消费消息的重试次数
DELAY_TIME消息重试间隔,单位为毫秒
可使用以下 API 将消息存储在重试信主题中:
  1. consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);
复制代码
可以使用以下 API 添加自界说属性。在下一次消费时,可以通过 message#getProperty 获取自界说属性。
  1. Map<String, String> customProperties = new HashMap<String, String>();
  2. customProperties.put("custom-key-1", "custom-value-1");
  3. customProperties.put("custom-key-2", "custom-value-2");
  4. consumer.reconsumeLater(msg, customProperties, 3, TimeUnit.SECONDS);
复制代码
留意:目前,在共享订阅范例中启用了重试主题。 与否定确认相比,重试信主题更适合必要大量重试且具有可配置重试间隔的消息。因为重试主题中的消息被长期化到了 BookKeeper,而因否定确认必要重试的消息则被缓存在客户端。
2.5、Dead letter topic(死信主题)

死信主题允许您在某些消息未成功消费时继续消息的消费。那些未能成功消费的消息会被存储在一个特定的主题中,称为死信主题。死信主题的功能由消费者实现。您可以决定如那边理死信主题中的消息。
启用默认的死信主题:
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2.                 .topic("my-topic")
  3.                 .subscriptionName("my-subscription")
  4.                 .subscriptionType(SubscriptionType.Shared)
  5.                 .deadLetterPolicy(DeadLetterPolicy.builder()
  6.                       .maxRedeliverCount(maxRedeliveryCount)
  7.                       .build())
  8.                 .subscribe();
复制代码
默认死信主题格式如下:
  1. <topicname>-<subscriptionname>-DLQ
复制代码
死信主题的生产者名称格式如下:
  1. -<topicname>-<subscriptionname>-DLQ
复制代码
指定死信主题名称:
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2.                 .topic("my-topic")
  3.                 .subscriptionName("my-subscription")
  4.                 .subscriptionType(SubscriptionType.Shared)
  5.                 .deadLetterPolicy(DeadLetterPolicy.builder()
  6.                       .maxRedeliverCount(maxRedeliveryCount)
  7.                       .deadLetterTopic("my-dead-letter-topic-name")
  8.                       .build())
  9.                 .subscribe();
复制代码
默认情况下,在创建 DLQ 主题时不会创建订阅。如果在 DLQ 主题上没有即时订阅,大概会丢失消息。为了主动创建 DLQ 的初始订阅,您可以指定 initialSubscriptionName 参数。如果设置了这个参数,但是 Broker 的 allowAutoSubscriptionCreation 被禁用,DLQ 的生产者将无法创建。
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
  2.                 .topic("my-topic")
  3.                 .subscriptionName("my-subscription")
  4.                 .subscriptionType(SubscriptionType.Shared)
  5.                 .deadLetterPolicy(DeadLetterPolicy.builder()
  6.                       .maxRedeliverCount(maxRedeliveryCount)
  7.                       .deadLetterTopic("my-dead-letter-topic-name")
  8.                       .initialSubscriptionName("init-sub")
  9.                       .build())
  10.                 .subscribe();
复制代码
initialSubscriptionName 只是为了消息不丢失创建的订阅,并不是本消费者订阅了该死信主题;必要另外写程序处理死信主题中的消息或手工处理。
死信主题用于保存未成功消费的消息,触发条件包括确认超时、否定确认或重试信主题。
留意:目前,死信主题已在共享(Shared)和键共享(Key_Shared)订阅范例中启用。
2.6、Compression(压缩)

消息压缩可以通过小号一些 CPU 开销来减小消息巨细。Pulsar 客户端支持的压缩范例:LZ4、ZLIB、ZSTD、SNAPPY。
压缩范例存储在消息的元数据中,因此消费者可以根据必要主动采用不同的压缩范例。
生产者中启用压缩:
  1. client.newProducer()
  2.     .topic("topic-name")
  3.     .compressionType(CompressionType.LZ4)
  4.     .create();
复制代码
2.7、Batching(批处理)

当启用批处理时,生产者会累积并在单个请求中发送一批消息。批量巨细由最大消息数和最大发布延迟界说。因此,积压巨细表示的是批的总数,而不是消息的总数。

在 Pulsar 中,批次被作为单个单位举行跟踪和存储,而不是作为单独的消息。消费者将一个批次解开成单独的消息。然而,即使启用了批处理,通过 deliverAt 或 deliverAfter 参数配置的预定消息始终会作为单独的消息发送。
通常情况下,一个批次在所有消息被消费者确认后才会被确认。这意味着如果批次中不是所有消息都被确认,大概由于不测故障、否定确认(NACK)或确认超时,会导致重新投递该批次中的所有消息。
为了制止将已确认的批次消息重新投递给消费者,Pulsar 从版本 2.6.0 开始引入了批次索引确认功能。启用批次索引确认后,消费者会过滤已确认的批次索引,并向 broker 发送批次索引确认请求。broker 会维护批次索引的确认状态,并跟踪每个批次索引的确认状态,以制止向消费者分发已确认的消息。当批次中所有消息的索引都被确认时,该批次会被删除。
默认情况下,批次索引确认是禁用的(acknowledgmentAtBatchIndexLevelEnabled=false)。可以在 broker 中将 acknowledgmentAtBatchIndexLevelEnabled 参数设置为 true 来启用批次索引确认。启用批次索引确认会导致更多的内存开销。
批次索引确认还必须在消费者端通过调用 .enableBatchIndexAcknowledgment(true) 来启用:
  1. Consumer<byte[]> consumer = pulsarClient.newConsumer()
  2.         .topic(topicName)
  3.         .subscriptionName(subscriptionName)
  4.         .subscriptionType(subType)
  5.         .enableBatchIndexAcknowledgment(true)
  6.         .subscribe();
复制代码
留意:异步发送消息(sendAsync)时批处理才会生效。
2.8、Chunking(分块)

消息分块允许 Pulsar 在生产者端将大消息分割成多个块,并在消费者端将分块的消息聚合起来处理。
启用消息分块后,当消息巨细超过允许的最大有效载荷巨细(即 broker 的 maxMessageSize 参数)时,消息通报的工作流程如下:

  • 生产者将原始消息分割为分块消息(并带有分块元数据),将它们按顺序分别发送到 broker。
  • broker 将分块消息以与普通消息类似的方式存储在同一管理 ledger 中,并使用 chunkedMessageRate 参数记录主题上的分块消息速率。
  • 消费者缓存分块消息,在接收到消息的所有分块后,将分块消息聚合到接收队列中。
  • 客户端从接收队列消费聚合后的消息。
留意:
分块仅实用于长期化主题。
分块不能与批处理同时启用。在启用分块之前,必要先禁用批处理。
2.8.1、使用有序消费者处理一连的分块消息

下图显示了一个主题,此中有一个生产者发布了分块消息和常规非分块消息。生产者将消息 M1 分为三个标记为 M1-C1、M1-C2 和 M1-C3 的分块消息。broker 将所有三个分块消息存储在管理 ledger 中,并按类似顺序将它们发送到有序(独占/故障转移)消费者。消费者在内存中缓冲分块消息,直到接收到所有分块消息,然后将它们聚合成一条消息,末了将原始消息 M1 交给客户端。

2.8.2、使用有序消费者处理交织的分块消息

当多个生产者将分块消息发布到单个主题时,broker 未来自不同生产者的所有分块消息存储在同一个管理 ledger 中。管理 ledger 中的分块消息大概会交织在一起。如下所示,生产者1将消息 M1 分为三个分块消息 M1-C1、M1-C2 和 M1-C3。生产者2将消息 M2 分为三个分块消息 M2-C1、M2-C2 和 M2-C3。特定消息的所有分块消息仍然是有序的,但在管理 ledger 中大概不是一连的。

留意:在这种情况下,交织的分块消息大概会给消费者带来一些内存压力,因为消费者为每个大消息保留一个单独的缓冲区,以将其所有分块消息聚合成一条消息。你可以通过配置maxPendingChunkedMessage 参数来限制消费者同时维护的最大分块消息数。当到达阈值时,消费者通过静默确认来丢弃待处理的消息或请求 broker 稍后重新通报它们,以优化内存利用。
2.8.3、启用消息分块

条件:将 enableBatching 参数设置为 false 来禁用批处理。
消息分块功能默认处于关闭状态。要启用消息分块,在创建生产者时将 chunkingEnabled 设置为 true。
留意:如果消费者在指定时间(expireTimeOfIncompleteChunkedMessage)内未能接收到消息的所有分块,则未完成的分块将逾期。逾期时间默认值为 1 分钟。
3、主题

Pulsar 主题是一种存储单位,用于将消息组织成流。与其他发布-订阅系统类似,Pulsar 中的主题是用于从生产者传输消息到消费者的通道。主题名称是具有明确界说结构的 URL:
  1. {persistent|non-persistent}://tenant/namespace/topic
复制代码
主题名称组件描述
persistent/non-persistentPulsar 支持两种范例的主题:长期化主题和非长期化主题。默认情况下是长期化主题,对于长期化主题,所有消息都会长期化存储在磁盘上(如果 broker 不是独立的,消息将在多个磁盘上长期化存储),而非长期化主题的数据则不会长期化到存储磁盘上。
tenant主题租户,租户在 Pulsar 中对于多租户架构至关重要。
namespace主题命名空间,每个租户拥有一个或多个命名空间。
topic主题名称
留意:在 Pulsar 中,你不必要显式地创建主题。如果客户端实验向一个尚不存在的主题写入或接收消息,Pulsar 会主动创建该主题。如果客户端在创建主题时没有指定租户或命名空间,那么该主题会被创建在默认的租户和命名空间中。你也可以在指定的租户和命名空间中创建主题,例如 persistent://my-tenant/my-namespace/my-topic。
4、命名空间

Pulsar 命名空间是主题的逻辑分组,同时也是租户内的逻辑概率。租户通过管理 API 创建命名空间。例如,具有多个应用程序的租户可以为每个应用程序创建单独的命名空间。命名空间允许应用程序创建和管理主题的层次结构。例如,主题 my-tenant/app1 是租户 my-tenant 下应用程序 app1 的命名空间。你可以在命名空间下创建任意数量的主题。
5、订阅

Pulsar 订阅是一个命名的配置规则,确定消息如何通报给消费者。它是由一组消费者在主题上建立的租约。Pulsar 有四种订阅范例:

  • 独占订阅(exclusive)
  • 共享订阅(shared)
  • 故障转移订阅(failover)
  • 键共享(key_shared)

提示:
在 Pulsar 中,你可以灵活地使用不同的订阅来实现发布-订阅或队列的效果。
1、如果你盼望在消费者之间实现传统的“广播式发布-订阅消息”,可以为每个消费者指定一个唯一的订阅名称,这是一种独占式订阅范例。
2、如果你盼望在消费者之间实现“消息队列”,可以让多个消费者共享类似的订阅名称(共享订阅、故障转移订阅、键共享订阅)。
3、如果你盼望同时实现这两种效果,可以将独占式订阅范例与其他订阅范例结合使用,为消费者创建不同的订阅。
5.1、订阅范例

当一个订阅没有消费者时,其订阅范例是未界说的。订阅的范例在有消费者连接时确定,并且可以通过重新启动所有消费者并使用不同的配置来更改订阅范例。
5.1.1、独占订阅(Exclusive)

独占订阅只允许单个消费者连接到该订阅。如果多个消费者使用类似的订阅名称订阅同一个主题,会发生错误。必要留意的是,如果主题是分区的,所有分区都将由允许连接到该订阅的单个消费者来消费。
在下图中,只有消费者A被允许消费消息。
提示:独占订阅是默认的订阅范例。

5.1.2、故障转移订阅(Failover)

故障转移订阅允许多个消费者连接到同一个订阅上。
对于非分区主题或分区主题的每个分区,会选择一个主消费者来接收消息。
当主消费者断开连接时,所有(未确认的和随后的)消息将被通报给下一个排队的消费者。
留意:在某些情况下,一个分区大概存在一个较旧的活动消费者在处理消息,同时一个新切换的活动消费者开始接收新消息。这大概导致消息重复或顺序庞杂的题目发生。
故障转移 | 分区主题
对于分区主题,broker 按照消费者的优先级和消费者名称的辞书顺序举行排序。broker 实验将分区均匀地分配给优先级最高的消费者。消费者是通过运行一个模运算 mod(partition index, consumer index)来选择的。
A、如果分区主题中的分区数量少于消费者数量
例如,在下图中,这个分区主题有 2 个分区,并且有 4 个消费者。每个分区有 1 个活动消费者和 3 个备用消费者。
  对于 P0,消费者A是主消费者,而消费者B、消费者C 和消费者D是备用消费者。
  对于 P1,消费者B是主消费者,而消费者A、消费者C 和消费者D是备用消费者。
  此外,如果消费者A和消费者B都断开连接,那么
    对于 P0:消费者C是活动消费者,消费者D是备用消费者。
    对于 P1:消费者D是活动消费者,消费者C是备用消费者。


B、如果分区主题中的分区数量多于消费者数量
例如,在下图中,这个分区主题有 9 个分区和 3 个消费者。
  P0、P3 和 P6 分配给消费者A。消费者A是它们的活跃消费者。消费者B和消费者C是它们的备用消费者。
  P1、P4 和 P7 分配给消费者B。消费者B是它们的活跃消费者。消费者A和消费者C是它们的备用消费者。
  P2、P5 和 P8 分配给消费者C。消费者C是它们的活跃消费者。消费者A和消费者B是它们的备用消费者。

故障转移 | 非分区主题
A、如果是一个非分区主题,那么 broker 会按照消费者订阅非分区主题的顺序选择它们。
例如,在下面的图表中,有 1 个非分区主题,2 个消费者。
该主题有 1 个活跃消费者和 1 个备用消费者。
消费者A是主要消费者,如果消费者A断开连接,消费者B将成为下一个接收消息的消费者。

B、如果存在多个非分区主题,消费者的选择是基于哈希消费者名称和哈希主题名称。客户端使用类似的订阅名称订阅所有主题。
例如,在下面的图表中,有 4 个非分区主题和 2 个消费者。
非分区主题 1 和非分区主题 4 分配给消费者 A。消费者 B 是它们的备用消费者。
非分区主题 2 和非分区主题 3 分配给消费者 B。消费者 A 是它们的备用消费者。

5.1.3、共享订阅(Shared)

Pulsar 中的共享订阅范例允许多个消费者连接到同一个订阅。消息以循环分发方式传送到各个消费者,并且每条消息只会传送到一个消费者那边。当一个消费者断开连接时,所有已发送但未被确认的消息将被重新安排发送给其余的消费者。
在下面的图表中,Consumer A、Consumer B 和 Consumer C 都可以订阅该主题。
留意:共享订阅不保证消息顺序或不支持累积确认。

5.1.4、键共享订阅(Shared)

Pulsar 中的建共享订阅范例允许多个消费者连接到同一个订阅。但与共享范例不同,键共享范例是把具有类似键或类似排序键的消息送给同一个消费者。无论消息被重新通报多少次,它都会通报给同一个消费者。

留意:
如果有新切换的活跃消费者,它将从旧的非活跃消费者确认消息的位置开始读取消息。
举例来说,如果 P0 被分配给 Consumer A。Consumer A 是活跃消费者,而 Consumer B 是备用消费者。
如果 Consumer A 断开连接而没有读取任何来自 P0 的消息,在添加 Consumer C 并使其成为新的活跃消费者后,Consumer C 将直接开始从 P0 读取消息。
如果Consumer A 在从 P0 读取消息(0,1,2,3)后断开连接,当添加 Consumer C 并使其成为活跃消费者后,Consumer C 将开始从 P0 读取消息(4,5,6,7)。
有三种映射算法决定如作甚给定的消息键(或排序键)选择消费者:

  • 主动分割哈希范围(Auto-split Hash Range)
  • 主动分割一致性哈希(Auto-split Consistent Hashing)
  • 粘性(Sticky)
每种算法都有其独特的方式来将消息分配给消费者,以确保消息的有效处理和负载均衡。
所有映射算法的步调如下:
1、消息键(或排序键)通报给哈希函数(例如,Murmur3 32-bit),天生一个 32 位整数哈希值。
2、该哈希值通报给算法,从现有的连接消费者中选择一个消费者。
  1.                       +--------------+                              +-----------+
  2. Message Key ----->  / Hash Function / ----- hash (32-bit) -------> / Algorithm / ----> Consumer
  3.                    +---------------+                               +----------+
复制代码
当一个新的消费者连接并因此被添加到已连接消费者列表时,算法会重新调解映射,使当前映射到现有消费者的一些键被映射到新添加的消费者。当一个消费者断开连接并因此从已连接消费者列表中移除时,映射到该消费者的键将被映射到其他消费者。
Auto-split Hash Range

主动分割哈希范围(Auto-split Hash Range)假设每个消费者被映射到 0 到 2^16(65,536)范围内的某个区域;所有的映射区域覆盖整个范围,并且没有重叠。通过对消息哈希举行取模(取模的巨细为65,536)运算,得到的数字(0
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

花瓣小跑

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表