RabbitMQ可靠投递之confirmCallback确认模式------RabbitMQ

张裕  金牌会员 | 2024-12-4 01:45:46 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 566|帖子 566|积分 1698

  1. 保证消息不丢失,使用事务消息,会导致性能下降250倍,为此需要使用确认机制
  2. confirmCallback确认模式,从消息生产者到服务器交换机
  3. returnCallback若交换机未能投递到队列,需要重新回退到交换机内
  4. 开启发送端确认后
  5. 如果没有投递成功,消息会被记录和保存的
复制代码
保证消息不丢失,使用事务消息,会导致性能下降250倍,为此需要使用确认机制
confirmCallback确认模式,从消息生产者到服务器交换机
returnCallback若交换机未能投递到队列,需要重新回退到交换机内
开启发送端确认后
如果没有投递成功,消息会被记录和保存的
   
  1. spring.application.name=Mall-order
  2. spring.cloud.nacos.config.server-addr=127.0.0.1:8848
  3. spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
  4. spring.rabbitmq.host=192.168.56.10
  5. spring.rabbitmq.port=5672
  6. spring.rabbitmq.virtual-host=/
  7. #消息无法路由到队列时,以异步形式回调returnCallback(而不是被直接丢弃)
  8. spring.rabbitmq.template.mandatory=true
  9. #新版本的发送确认
  10. spring.rabbitmq.publisher-confirm-type=CORRELATED
  11. spring.cloud.nacos.config.namespace=becc5563-6c9b-47ff-a6b1-580982722371
复制代码

  1. spring.application.name=Mall-order
  2. spring.cloud.nacos.config.server-addr=127.0.0.1:8848
  3. spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
  4. spring.rabbitmq.host=192.168.56.10
  5. spring.rabbitmq.port=5672
  6. spring.rabbitmq.virtual-host=/
  7. #消息无法路由到队列时,以异步形式回调returnCallback(而不是被直接丢弃)
  8. spring.rabbitmq.template.mandatory=true
  9. #新版本的发送确认
  10. spring.rabbitmq.publisher-confirm-type=CORRELATED
  11. spring.cloud.nacos.config.namespace=becc5563-6c9b-47ff-a6b1-580982722371
复制代码
  
  1. package com.alatus.mall.order.controller;
  2. import com.alatus.mall.order.entity.OrderEntity;
  3. import com.alatus.mall.order.entity.OrderReturnReasonEntity;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.util.Date;
  9. import java.util.UUID;
  10. @RestController
  11. public class RabbitController {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     @GetMapping("/sendMessage")
  15.     public String sendMessage(){
  16.         for (int i = 0; i < 10; i++) {
  17.             if(i % 2 == 0){
  18.                 OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
  19.                 reasonEntity.setId(1L);
  20.                 reasonEntity.setCreateTime(new Date());
  21.                 reasonEntity.setName("哈哈"+i);
  22.                 rabbitTemplate.convertAndSend("helloWorld","hello",reasonEntity);
  23.             }
  24.             else{
  25.                 OrderEntity entity = new OrderEntity();
  26.                 entity.setOrderSn(UUID.randomUUID().toString());
  27.                 rabbitTemplate.convertAndSend("helloWorld","hello",entity);
  28.             }
  29.         }
  30.         return "OK";
  31.     }
  32. }
复制代码
  1. package com.alatus.mall.order.controller;
  2. import com.alatus.mall.order.entity.OrderEntity;
  3. import com.alatus.mall.order.entity.OrderReturnReasonEntity;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.util.Date;
  9. import java.util.UUID;
  10. @RestController
  11. public class RabbitController {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     @GetMapping("/sendMessage")
  15.     public String sendMessage(){
  16.         for (int i = 0; i < 10; i++) {
  17.             if(i % 2 == 0){
  18.                 OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
  19.                 reasonEntity.setId(1L);
  20.                 reasonEntity.setCreateTime(new Date());
  21.                 reasonEntity.setName("哈哈"+i);
  22.                 rabbitTemplate.convertAndSend("helloWorld","hello",reasonEntity);
  23.             }
  24.             else{
  25.                 OrderEntity entity = new OrderEntity();
  26.                 entity.setOrderSn(UUID.randomUUID().toString());
  27.                 rabbitTemplate.convertAndSend("helloWorld","hello",entity);
  28.             }
  29.         }
  30.         return "OK";
  31.     }
  32. }
