SpringCloud(六) - RabbitMQ安装,三种消息发送模式,消息发送确认,消息消 ...

打印 上一主题 下一主题

主题 854|帖子 854|积分 2562

1、安装erlang语言环境

1.1 创建 erlang安装目录
  1. mkdir erlang
复制代码

1.2  上传解压压缩包
  1. 上传到: /root/
  2. 解压缩# tar -zxvf otp_src_22.0.tar.gz
复制代码

1.3 进入解压缩目录,指定目录并安装
  1. 进入解压目录,指定安装目录# ./configure --prefix=/usr/local/kh96/erlang
  2. 安装# make install
  3. 添加环境变量# echo 'export PATH=$PATH:/usr/local/kh96/erlang/bin' >> /etc/profile
  4. 刷新环境变量# source /etc/profile
复制代码
1.4 测试环境
  1. 进入erlang环境#erl
  2. 退出# halt().
复制代码

2、安装RabbitMQ

2.1上传解压压缩包
  1. 第一步xx.tar.xz->xx.tar # /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz
  2. 第二步#tar -xvf rabbitmq-server-generic-unix-3.7.15.tar
复制代码
2.2 添加环境变量
  1. 添加环境变量# echo 'export PATH=$PATH:/usr/local/kh96/rabbitmq/rabbitmq_server-3.7.15/sbin' >> /etc/profile
  2. 刷新环境变量# source /etc/profile
复制代码
2.3 启动
  1. 启动#  rabbitmq-server -detached
  2. 查看状态# rabbitmqctl status
  3. 查看防火墙状态# firewall-cmd --state (建议不开)
复制代码

2.4 开启云服务端口
  1. RabbitMQ 服务端口: 5672
  2. RabbitMQ 监控平台端口: 15672
  3. 开启web插件允许监控平台访问 # rabbitmq-plugins enable rabbitmq_management
复制代码

2.5 远程 访问 15672
  1. 公网ip:15672
  2. Username: guest
  3. Password: guest
  4. 提示这个这个账号只允许本地访问,所以需要添加用户
复制代码
2.6 添加用户
  1. 显示所有用户# rabbitmqctl list_users
  2. 查看guest用户权限# rabbitmqctl list_user_permissions guest
  3. 添加admin用户及密码# rabbitmqctl add_user admin admin
  4. 设置限权# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  5. 授予admin用户administrator角色# rabbitmqctl set_user_tags admin administrator
  6. 查看admin用户权限# rabbitmqctl list_user_permissions admin
  7. 删除用户guest# rabbitmqctl delete_user guest
  8. 停止RabbitMQ# rabbitmqctl stop
复制代码
2.7 登录成功
  1. Username: admin
  2. Password: admin
复制代码

3、SpringBoot整合

3.0 项目准备

3.0.1 jar包
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
3.0.2 配置信息
  1. # 端口
  2. server:
  3.   port: 8104
  4. # RabbitMQ配置
  5. spring:
  6.   rabbitmq:
  7.     host: x.xxx.xx.xx #服务器公网ip
  8.     port: 5672
  9.     username: admin
  10.     password: admin
复制代码
3.0.3 常量类
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQ 常量类,系统的所有队列名,交换机名,路由键名等,统一进行配置管理
  7. */
  8. public class RabbitMQConstant {
  9.     //========================== 直连模式
  10.     /**
  11.      * Direct直连模式 队列名
  12.      */
  13.     public static final String RABBITMQ_DIRECT_QUEUE_NAME_KH96 ="rabbitmq_direct_queue_name_kh96";
  14.     /**
  15.      * Direct直连模式 交换机名
  16.      */
  17.     public static final String RABBITMQ_DIRECT_EXCHANGE_KH96 ="rabbitmq_direct_exchange_kh96";
  18.     /**
  19.      * Direct直连模式 路由键
  20.      */
  21.     public static final String RABBITMQ_DIRECT_ROUTING_KEY_KH96 ="rabbitmq_direct_routing_key_kh96";
  22.     //========================== 扇形模式
  23.     /**
  24.      * Fanout 扇形模式 队列名one
  25.      */
  26.     public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE ="rabbitmq_fanout_queue_name_kh96_one";
  27.     /**
  28.      * Fanout 扇形模式 队列名two
  29.      */
  30.     public static final String RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO ="rabbitmq_fanout_queue_name_kh96_two";
  31.     /**
  32.      * Fanout 扇形模式 交换机名
  33.      */
  34.     public static final String RABBITMQ_FANOUT_EXCHANGE_KH96 ="rabbitmq_fanout_exchange_kh96";
  35.     //========================== 主题模式
  36.     // -- 队列
  37.     /**
  38.      * Topic 主题模式 队列名one
  39.      */
  40.     public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE ="rabbitmq_topic_queue_name_kh96_one";
  41.     /**
  42.      * Topic 主题模式 队列名two
  43.      */
  44.     public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO ="rabbitmq_topic_queue_name_kh96_two";
  45.     /**
  46.      * Topic 主题模式 队列名Three
  47.      */
  48.     public static final String RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE ="rabbitmq_topic_queue_name_kh96_three";
  49.     //-- 交换机
  50.     /**
  51.      * Topic 主题模式 交换机名
  52.      */
  53.     public static final String RABBITMQ_TOPIC_EXCHANGE_KH96 ="rabbitmq_topic_exchange_kh96";
  54.     //-- 路由键
  55.     /**
  56.      * Topic 主题模式 -路由键-唯一匹配规则
  57.      */
  58.     public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY="rabbitmq_topic_routing_key_kh96.only";
  59.     /**
  60.      * Topic 主题模式 -路由键-单词匹配规则  * 单个词
  61.      */
  62.     public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD="rabbitmq_topic_routing_key_kh96.*";
  63.     /**
  64.      * Topic 主题模式 -路由键-模糊匹配规则 # 0 或 多个词
  65.      */
  66.     public static final String RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE="rabbitmq_topic_routing_key_kh96.#";
  67. }
