【消息队列】RabbitMQ实现消费者组机制

打印 上一主题 下一主题

主题 877|帖子 877|积分 2631

目次

1. RabbitMQ 的 发布订阅模式
2. GRPC 服务间的实体同步
2.1 生产者服务
2.2 消费者服务
3. 可靠性
3.1 生产者丢失消息
3.2 消费者丢失消息
3.3 RabbitMQ 中心件丢失消息


1. RabbitMQ 的 发布订阅模式

https://www.rabbitmq.com/tutorials/tutorial-three-go



  • P 生产者:发送消息到一个特定的交换机(交换机类型是fanout),不需指定具体的目标队列
  • X 交换机:将消息分发给所有绑定到它的队列
  • C 消费者:订阅主题,通过绑定到交换机的队列吸收消息

2. 服务间的实体同步

考虑以下业务需求——


  • 模仿消费者组机制:

    • 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系
    • 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费雷同的数据

  • 消费历史数据:当生产者先启动,生产了一部门数据,消费者后启动时,也能消费到历史数据
服务之间的实体同步:

2.1 生产者服务

(1) 初始化
生产者初始化时必要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明本身的实体有哪些消费者在消费。比如:


  • 声明交换机 exchange_A、exchange_B
  • 声明消费者 consumer_1、consumer_2
  • 创建队列 exchange_A_consumer_1、exchange_B_consumer_1、exchange_B_consumer_2,也就是对每个 topic-consumer 组合,创建一个相应的队列
  • 将交换机和队列绑定

(2) 实体变动时发送消息
发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_B交换机,那么消息会被投递给队列 exchange_B_consumer_1 和 队列 exchange_B_consumer_2。

2.2 消费者服务

消费者订阅一个 topic,处置惩罚 rabbitMQ 队列发来的消息。


  • 若消息处置惩罚乐成(业务流程乐成),发送 Ack 给 rabbitMQ 确认消费
  • 若消息处置惩罚失败(业务流程失败),发送 Nack 关照 rabbitMQ 处置惩罚失败,消息将放回队列等候下次消费
   Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~
  
3. 可靠性

3.1 生产者丢失消息

(1) 生产者绑定交换机和队列
在生产者初始化时,必要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。
  1. func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {
  2.         // ...
  3.         // 初始化交换机和队列
  4.         for topic, consumerGroups := range option.TopicConsumerGroupsBinding {
  5.                 err = initExchange(topic, consumerGroups, mq)
  6.                 if err != nil {
  7.                         return nil, err
  8.                 }
  9.         }
  10.         return mq, nil
  11. }
  12. func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {
  13.         // 1. 创建发送通道
  14.         pch, err := mq.conn.Channel()
  15.         if err != nil {
  16.                 return err
  17.         }
  18.         mq.produceChannels[exchange] = pch
  19.         // 2. 开启消息确认机制
  20.         if err := pch.Confirm(false); err != nil {
  21.                 return err
  22.         }
  23.         // 3. 创建交换机
  24.         // 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
  25.         err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
  26.         if err != nil {
  27.                 return err
  28.         }
  29.         slog.Info("rabbitmq declared exchange", "exchange_name", exchange)
  30.         // 4. 创建队列并绑定到交换机
  31.         for _, consumerGroup := range strings.Split(consumerGroups, ",") {
  32.                 consumerGroup = strings.TrimSpace(consumerGroup)
  33.                 if consumerGroup == "" {
  34.                         continue
  35.                 }
  36.                 queue := queueName(exchange, consumerGroup)
  37.                 // 创建队列
  38.                 // 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
  39.                 _, err = pch.QueueDeclare(queue, true, false, false, false, nil)
  40.                 if err != nil {
  41.                         return err
  42.                 }
  43.                 // 将队列绑定到交换机
  44.                 // 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数
  45.                 err = pch.QueueBind(queue, "", exchange, false, nil)
  46.                 if err != nil {
  47.                         return err
  48.                 }
  49.                 slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)
  50.                 // 创建接收通道
  51.                 cch, err := mq.conn.Channel()
  52.                 if err != nil {
  53.                         return err
  54.                 }
  55.                 mq.consumeChannels[queue] = cch
  56.         }
  57.         // 5. 开启消息确认事件监听、消息投递事件监听
  58.         mq.publishWatcher[exchange] = &watcher{
  59.                 returnCh:  pch.NotifyReturn(make(chan amqp.Return)),
  60.                 confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
  61.         }
  62.         // 监听未被交换机投递的消息
  63.         go func() {
  64.                 for ret := range mq.publishWatcher[exchange].returnCh {
  65.                         // 尝试重新投递
  66.                         ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)
  67.                         if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {
  68.                                 slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)
  69.                         } else {
  70.                                 slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)
  71.                         }
  72.                         time.Sleep(time.Second * 3)
  73.                 }
  74.         }()
  75.         return nil
  76. }
