大号在练葵花宝典 发表于 2024-7-30 03:25:04

【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体

作者:后端小肥肠

创作不易,未经允许克制转载。
1. 前言


RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为很多企业和开辟团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简朴的使命队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。
2. RabbitMQ应用场景

2.1 异步处理

在现代应用中,异步消息处理是提升用户体验和系统服从的关键。RabbitMQ可以有效地用于多种异步处理使命,比方:


[*]用户注册后的邮件发送:用户注册后,通过RabbitMQ发送一个消息到队列中,由背景服务监听并处剃头送邮件的使命,从而不会延迟用户的注册过程。
[*]订单处理:在电商平台中,订单处理包括库存管理、支付确认等多个步骤,RabbitMQ可以用来在这些服务间异步转达订单信息,确保处理流程的连续性和服从。
2.2 应用解耦

RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部门保持低耦合度,便于独立扩展和维护。比方:


[*]微服务架构中的服务通信:在微服务架构中,RabbitMQ允许各个微服务之间通过消息进行交互,而不是直接调用对方的API,这种方式减少了服务间的直接依赖。
2.3 流量削峰

在流量高峰期,如促销或大型活动期间,系统大概会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:


[*]秒杀活动中的订单处理:在秒杀活动中,大量的购买请求可以先进入RabbitMQ队列,系统根据处理本领逐步从队列中取出并处理这些请求,有效避免了系统瓦解。
2.4 通信与集成

RabbitMQ提供了一个灵活的消息转达系统,可以集成复杂的企业系统。它支持多种协媾和广泛的开辟语言库,适用于:


[*]跨平台通信:在不同操作系统和不同编程语言编写的应用之间,RabbitMQ可以作为消息转达中间件,实现这些系统的有效通信。
2.5 日记处理和应用监控

RabbitMQ也常用于系统日记处理和监控。它可以聚合各服务产生的日记信息,并传输到日记分析系统:


[*]集中式日记管理:通过RabbitMQ,各个系统和应用的日记可以被统一网络至一个中心处理位置,便于进行日记分析、监控和报警。
2.6 数据同步

RabbitMQ 在数据同步中扮演着告急的角色,特殊是在分布式系统中,它能够确保数据在多个系统或组件之间保持同等性和最新状态。这对于维护数据的完整性和实时性至关告急。比方:


[*] 数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而包管所有地点的数据同等。
[*] 实时数据复制:在金融服务或电子商务平台,实时数据复制是包管高可用性和劫难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制战略,如将交易数据从主系统复制到备份系统或分析数据库。
[*] 缓存刷新:在使用缓存进步应用性能的情况下,RabbitMQ 可以用来在数据更新时主动通知系统刷新缓存。如许,用户总是能够获取到最新的数据,而不是过时的缓存数据。
通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不但增强了系统的可靠性和伸缩性,还进步了开辟和运维的服从。
3. 在项目中如何搭建稳定RabbitMQ架构体系

3.1. RabbitMQ安装

网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:
1. 环境准备,准备Cenos虚拟机,我的是7.x版本:
https://img-blog.csdnimg.cn/direct/11d4f038dbd44ad889c201d3c35fab6e.png
2. 拉取或解压RabbitMQ镜像:
https://img-blog.csdnimg.cn/direct/6b1af83a148042b19b56f69839443d59.png
3. 运行docker容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq   -e RABBITMQ_DEFAULT_VHOST=km_vhost-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest 4. 进入容器 :
docker exec -it 容器id /bin/bash 5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口题目),即可完成RabbitMQ安装。
https://img-blog.csdnimg.cn/direct/d8be31e3f7b3447b84621003bc24d11b.png
3.2. 总体技能流程