复制代码
3.0.4 手动操作队列关系

在测试的时候,一定要注意交换机和队列的绑定关系,只要绑定过的关系就会一直存在需要手动删除;如果测试结果不正常的时候,看一些交换机和队列与键值的绑定关系;
选择队列:


删除队列:


3.1 Direct 直连模式

3.1.0 核心构造方法:Queue


  • 核心构造方法:Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)

  • name参数:name – the name of the queue.

    • 指定创建的消息队列的名字,参数必传,即创建队列必须要有队列名。

  • durable参数:durable – true if we are declaring a durable queue (the queue will survive a server restart)

    • 指定创建的消息队列是否需要持久化,默认是true,如果是true,该队列支持持久化,自动持久化到磁盘,RabbitMQ服务重启,队列仍然是可用的(存活的)。

  • exclusive参数:true if we are declaring an exclusive queue (the queue will only be used by the declarer's connection)

    • 指定创建的消息队列是否是排他队列,默认是false,如果是true,该队列是排他队列,只有创建当前队列的连接才可以使用,连接一旦断开,队列会自动删除。

  • autoDelete参数:true if the server should delete the queue when it is no longer in use

    • 指定创建的消息队列是否是自动删除队列,默认是false,如果是true,该队列是自动删除队列,一旦没有消息生产者或者消费者使用当前队列,会被自动删除。

3.1.1  配置类
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Direct直连模式,自动配置类,自动创建队列,交换机,并将队列绑定到交换机,指定唯一路由
  7. */
  8. @Configuration
  9. public class RabbitMQDirectConfig {
  10.     //创建 直连队列
  11.     @Bean
  12.     public Queue directQueue(){
  13.         //创建 直连队列
  14.         return new Queue(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96,true);
  15.     }
  16.     //创建 直连交换机
  17.     @Bean
  18.     public DirectExchange directExchange(){
  19.         // 创建支持持久化的直连交换机,指定交换机的名称
  20.         return new DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96);
  21.     }
  22.            //将直连队列和直连交换机 进行绑定,并指定绑定的唯一路由键
  23.     @Bean
  24.     public Binding directBinding(){
  25.         // 将直连队列和直连交换机进行绑定,并指定绑定的唯一路由键
  26.         return BindingBuilder.bind(directQueue())
  27.                             .to(directExchange())
  28.                             .with(RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96);
  29.     }
  30. }
复制代码
3.1.2  消息生产者
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Direct 直连模式 消息生产者
  7. */
  8. @Slf4j
  9. @Component
  10. public class RabbitMQDirectProducer {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * @author : huayu
  15.      * @date   : 1/11/2022
  16.      * @param  : [directMsg, directExchange, directRoutingKey]
  17.      * @return : void
  18.      * @description : 使用直连模式,发送消息到直连交换机,通过交换机绑定的唯一路由键,将消息发送到绑定的队列中
  19.      */
  20.     public void sendDirectMsg2DirectExchange(String directExchange,String directRoutingKey,String directMsg){
  21.         log.info("++++++  direct模式消息生产者,发送直连消息:{},到交换机:{},路由键:{} ++++++",directMsg,directExchange,directRoutingKey);
  22.         rabbitTemplate.convertAndSend(directExchange,directRoutingKey,directMsg);
  23.     }
  24. }
复制代码
3.1.3  消费者

3.1.3.1 消费者One
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Direct 直连模式消费者 One
  7. */
  8. @Slf4j
  9. @Component
  10. //指定接听的 消息队列 名字
  11. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
  12. public class RabbitMQDirectConsumerOne {
  13.     /**
  14.      * @author : huayu
  15.      * @date   : 1/11/2022
  16.      * @param  : [directMsgJson]
  17.      * @return : void
  18.      * @description : Direct 直连模式消费者One,消费信息
  19.      */
  20.     //指定消息队列中的消息,交给对应的方法处理
  21.     @RabbitHandler
  22.     public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
  23.         log.info("***** Direct直连模式,消费者One,消费消息:{} ******",directMsgJson);
  24.         // TODO 核心业务逻辑处理
  25.     }
  26. //    @RabbitHandler  //自动根据队列中的消息类型,自动区分方法
  27. //    public void consumeOtherDirectMsgFromDirectQueue(List<String> directMsgJson){
  28. //        log.info("***** Direct直连模式,消费者Two,消费消息:{} ******",directMsgJson);
  29. //
  30. //        // TODO 核心业务逻辑处理
  31. //
  32. //    }
  33. }
复制代码
3.1.3.2 消费者Two
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQDirectConsumerTwo
  7. */
  8. @Slf4j
  9. @Component
  10. //指定监听的消息队列 名字
  11. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
  12. public class RabbitMQDirectConsumerTwo {
  13.     /**
  14.      * @author : huayu
  15.      * @date   : 1/11/2022
  16.      * @param  : [directMsgJson]
  17.      * @return : void
  18.      * @description : Direct 直连模式消费者 Two,消费信息
  19.      */
  20.     //指定消息队列中的消息,交给对应的方法处理
  21.     @RabbitHandler
  22.     public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
  23.         log.info("***** Direct直连模式,消费者Two,消费消息:{} ******",directMsgJson);
  24.         // TODO 核心业务逻辑处理
  25.     }
  26. }