复制代码
  
  1. package com.alatus.mall.order.config;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  5. import org.springframework.amqp.support.converter.MessageConverter;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import javax.annotation.PostConstruct;
  10. @Configuration
  11. public class RabbitConfig {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     @Bean
  15.     public MessageConverter messageConverter(){
  16.         return new Jackson2JsonMessageConverter();
  17.     }
  18. //    定制RabbitCallback,构造器创建完成,执行此方法
  19.     @PostConstruct
  20.     public void initRabbitTemplate(){
  21. //        设置确认回调
  22.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  23.             @Override
  24.             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  25.                 System.out.println(""+correlationData+ack+cause);
  26.             }
  27.         });
  28.     }
  29. }
复制代码

  1. package com.alatus.mall.order.config;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  5. import org.springframework.amqp.support.converter.MessageConverter;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import javax.annotation.PostConstruct;
  10. @Configuration
  11. public class RabbitConfig {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     @Bean
  15.     public MessageConverter messageConverter(){
  16.         return new Jackson2JsonMessageConverter();
  17.     }
  18. //    定制RabbitCallback,构造器创建完成,执行此方法
  19.     @PostConstruct
  20.     public void initRabbitTemplate(){
  21. //        设置确认回调
  22.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  23.             @Override
  24.             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  25.                 System.out.println(""+correlationData+ack+cause);
  26.             }
  27.         });
  28.     }
  29. }
复制代码
  
  1. package com.alatus.mall.order.service.impl;
  2. import com.alatus.mall.order.entity.OrderEntity;
  3. import com.alatus.mall.order.entity.OrderReturnReasonEntity;
  4. import com.rabbitmq.client.Channel;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Service;
  9. import java.util.Map;
  10. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  11. import com.baomidou.mybatisplus.core.metadata.IPage;
  12. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  13. import com.alatus.common.utils.PageUtils;
  14. import com.alatus.common.utils.Query;
  15. import com.alatus.mall.order.dao.OrderItemDao;
  16. import com.alatus.mall.order.entity.OrderItemEntity;
  17. import com.alatus.mall.order.service.OrderItemService;
  18. @Service("orderItemService")
  19. @RabbitListener(queues = "helloJava")
  20. public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
  21.     @Override
  22.     public PageUtils queryPage(Map<String, Object> params) {
  23.         IPage<OrderItemEntity> page = this.page(
  24.                 new Query<OrderItemEntity>().getPage(params),
  25.                 new QueryWrapper<OrderItemEntity>()
  26.         );
  27.         return new PageUtils(page);
  28.     }
  29.     @RabbitHandler
  30.     public void getMessage(Message message, OrderReturnReasonEntity entity, Channel channel){
  31.         System.out.println(entity);
  32.     }
  33.     @RabbitHandler
  34.     public void getMessage1(OrderEntity entity){
  35.         System.out.println(entity);
  36.     }
  37. }
复制代码
  1. package com.alatus.mall.order.service.impl;
  2. import com.alatus.mall.order.entity.OrderEntity;
  3. import com.alatus.mall.order.entity.OrderReturnReasonEntity;
  4. import com.rabbitmq.client.Channel;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Service;
  9. import java.util.Map;
  10. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  11. import com.baomidou.mybatisplus.core.metadata.IPage;
  12. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  13. import com.alatus.common.utils.PageUtils;
  14. import com.alatus.common.utils.Query;
  15. import com.alatus.mall.order.dao.OrderItemDao;
  16. import com.alatus.mall.order.entity.OrderItemEntity;
  17. import com.alatus.mall.order.service.OrderItemService;
  18. @Service("orderItemService")
  19. @RabbitListener(queues = "helloJava")
  20. public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
  21.     @Override
  22.     public PageUtils queryPage(Map<String, Object> params) {
  23.         IPage<OrderItemEntity> page = this.page(
  24.                 new Query<OrderItemEntity>().getPage(params),
  25.                 new QueryWrapper<OrderItemEntity>()
  26.         );
  27.         return new PageUtils(page);
  28.     }
  29.     @RabbitHandler
  30.     public void getMessage(Message message, OrderReturnReasonEntity entity, Channel channel){
  31.         System.out.println(entity);
  32.     }
  33.     @RabbitHandler
  34.     public void getMessage1(OrderEntity entity){
  35.         System.out.println(entity);
  36.     }
  37. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张裕

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表