本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:
https://img-blog.csdnimg.cn/direct/30bd8a719521470d812fde45fafc62dd.png
上述流程为异步消息通信的技能流程,在异步消息通信中当消息投递后就立刻返回了效果,我们无法获取消息消耗的具体过程,这就导致了固然我们可以马上获取程序返回状态,但是程序实行细节或是否失败无法通过程序相应返回的方式获取。
基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:
生产者稳定架构:
1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递乐成。
2. 消息确认表创建。创建消息确认表(message_confirmation),记录消息投递状态,其中字段status反应了是否投递乐成(0为为投递乐成,1为投递乐成)。
CREATE TABLE "public"."message_confirmation" (
"id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
"status" int4,
"create_time" timestamp(6),
"update_time" timestamp(6),
"message" varchar(255) COLLATE "pg_catalog"."default",
CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."message_confirmation"
OWNER TO "postgres"; 3. 创建定时使命监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。
消耗者稳定架构
1. 死信队列运用。由于网络或外部因素导致消息消耗失败,可将消息投递至死信队列进行二次消耗。
2. 日记表记录。如死信队列也消耗失败,可将消息写入日记表(message_error)后进行手动消耗,由技能人员获取日记表中消耗失败记录,排查消耗失败缘故起因。
CREATE TABLE "public"."message_error" (
"id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
"message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
"error_log" text COLLATE "pg_catalog"."default",
"create_time" timestamp(6),
"update_time" timestamp(6),
CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."message_error"
OWNER TO "postgres"; 3.3. 实战讲解

3.3.1. 环境配置

3.3.1.1. 所需版本工具

依赖版本Spring Boot2.6.3Java1.8以上postgresSQL13.12RabbitMQ3.9.11 3.3.1.2. pom依赖

<dependencies>
      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
      </dependency>
      <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
      </dependency>
      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
      </dependency>
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
       <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
      </dependency>
</dependencies> 3.3.2. 生产者核心代码讲解

https://img-blog.csdnimg.cn/direct/828e9081b91244aab9f424c7681bf85f.png
3.3.2.1. yml配置

server:
port: 8873
spring:
datasource:
    url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producer
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
rabbitmq:
    port: 5672
    host: 192.168.10.11
    username: admin
    password: admin
    virtual-host: my_vhost
    publisher-confirm-type: correlated
    listener:
      simple:
      acknowledge-mode: manual 3.3.2.2. 编写回调函数

@PostConstruct
    public void regCallback() {
      // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("cause:"+cause);
                // 如果ack为true代表消息已经收到
                String messageId = correlationData.getId();

                if (!ack) {
                  // 这里可能要进行其他的方式进行存储
                  log.error("MQ队列应答失败,messageId是:" + messageId);
                  return;
                }

                try {
                  MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);
                  messageConfirmation.setStatus(1);
                  int count=messageConfirmationMapper.updateById(messageConfirmation);
                  if (count == 1) {
                        log.info("本地消息状态修改成功,消息成功投递到消息队列中...");
                  }
                } catch (Exception ex) {
                  log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());
                }
            }
      });
    } 上述回调函数主要用于监听生产者发送的消息是否发送乐成,并将消息发送状态更新至消息确认表中。
3.3.2.3. 编写定时使命监听消息确认表

@Configuration
@EnableScheduling
@Slf4j
public class confirmMessageTaskService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    MessageConfirmationMapper messageConfirmationMapper;

    @Scheduled(cron = "0 */1 * * * ?")
    public void sendMessage(){
      // 把消息为0的状态消息重新查询出来,投递到MQ中。
      LambdaQueryWrapper<MessageConfirmation> queryWrapper = new LambdaQueryWrapper<>();
      queryWrapper.eq(MessageConfirmation::getStatus, 0);
      List<MessageConfirmation> noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper)
                .stream()
                .collect(Collectors.toList());
      noConfirmMessages.forEach((noConfirmMessage)->{
            rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),
                  new CorrelationData(noConfirmMessage.getId()));
      });
    }
}  上述定时使命为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。
3.3.2.4. 消息投递

    public void sendMessage(MessageConfirmation messageConfirmation) {
      messageConfirmationMapper.insert(messageConfirmation);
      rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),
                new CorrelationData(messageConfirmation.getId()));
    } 3.4. 消耗者核心代码讲解