复制代码
3.1.4  请求测试方法
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: 测试 RabbitMQ 消息队列的操作入口
  7. */
  8. @Slf4j
  9. @RestController
  10. public class RabbitMQController {
  11.     @Autowired
  12.     private RabbitMQDirectProducer rabbitMQDirectProducer;
  13.    
  14.     /**
  15.      * @author : Administrator
  16.      * @date   : 2022/11/1
  17.      * @param  : [directMsg]
  18.      * @return : com.kgc.sct.util.RequestResult<java.lang.String>
  19.      * @description : 测试direct直连模式,发送和消费消息
  20.      */
  21.     @GetMapping("/direct")
  22.     public RequestResult<String> testRabbitMQDirect(@RequestParam String directMsg){
  23.         log.info("direct直连模式,发送消息");
  24.         //模拟发送5条直连消息
  25.         Stream.of(11,22,33,44,55).forEach(directNo ->{
  26.             //模拟创建消息对象
  27.             Map<String,Object> directMap =new HashMap<>();
  28.             directMap.put("directNo",directNo);
  29.             directMap.put("directData",directMsg);
  30.             directMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
  31.         //调用直连模式消息生产者,发送消息
  32.             rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
  33.                                                          ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
  34.                                                          ,JSON.toJSONString(directMap));
  35.         return ResultBuildUtil.success("使用直连模式。发送消息成功");
  36.     }
  37. }
复制代码
3.1.5 请求测试

发起请求

3.1.5.1 一个消费者

消费者One消费了队列中的所有信息(只有一个队列);

3.1.5.2 两个消费者

消费者One和消费者Two依次消费了队列中的所有信息(只有一个队列);

3.2 Fanout 扇形模式

3.2.1 配置类
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Fanout扇形模式,自动配置类,自动创建队列,交换机,并将队列绑定到交换机
  7. */
  8. @Configuration
  9. public class RabbitMQFanoutConfig {
  10.     //创建 扇形队列One
  11.     @Bean
  12.     public Queue fanoutQueueOne(){
  13.         return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE);
  14.     }
  15.     //创建 扇形队列Two
  16.     @Bean
  17.     public Queue fanoutQueueTwo(){
  18.         return new Queue(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);
  19.     }
  20.     // 创建扇形交换机
  21.     @Bean
  22.     public FanoutExchange fanoutExchange(){
  23.         return new FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96);
  24.     }
  25.     //绑定队列到扇形交换机,不需要 指定 路由键
  26.     @Bean
  27.     public Binding fanoutBindingQueueOne(){
  28.         //绑定队列到扇形交换机,不需要路由键,消息是广播发送,会给多有绑定的队列群发信息消息(根本没有提供with方法)
  29.         return BindingBuilder.bind(fanoutQueueOne())
  30.                             .to(fanoutExchange());
  31.     }
  32.     @Bean
  33.     public Binding fanoutBindingQueueTwo(){
  34.         //绑定队列到扇形交换机,不需要路由键,消息是广播发送,会给多有绑定的队列群发信息消息(根本没有提供with方法)
  35.         return BindingBuilder.bind(fanoutQueueTwo())
  36.                             .to(fanoutExchange());
  37.     }
  38. }
复制代码
3.2.2 消息生产者
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQFanoutProducer
  7. */
  8. @Slf4j
  9. @Component
  10. public class RabbitMQFanoutProducer {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * @author : huayu
  15.      * @date   : 1/11/2022
  16.      * @param  : [fanoutExchange, fanoutRoutingKey, fanoutMsg]
  17.      * @return : void
  18.      * @description : 使用扇形模式,发送消息到扇形交换机,将消息发送到绑定的队列中
  19.      */
  20.     public void sendFanoutMsg2FanoutExchange(String fanoutExchange,String fanoutRoutingKey,String fanoutMsg){
  21.         log.info("++++++ Fanout模式消息生产者,发送广播消息:{},到交换机:{},路由键:{} ++++++", fanoutMsg, fanoutExchange, fanoutRoutingKey);
  22.         rabbitTemplate.convertAndSend(fanoutExchange, fanoutRoutingKey, fanoutMsg);
  23.     }
  24. }
复制代码
3.2.3 消费者

3.2.3.1 消费者One
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQFanoutConsumerOne
  7. */
  8. @Slf4j
  9. @Component
  10. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE)
  11. public class RabbitMQFanoutConsumerOne {
  12.     @RabbitHandler
  13.     public void fanoutConsumeOneFanoutMsgFromFanoutQueueOne(String fanoutMsgJson){
  14.         log.info("****** Fanout扇形模式,消费One,消费队列One,消息:{}  ******",fanoutMsgJson);
  15.         // TODO 核心业务逻辑处理
  16.     }
  17. }
复制代码
3.2.3.2 消费者Two
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQFanoutConsumerTwo
  7. */
  8. @Slf4j
  9. @Component
  10. //@RabbitListener(queues = RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO)
  11. public class RabbitMQFanoutConsumerTwo {
  12. //    @RabbitHandler
  13.     public void fanoutConsumeTwoFanoutMsgFromFanoutQueueTwo(String fanoutMsgJson){
  14.         log.info("****** Fanout扇形模式,消费Two,消费队列Two,消息:{}  ******",fanoutMsgJson);
  15.         // TODO 核心业务逻辑处理
  16.     }
  17. }
复制代码
3.2.4 请求测试方法
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: 测试 RabbitMQ 消息队列的操作入口
  7. */
  8. @Slf4j
  9. @RestController
  10. public class RabbitMQController {
  11.     @Autowired
  12.     private RabbitMQFanoutProducer rabbitMQFanoutProducer;
  13.    
  14.         /**
  15.      * @author : huayu
  16.      * @date   : 1/11/2022
  17.      * @param  : [fanoutMsg]
  18.      * @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
  19.      * @description : 测试扇形(广播)模式,发送和消费信息
  20.      */
  21.     @GetMapping("/fanout")
  22.     public RequestResult<String> testRabbitMQFanout(@RequestParam String fanoutMsg){
  23.         log.info("------- fanout 扇形模式,发送消息 -------");
  24.         //模拟发送5条直连消息
  25.         Stream.of(66,77,88,99,96).forEach(directNo ->{
  26.             //模拟创建消息对象
  27.             Map<String,Object> fanoutMap =new HashMap<>();
  28.             fanoutMap.put("directNo",directNo);
  29.             fanoutMap.put("directData",fanoutMsg);
  30.             fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
  31.             //调用扇形模式消息生产者,发送消息
  32.             rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96
  33.                                                                 ,null
  34.                                                                 ,JSON.toJSONString(fanoutMap));
  35.         });
  36.         return ResultBuildUtil.success("使用扇形模式。发送消息成功");
  37.         
  38.     }
  39. }     