复制代码
(2) 发送重试
发送消息时增加重试机制。若超过重试上限,需记录日志或报警。
  1. func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {
  2.         body, _ := json.Marshal(data)
  3.         msgID := uuid.New()
  4.         var retried int
  5.         for {
  6.                 err := r.publish(ctx, topic, msgID, body, time.Now())
  7.                 if err == nil {
  8.                         return nil
  9.                 }
  10.                 retried++
  11.                 if retried > r.option.RetryNum {
  12.                         return err
  13.                 }
  14.                 time.Sleep(r.option.RetryInterval)
  15.         }
  16. }
复制代码
(3) confirm 消息确认机制
生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认变乱,让生产端知晓消息已发送乐成。监听 confirm 变乱以确认消息的发送状态:
  1. func initExchange(exchange string, mq *RabbitMQ) error {
  2.         // ...
  3.         // 开启消息确认机制
  4.         if err := pch.Confirm(false); err != nil {
  5.                 return err
  6.         }
  7.        
  8.         // 创建监听器
  9.         mq.publishWatcher[exchange] = &watcher{
  10.                 confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
  11.         }
  12.         // ...
  13. }
  14. func (r *RabbitMQ) publish(ctx context.Context, ...) error {
  15.         // publish发送消息
  16.         // ...
  17.        
  18.         // 等待rabbitmq返回消息确认
  19.         select {
  20.         case confirm := <-r.publishWatcher[exchange].confirmCh:
  21.                 if !confirm.Ack {
  22.                         return errors.New("publish failed, got nack from rabbitmq")
  23.                 }
  24.         case <-ctx.Done():
  25.                 return errors.New("context deadline, publish to rabbitmq timeout")
  26.         case <-time.After(r.config.Timeout):
  27.                 return errors.New("publish to rabbitmq timeout")
  28.         }
  29.         return nil
  30. }
复制代码

3.2 消费者丢失消息

消费者消费完成后,必须手动 Ack 关照 MQ,表示已经消费乐成:
  1. func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {
  2.         // ...
  3.         return consumeChannel.Ack(deliveryTag, false)
  4. }
复制代码
如果消费失败,必要手动 Nack,那此条消息会重新入队,等候下次消费:
  1. func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {
  2.         // ...
  3.         return consumeChannel.Nack(deliveryTag, false, true)
  4. }
复制代码

3.3 RabbitMQ 中心件丢失消息

(1) 数据持久化到磁盘
交换机持久化(durable=true):
  1. // 创建交换机
  2. // 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
  3. err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
复制代码
队列持久化(durable=true):
  1. // 创建队列
  2. // 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
  3. _, err = pch.QueueDeclare(queue, true, false, false, false, nil)
复制代码
消息持久化(DeliveryMode=Persistent):
  1. err := ch.Publish(
  2.         exchange, // 交换机名称
  3.         "",       // 路由键
  4.         true,     // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息
  5.         false,    // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃
  6.         amqp.Publishing{
  7.                 MessageId:    msgID,              // 消息ID
  8.                 ContentType:  "application/json", // 消息内容类型
  9.                 Body:         body,               // 消息内容
  10.                 DeliveryMode: amqp.Persistent,    // 消息需要持久化
  11.                 Timestamp:    t,                  // 消息时间
  12.         },
  13. )
复制代码
(2) RabbitMQ 本身的数据一致性保证
RabbitMQ 使用 raft 共识算法保证数据一致性:
https://www.rabbitmq.com/docs/clustering#replica-placement




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

水军大提督

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表