https://img-blog.csdnimg.cn/direct/a7fe774277394897b00a234ee2fb1800.png
3.4.1. yml配置

server:
port: 8872
spring:
datasource:
    url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumer
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
rabbitmq:
    port: 5672
    host: 192.168.10.11
    username: admin
    password: admin
    virtual-host: my_vhost
    listener:
      simple:
      acknowledge-mode: manual
mybatis-plus:
typeAliasesPackage: com.xfc.consumer.entities
mapper-locations: classpath:mapper/*.xml 3.4.2. RabbitMQ配置类

@Configuration
public class RabbitMQConfig {
    /**
   * 死信队列
   * @return
   */
    @Bean
    public FanoutExchange deadExchange() {
      return new FanoutExchange("dead_xfc_fanout_exchange", true, false);
    }

    @Bean
    public Queue deadXfcQueue() {
      return new Queue("dead.xfc.queue", true);
    }
    @Bean
    public Binding bindDeadXfc() {
      return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());
    }

    /**
   * 队列
   * @return
   */
    @Bean
    public FanoutExchange fanoutExchange() {
      return new FanoutExchange("xfc_fanout_exchange", true, false);
    }

    @Bean
    public Queue xfcQueue() {
      Map<String, Object> args = new HashMap<>();
      args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");
      return new Queue("xfc.queue", true, false, false, args);
    }

    @Bean
    public Binding bindXfc() {
      return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());
    }
}
上述代码为RabbitMQ配置类,用于在项目初始化时天生相应的交换机和队列。 
3.4.3. 队列消耗

@Service
@Slf4j
public class XfcMqConsumer {
    @RabbitListener(queues = {"xfc.queue"})
    public void messageconsumer(String message, Channel channel,
                              CorrelationData correlationData,
                              @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
      MessageConfirmation messageConfirmation=null;
      try {
            log.info("收到MQ的消息是: " + message );
            messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
            /**
             * 编写业务逻辑
             */
            
      } catch (Exception e) {
            e.printStackTrace();
            log.error("消息投放到死信队列"+e.getMessage(),e);
            channel.basicNack(tag,false,false);// 死信队列
      }
    }
} 3.4.4. 死信队列消耗

@Service
@Slf4j
public class DeadMqConsumer {
    @Autowired
    MessageErrorMapper messageErrorMapper;
    @RabbitListener(queues = {"dead.xfc.queue"})
    public void messageconsumer(String message, Channel channel,
                              CorrelationData correlationData,
                              @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
      MessageConfirmation messageConfirmation=null;
      try {
            log.info("收到MQ的消息是: " + message );
            messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
            /**
             * 编写业务逻辑
             */
      } catch (Exception e) {
            e.printStackTrace();
            /**
             * 写入message_error
             */
            messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));
            channel.basicNack(tag,false,false);// 死信队列
      }
    }
}
3.5 效果测试

以上代码编写完成后需要进行架构效果测试,其步骤如下:
1. 消息投递测试
https://img-blog.csdnimg.cn/direct/805527f8dee44307815f9f7b5d36cead.png
上图调用了消息投递接口。
https://img-blog.csdnimg.cn/direct/949b1f3c0cc344de857231715c78655e.png
在消息确认表中,新增了一条消息且status=1,代表该条消息已投递乐成。
2. 消耗者正常消耗测试
https://img-blog.csdnimg.cn/direct/14eb24770d34495392f666b8efc83e75.png
3. 消耗异常测试
https://img-blog.csdnimg.cn/direct/3c69c51a9639403dba91200d3d04dac2.png
上图可看出消息消耗异常投入到了死信队列。
https://img-blog.csdnimg.cn/direct/722710bb5f424225910324ddfeb70025.png
在死信队列中依然消耗失败。
https://img-blog.csdnimg.cn/direct/aeaff9f6de974be7b39344251f1337ea.png
消耗失败后乐成写入了日记表。
4. 结语

本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步具体的给出了生产者及消耗者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。
https://img-blog.csdnimg.cn/direct/688035c8860744b5b95716130759cfbf.jpeg


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体