复制代码
3.2.5 请求测试


3.2.5.1 一个消费者

消费者One消费了队列One中的所有信息;

3.2.5.2 两个消费者

消费者One消费了队列One中的所有信息;
消费者Two消费了队列Two中的所有信息;

3.3 Topic 主题模式

3.3.1 配置类
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Topic 主题模式,自动配置类
  7. */
  8. @Configuration
  9. public class RabbitMQTopicConfig {
  10.     //======== 队列
  11.     //Topic 主题模式 队列One
  12.     @Bean
  13.     public Queue topicQueueOne(){
  14.         return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE,true);
  15.     }
  16.     //Topic 主题模式 队列Two
  17.     @Bean
  18.     public Queue topicQueueTwo(){
  19.         return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO,true);
  20.     }
  21.     //Topic 主题模式 队列Three
  22.     @Bean
  23.     public Queue topicQueueThree(){
  24.         return new Queue(RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE,true);
  25.     }
  26.     //======= 交换机
  27.     //Topic 主题模式 交换机
  28.     @Bean
  29.     public TopicExchange topicExchange(){
  30.         return new TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96);
  31.     }
  32.     //======= 绑定
  33.     // 队列One 绑定 Topic主题模式交换机  和 路由键-唯一匹配规则
  34.     @Bean
  35.     public Binding topicBindingQueueOne(){
  36.         return BindingBuilder.bind(topicQueueOne())
  37.                 .to(topicExchange())
  38.                 .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY);
  39.     }
  40.     // 队列Two 绑定 Topic主题模式交换机  和 路由键-单个单词词匹配规则
  41.     @Bean
  42.     public Binding topicBindingQueueTwo(){
  43.         return BindingBuilder.bind(topicQueueTwo())
  44.                 .to(topicExchange())
  45.                 .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_WORLD);
  46.     }
  47.     // 队列Two 绑定 Topic主题模式交换机  和 路由键-模糊匹配规则
  48.     @Bean
  49.     public Binding topicBindingQueueThree(){
  50.         return BindingBuilder.bind(topicQueueThree())
  51.                 .to(topicExchange())
  52.                 .with(RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_LIKE);
  53.     }
  54. }
复制代码
3.3.2 消息生产者
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQ 主题模式消息生产者
  7. */
  8. @Slf4j
  9. @Component
  10. public class RabbitMQTopicProducer {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * @author : huayu
  15.      * @date   : 1/11/2022
  16.      * @param  : [topicExchange, topicRoutingKey, topicMsg]
  17.      * @return : void
  18.      * @description : 使用主题模式,发送消息到主题交换机,主题交换机会根据发送消息的路由键 ,根据匹配规则将消息投递到匹配的队列中
  19.      */
  20.     public void sendTopicMsg2TopicExchange(String topicExchange,String topicRoutingKey,String topicMsg){
  21.         log.info("++++++  direct模式消息生产者,发送直连消息:{},到交换机:{},路由键:{} ++++++",topicMsg,topicExchange,topicRoutingKey);
  22.         rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey,topicMsg);
  23.     }
  24. }
复制代码
3.3.3 消费者

3.3.3.1 消费者One
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQTopicConsumerOne
  7. */
  8. @Slf4j
  9. @Component
  10. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_ONE)
  11. public class RabbitMQTopicConsumerOne {
  12.     @RabbitHandler
  13.     public void consumeTopicMsgFromTopicQueue(String topicMapJson){
  14.         log.info("****** Topic 主题模式,消费One,消费队列One,消息:{}  ******",topicMapJson);
  15.         // TODO 核心业务逻辑处理
  16.     }
  17. }
复制代码
3.3.3.2 消费者Two
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQTopicConsumerTwo
  7. */
  8. @Slf4j
  9. @Component
  10. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_TWO)
  11. public class RabbitMQTopicConsumerTwo {
  12.     @RabbitHandler
  13.     public void consumeTopicMsgFromTopicQueue(String topicMapJson){
  14.         log.info("****** Topic 主题模式,消费 Two,消费队列 Two,消息:{}  ******",topicMapJson);
  15.         // TODO 核心业务逻辑处理
  16.     }
  17. }
复制代码
3.3.3.3 消费者Three
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQTopicConsumerThree
  7. */
  8. @Slf4j
  9. @Component
  10. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_TOPIC_QUEUE_NAME_KH96_THREE)
  11. public class RabbitMQTopicConsumerThree {
  12.     @RabbitHandler
  13.     public void consumeTopicMsgFromTopicQueue(String topicMapJson){
  14.         log.info("****** Topic 主题模式,消费 Three,消费队列 Three,消息:{}  ******",topicMapJson);
  15.         // TODO 核心业务逻辑处理
  16.     }
  17. }
复制代码
3.3.4 请求测试方法
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: 测试 RabbitMQ 消息队列的操作入口
  7. */
  8. @Slf4j
  9. @RestController
  10. public class RabbitMQController {
  11.     @Autowired
  12.     private RabbitMQTopicProducer rabbitMQTopicProducer;
  13.    
  14.         @GetMapping("/topic")
  15.     public RequestResult<String> testRabbitMQTopic(@RequestParam String topicMsg){
  16.         log.info("------- topic 主题模式,发送消息 -------");
  17.         //模拟发送5条直连消息
  18.         Stream.of(95,96,97,98,99).forEach(directNo ->{
  19.             //模拟创建消息对象
  20.             Map<String,Object> fanoutMap =new HashMap<>();
  21.             fanoutMap.put("directNo",directNo);
  22.             fanoutMap.put("directData",topicMsg);
  23.             fanoutMap.put("directTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
  24.             //调用主题模式消息生产者,发送消息
  25.             //场景1:使用唯一路由键 rabbitmq_topic_routing_key_kh96.only , 发送消息
  26.             rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
  27.                                                             ,RabbitMQConstant.RABBITMQ_TOPIC_ROUTING_KEY_KH96_ONLY
  28.                                                             ,JSON.toJSONString(fanoutMap));
  29.             //场景2:使用单词匹配路由键 rabbitmq_topic_routing_key_kh96.* ,发送消息
  30. //            rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
  31. //                                    ,"rabbitmq_topic_routing_key_kh96.abc"
  32. //                                    ,JSON.toJSONString(fanoutMap));
  33.             //场景3:0 或多词匹配 rabbitmq_topic_routing_key_kh96.# ,发送消息
  34. //            rabbitMQTopicProducer.sendTopicMsg2TopicExchange(RabbitMQConstant.RABBITMQ_TOPIC_EXCHANGE_KH96
  35. //                                                            ,"rabbitmq_topic_routing_key_kh96.abc.def"
  36. //                                                            ,JSON.toJSONString(fanoutMap));
  37.         });
  38.         return ResultBuildUtil.success("使用主题模式。发送消息成功");
  39.     }
  40.    
  41. }
复制代码
3.3.5 请求测试

3.3.5.1 场景1:使用唯一路由键
  1. 发送消息路由键名: rabbitmq_topic_routing_key_kh96.only
复制代码
发起请求:


请求结果:

队列One,Two,Three都接收到了信息,所以对应的消费者One,Two,Three都消费了信息;

3.3.5.2 场景2:使用单词匹配路由键
  1. 发送消息路由键名: rabbitmq_topic_routing_key_kh96.abc
复制代码
发起请求:


请求结果:

队列Two,Three都接收到了信息,所以对应的消费者Two,Three都消费了信息;

3.3.5.3 场景3:0 或多词匹配
  1. 发送消息路由键名: rabbitmq_topic_routing_key_kh96.abc.def
复制代码
发起请求:


请求结果:

只有队列Three接收到了信息,所以只有对应的消费者Three消费了信息;

3.3.6 主题模式小结


  • 当生产者发送消息到交换机,指定的路由键一般都是使用句点(.)作为分隔符,分割多个单词。

    • 比如:词1.词...

  • 所谓单词:是由一个或多个单词组成,多个单词组成的路由键,就代表某种主题的关键信息,路由键长度最多不能超过256字节。
  • 匹配规则格式:* 或者 #

    • *代表单个单词。

      • 比如 队列绑定主题交换机的 路由键:KH96.* ,代表发送消息的路由键是以KH96开头,后面只能跟一个单词,如:KH96.aaa,KH96.bbb等。
      • 再比如:绑定路由键为:KH96.*.KGC,代表发送消息路由键是以KH96开头,中间可以带一个单词,结尾,如:KH96.aa.KGC,KH96.bb.KGC。
      • #代表0或多个单词,比如 队列绑定主题交换机的 路由键:KH96.#,代表发送消息的路由键是以KH96开头,后面只能跟0个或者多个单词,如:KH96,KH96.aaa,KH96.aaa.bbb。
      • 再比如:绑定路由键为:KH96.#.KGC,代表发送消息路由键是以KH96开头,中间可以带一个或多个单词,结尾,如KH96.KGC,KH96.aa.KGC,KH96.aa.bb.KGC。


    • 备注:

      • 如果主题交换机,队列绑定的路由键使用的不是模糊匹配符,主题交换机跟直连交换机一致。
      • 如果单独使用#,代表所有队列都可以收消息,主题交换机跟扇形交换机一致。


  • 提醒:

    • 主题模式下,队列绑定的路由键,是允许为多个的。
    • 如果路由键被更换,之前的路由键是不会删除,仍然会绑定到当前队列上。
    • 如果有多个路由键匹配,规则为:如果其中一个没有匹配到,会自动匹配其他路由键,如果需要删除历史路由键,需要在RabbitMQ控制台删除。

3.4 消息  发送确认 - 交换机,队列 确认

3.4.1 配置信息
  1. # RabbitMQ配置
  2. spring:
  3.   rabbitmq:
  4.     # 打开发送消息确认配置
  5.     publisher-confirms: true # 发送消息到交换机确认,默认false
  6.     publisher-returns: true # 发送消息到队列确认,默认是false
复制代码
3.4.2 消息发送确认配置类


  • 触发机制

    • ConfirmCallback 函数式接口中的唯一抽象方法 confirm : 是否有交换机都会触发;

      • 标识:true,发送到交换机正常;
      • 标识:false,发送到交换机失败,进行特殊处理;

    • ReturnCallback 函数式接口中的唯一抽象方法 returnedMessage :交换机存在且队列不存在才会触发;

      • 触发:发送到队列失败,进行特殊处理;


  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQ 消息确认机制: 发送确认
  7. */
  8. @Slf4j
  9. @Configuration
  10. public class RabbitMQSendMsgAck {
  11.     @Bean
  12.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
  13.         //发送确认,消息是通过rabbitTemplate发的,所以要重置rabbitTemplate才可以实现
  14.         RabbitTemplate rabbitTemplate = new RabbitTemplate();
  15.         rabbitTemplate.setConnectionFactory(connectionFactory);
  16.         //开启触发回调处理方法,不论消息推送结果是什么,都会强制触发回调方法
  17.         rabbitTemplate.setMandatory(true);
  18.         //指定消息发送到RabbitMQ的broker节点,是否正确到达交换机确然
  19.         //是否有交换机都会触发
  20.         rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) ->{
  21.             log.info("######  发送消息确认回调,数据:{}  ######",correlationData);
  22.             log.info("######  发送消息确认回调,标识:{}  ######",ack);
  23.             log.info("######  发送消息确认回调,原因:{}  ######\n",cause);
  24.             //TODO 如果没有到交换机,ack返回的是false,可能是交换机被删除,就需要进行特殊处理的业务,比如给负责人发送信息或邮件
  25.         });
  26.         //消息是否正确到达交换机上绑定的 目标队列
  27.         //交换机存在且队列不存在才会触发
  28.         rabbitTemplate.setReturnCallback( ( message, replyCode, replyText,exchange,routingKey) ->{
  29.             log.info("######  发送消息返回回调,数据:{}  ######",message);
  30.             log.info("######  发送消息返回回调,返回码:{}  ######",replyCode);
  31.             log.info("######  发送消息返回回调,返回说明:{}  ######",replyText);
  32.             log.info("######  发送消息返回回调,交换机:{}  ######",exchange);
  33.             log.info("######  发送消息返回回调,路由键:{}  ######\n",routingKey);
  34.             //TODO 如果没有到目标队列,就需要进行特殊处理的业务,比如给负责人发送信息或邮件
  35.         });
  36.         return rabbitTemplate;
  37.     }
  38. }
复制代码
3.4.3 交换机
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Ack 测试交换机,没有绑定队列
  7. */
  8. @Configuration
  9. public class RabbitMQAckConfig {
  10.     //ack 测试交换机,没有绑定队列
  11.     @Bean
  12.     public DirectExchange directExchange(){
  13.         return new DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96);
  14.     }
  15. }
复制代码
3.4.4 请求方法
  1. /**
  2. * @author : huayu
  3. * @date   : 2/11/2022
  4. * @param  : [topicMsg]
  5. * @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
  6. * @description : 直连模式测试 Ack  不存在交换机  和 存在交换机
  7. */
  8. @GetMapping("/sendMsgAck")
  9. public RequestResult<String> RabbitMQSendMsgAck(@RequestParam String ackMsg){
  10.     log.info("------- 直连 模式 测试Ack,发送消息 -------");
  11.     //模拟发送直连消息
  12.     //调用直连模式消息生产者,发送消息
  13.     //测试1: 不存在的 交换机
  14.     rabbitMQDirectProducer.sendDirectMsg2DirectExchange("test_noExchange"
  15.                                                         ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
  16.                                                         ,JSON.toJSONString(ackMsg));
  17.     return ResultBuildUtil.success("使用直连模式 测试Ack。交换机不存在");
  18.     //测试2: 存在的交换机,但是没有绑定 队列
  19.     //            rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_ACK_EXCHANGE_KH96
  20.     //                                    ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
  21.     //                                    ,JSON.toJSONString(ackMsg));
  22.     //        return ResultBuildUtil.success("使用直连模式 测试Ack。交换机 没有绑定队列");
  23. }
复制代码
3.2.5 请求测试

3.2.5.1  交换机不存在

发起请求:


请求结果:

交换机不存在,
触发了ConfirmCallback 函数式接口中的唯一抽象方法 confirm ,
返回标识 false,发送到交换机失败,
原因,该交换机不存在;

注意:如果没有到交换机,ack返回的是false,可能是交换机被删除,就需要进行特殊处理的业务,比如给负责人发送信息或邮件;
3.2.5.2 交换机存在,但是没有绑定 队列

发起请求:


请求结果:

交换机存在,
触发了ConfirmCallback 函数式接口中的唯一抽象方法 confirm ,
返回标识 true,发送到交换机成功;
没有绑定队列,
触发了ReturnCallback 函数式接口中的唯一抽象方法 returnedMessage ,
返回说明 NO_ROUT,发送到队列失败;

注意:如果没有到目标队列,就需要进行特殊处理的业务,比如给负责人发送信息或邮件;
3.2.5.3 交换机存在,且绑定了队列

发起请求


请求结果:

交换机存在,且绑定了队列,
触发了ConfirmCallback 函数式接口中的唯一抽象方法 confirm ,
返回标识 true,发送到交换机成功;
没有触发ReturnCallback 函数式接口中的唯一抽象方法 returnedMessage ,
说明发送到队列成功;

3.5 消息确认

3.5.1 自动确认

3.5.1.1 配置信息
  1. # RabbitMQ配置
  2. spring:
  3.   rabbitmq:
  4.     # 消费消息确认配置-自动
  5.     listener:
  6.       simple:
  7.         retry:
  8.           enabled: true # 开启消费消息失败重试机制
  9.           max-attempts: 5 # 指定重试的次数
  10.           max-interval: 10000 # 最大重试间隔时间,单位毫秒,每次重试的间隔时间,不能比当前设置的值大,如果计算间隔时间是6s,最大时间时间5s,会用5秒
  11.           initial-interval: 1000 # 重试间隔初始时间,单位毫秒
  12.           multiplier: 2 #乘子;重试的间隔时间 * 乘子,就是下一次重试的时间间隔市场,即:1s,2s,4s,8s,16...
复制代码
3.5.1.2 消费者 模拟异常

注意:测试时为了让消费者One一定接收到消息,所以注释掉消费者Two,这样才可以保证消费者One接收消息,然后触发异常,重试的效果;
  1. /**
  2. * Created On : 1/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: Direct 直连模式消费者 One
  7. */
  8. @Slf4j
  9. @Component
  10. //指定接听的 消息队列 名字
  11. @RabbitListener(queues = RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96)
  12. public class RabbitMQDirectConsumerOne {
  13.     /**
  14.      * @author : huayu
  15.      * @date   : 1/11/2022
  16.      * @param  : [directMsgJson]
  17.      * @return : void
  18.      * @description : Direct 直连模式消费者One,消费信息
  19.      */
  20.     //指定消息队列中的消息,交给对应的方法处理
  21.     @RabbitHandler
  22.     public void consumeOneDirectMsgFromDirectQueue(String directMsgJson){
  23.         log.info("***** Direct直连模式,消费者One,消费消息:{} ******",directMsgJson);
  24.         // TODO 核心业务逻辑处理
  25.         //默认自动确认,模拟消费端消费消息,处理异常,自动重试
  26.         int a = 10 / 0;
  27.     }
  28. }
复制代码
3.5.1.3 请求方法
  1. /**
  2. * @author : huayu
  3. * @date   : 3/11/2022
  4. * @param  : [ackMsg]
  5. * @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
  6. * @description : 测试 消费者自动 重试
  7. */
  8. @GetMapping("/consumeAckAuto")
  9. public RequestResult<String> testRabbitMQConsumeAckAuto(@RequestParam String ackMsg){
  10.     log.info("------- 直连 模式 测试Ack 自动 重试,发送消息 -------");
  11.     //模拟发送直连消息
  12.     //消费消息失败重试机制
  13.   rabbitMQDirectProducer.sendDirectMsg2DirectExchange(RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
  14.                                                         ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
  15.                                                         ,JSON.toJSONString(ackMsg));
  16.     return ResultBuildUtil.success("使用直连模式 消费确认-自动消费成功");
  17. }
复制代码
3.5.1.4 请求测试

发起请求:

请求结果:
一共重试了五次
间隔时间为1,2,4,8
(如果还有一次应该为10,因为最后一次计算时间16大于最大间隔时间10,按最大间隔时间10重试);

3.4.2 手动确认

注意:

  • 手动确认需要先将自动确认的配置注释掉;
  • 使用手动确认,不能再用@RabbitListener 监听,手动确认相关队列,需要我们手动配置消费者;
3.4.2.1 消费消息手动确认的监听器


  • 获取消息消费的唯一标识 message.getMessageProperties().getDeliveryTag();
  • 执行业务处理

    • 每个消费者在同一个时间点,最多处理一个message,默认是0(全部) channel.basicQos(1);
    • 获取message的消息内容 message
    • 获取消息对应的目标队列,可以实现一些灵活判断处理message.getMessageProperties().getConsumerQueue()

      • 比如根据不同的目标队列进行不同的处理
      • 在消息处理的时候如果出错会被捕获(消息确认失败)

    • 消息确认channel.basicAck(deliveryTag,false);

  • 消息确认失败处理

    • 根据条件判断设置是否重回队列 ,是否支持批量处理  channel.basicNack(deliveryTag,true,false);

  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: 消费端 消费消息手动确认的监听器,注意它也是一个消费者,并可以通过 消息监听容器工厂,动态配置多个
  7. */
  8. @Slf4j
  9. @Component
  10. public class RabbitMQConsumerManualAckListener implements ChannelAwareMessageListener {
  11.     @Override
  12.     public void onMessage(Message message, Channel channel) throws IOException {
  13.         //获取消息消费的唯一标识,rabbitMQ在推送消息时,会给每个消息携带一个唯一标识,值是一个递增的正整数
  14.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  15.         log.info("====== 消费消息的唯一标识:{}  ======",deliveryTag);
  16.         //执行手动确认业务处理
  17.         try{
  18.             //给每个消费者在同一个时间点,最多处理一个message,默认是0(全部),换句话说,在接收到消费者的 ack 确认前,不会分发新的消息给当前的消费者
  19.             //在接收当前消息的ack确认前是不会发送新的消息给它
  20.             channel.basicQos(1);
  21.             //获取message的消息内容,发送的消息的json字符串
  22.             log.info("====== 消息队列中完整消息内容:{} ======",message);
  23.             //获取发送的实际内容,发送消息的json字符串
  24.             log.info("====== 发送的实际内容:{} ======",new String(message.getBody(),"utf-8"));
  25.             //获取消息对应的目标队列,可以实现一些灵活判断
  26.             //TODO 比如根据目标队列不同,可以做不同的处理
  27.             log.info("======  消息的来源队列:{} =======",message.getMessageProperties().getConsumerQueue());
  28.             //模拟错误 ,当 deliveryTag 为1的时候,进入 报错 ,捕获异常,然后(如果设置了重回队列)将消息重回队列
  29.             //if(deliveryTag == 1){
  30.             //    int num = 1/0;
  31.             //}
  32.             //消费消息的手动确认,消息确认成功-basicAck
  33.             //第一个参数deliveryTag,消息的唯一标识
  34.             //第二个参数multiple,消息是否支持批量确认,如果是true,代表可以一次性确认标识小于等于当前标识的所有消息
  35.             //如果是false,只会确认当前消息
  36.             channel.basicAck(deliveryTag,false);
  37.         }catch (Exception e){
  38.             //说明消费消息处理失败,如果不进行确认(自动确认,投递成功即确认,消费是否正常,不关心),消息就会丢失
  39.             //消息处理失败确认,代表消息没有正确消费,注意:此种方式一次只能确认一个消息
  40.             //第一给参数是消息的唯一标识,
  41.             //第二个参数是代表是否重回队列,如果是true,重新将该消息放入队列,再次消费
  42.             //注意:第二个参数要谨慎,必须要结合具体业务场景,根据业务判断是否需要重回队列,一旦处理不当,机会导致消息循环入队,消息挤压
  43.             //不重回队列 require = false
  44. //            channel.basicReject(deliveryTag,false);
  45.             //重回队列 require = true
  46.             channel.basicReject(deliveryTag,true);
  47.             //消息处理失败确认,代表消息没有正确消费,注意,此种方式支持批量
  48.             //第一个参数是消息的唯一标识,
  49.             //第二个参数是代表是否支持批量确认
  50.             //第三给参数代表是否重回队列
  51.             //不重回队列 require = false
  52. //            channel.basicNack(deliveryTag,true,false);
  53.             //重回队列 require = true
  54. //            channel.basicNack(deliveryTag,false,true);
  55.             //TODO 手动消费异常处理
  56.             log.error("====== 消费消息失败,异常信息:{}  ======",e.getMessage());
  57.         }
  58.     }
  59. }
复制代码
3.4.2.2  消费消息手动确认配置类


  • 配置消费者的数量 setConcurrentConsumers(2);
  • 最大并发消费者数量 setMaxConcurrentConsumers(5);
  • 消费消息确认机制为手动 setAcknowledgeMode(AcknowledgeMode.MANUAL);
  • 设置监听消息队列的名称,支持多个队列setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);
  • 设置消息手动确认监听器 setMessageListener(rabbitMQConsumerManualAckListener);
  1. /**
  2. * Created On : 2/11/2022.
  3. * <p>
  4. * Author : huayu
  5. * <p>
  6. * Description: RabbitMQ  消费消息手动确认配置类
  7. */
  8. @Configuration
  9. public class RabbitMQConsumeManualAckConfig {
  10.     @Autowired
  11.     private RabbitMQConsumerManualAckListener rabbitMQConsumerManualAckListener;
  12.     /**
  13.      * @author : huayu
  14.      * @date   : 2/11/2022
  15.      * @param  : [connectionFactory]
  16.      * @return : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
  17.      * @description : 自定义消息监听器工程对象
  18.      */
  19.     @Bean
  20.     public SimpleMessageListenerContainer simpleBrokerMessageHandler(ConnectionFactory connectionFactory){
  21.         //初始化消息监听容器的工程对象
  22.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  23.         //初始化并发消费者的数量,比如是2,代表同时会有 两个消费者 消费消息
  24.         // ,投递标识可能会相同
  25.         container.setConcurrentConsumers(2);
  26.         //设置最大的并发消费者数量,数量不能低于初始化并发消费者数量
  27.         //可以动态的设定当前容器的消费者数量,可以实现动态增加和减少消费者的算法在 SimpleMessageListenerContainer类中实现
  28.         container.setMaxConcurrentConsumers(5);
  29.         //底层动态实现消费者数量的增加减少原理
  30.         // 有consumer已连续十个周期(consecutiveActiveTrigger)处于活动状态,并且自启动后最后一个consumer运行至少经过了10秒钟,则将启动新的consumer。
  31.         // private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000;
  32.         // 停止消费者算法的时间间隔
  33.         // 有consumer已连续10个周期(consecutiveIdleTrigger)连续空闲状态,并且上一个consumer至少在60秒之前停止,那么该consumer将停止
  34.         // private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000;
  35.         // 默认连续活动10个周期
  36.         // private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
  37.         // 默认连续空闲10个周期
  38.         // private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
  39.         //默认的消费消息确认机制是自动,需要改为手动
  40.         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  41.         //设置监听消息队列的名称,支持多个队列(队列名1,队列名2...),注意前提是指定的队列必须是存在的
  42.         //监听 直连模式的 RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96 队列
  43.         container.setQueueNames(RabbitMQConstant.RABBITMQ_DIRECT_QUEUE_NAME_KH96);
  44.         //监听 扇形模式的
  45.         //RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE 队列
  46.         //和 RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO 队列
  47. //        container.setQueueNames(RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_ONE
  48. //                ,RabbitMQConstant.RABBITMQ_FANOUT_QUEUE_NAME_KH96_TWO);
  49.         //指定消息确认的处理类,会同时产生多个消费者,参数是上面设置的,
  50.         //注意之前使用直连模式,消息消费者,要注释掉,防止同类型的监听器,处理同一队列
  51.         //如果不是被当前消息确认的处理类消费(使用注解@RabbitListener),会导致消息不执行手动处理
  52.         container.setMessageListener(rabbitMQConsumerManualAckListener);
  53.         // 返回消息监听容器工厂对象
  54.         return container;
  55.     }
  56. }
复制代码
3.4.2.3 请求方法
  1. //======================
  2. /**
  3. * @author : huayu
  4. * @date   : 3/11/2022
  5. * @param  : [ackMsg]
  6. * @return : com.kgc.scd.uitl.RequestResult<java.lang.String>
  7. * @description : 测试 消费者手动确认
  8. */
  9. @GetMapping("/consumeAckManual")
  10. public RequestResult<String> testRabbitMQConsumeAckManual(@RequestParam String ackMsg){
  11.     log.info("------- 测试Ack 手动 确认,发送消息 -------");
  12.     //消息手动确认
  13.     //模拟发送直连消息
  14.     //测试1,2
  15.     rabbitMQDirectProducer.sendDirectMsg2DirectExchange(
  16.         RabbitMQConstant.RABBITMQ_DIRECT_EXCHANGE_KH96
  17.         ,RabbitMQConstant.RABBITMQ_DIRECT_ROUTING_KEY_KH96
  18.         ,JSON.toJSONString(ackMsg));
  19.     return ResultBuildUtil.success("使用直连模式 手动消费确认-消息确认成功");
  20.    
  21.     //测试3
  22.     //模拟发送扇形消息
  23.     //        rabbitMQFanoutProducer.sendFanoutMsg2FanoutExchange(
  24.     //                RabbitMQConstant.RABBITMQ_FANOUT_EXCHANGE_KH96
  25.     //                ,null
  26.     //                ,JSON.toJSONString(ackMsg));
  27.     //
  28.     //
  29.     //        return ResultBuildUtil.success("使用扇形模式 手动消费确认-消息确认成功");
  30. }
复制代码
3.4.2.4 请求测试

3.4.2.4.1 模拟发送直连消息并成功确认

发送请求:

请求结果:

3.4.2.4.2 模拟发送直连消息,抛出异常,重回队列

发送请求:

代码重点:

请求结果:

3.4.2.4.3 模拟发送扇形消息并成功确认

发送请求:

请求结果:





免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

何小豆儿在此

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表