RabbitMQ的安装、配置和实战

鼠扑  金牌会员 | 2023-7-31 19:04:58 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

RabbitMQ安装
  1. docker run -d  --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3.8.15-management
  2. #网络安全组记得开放端口
  3. 4369 erlang 发现口
  4. 5672 client 端通信口
  5. 15672 管理界面 ui 端口
  6. 25672 server 间内部通信口
  7. 访问管理界面
  8. ip:15672
复制代码
依赖引入
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
application.xml
  1. spring:
  2.   ##----------rabbit配置--------------
  3.   rabbitmq:
  4.     host: 192.168.75.146
  5.     port: 5672
  6.     virtual-host: dev
  7.     username: admin
  8.     password: password
  9.     listener:
  10.       simple:
  11.         #消息确认方式,manual(手动ack) 和auto(自动ack) 。消息队列重试到达次数进入异常交换机--为实现,该策略要为auto
  12.         acknowledge-mode: auto
  13.         retry:
  14.           #开启重试,消费者代码不能添加try catch捕获不往外抛异常
  15.           enabled: true
  16.           #最大重试次数
  17.           max-attempts: 4
  18.           # 重试消息的时间间隔,5秒
  19.           max-interval: 5000
复制代码
RabbitMQ配置文件 (一个交换机,两个队列,routingKey匹配规则适用于两队列)
  1. @Configuration
  2. @Data
  3. public class RabbitMQConfig {
  4.     /**
  5.      * 交换机
  6.      */
  7.     private String shortLinkEventExchange="short_link.event.exchange";
  8.     /**
  9.      * 创建交换机 Topic类型
  10.      * 一般一个微服务一个交换机
  11.      * @return
  12.      */
  13.     @Bean
  14.     public Exchange shortLinkEventExchange(){
  15.         return new TopicExchange(shortLinkEventExchange,true,false);
  16.         //return new FanoutExchange(shortLinkEventExchange,true,false);
  17.     }
  18.     //新增短链相关配置====================================
  19.     /**
  20.      * 新增短链 队列
  21.      */
  22.     private String shortLinkAddLinkQueue="short_link.add.link.queue";
  23.     /**
  24.      * 新增短链映射 队列
  25.      */
  26.     private String shortLinkAddMappingQueue="short_link.add.mapping.queue";
  27.     /**
  28.      * 新增短链具体的routingKey,【发送消息使用】
  29.      */
  30.     private String shortLinkAddRoutingKey="short_link.add.link.mapping.routing.key";
  31.     /**
  32.      * topic类型的binding key,用于绑定队列和交换机,是用于 link 消费者
  33.      */
  34.     private String shortLinkAddLinkBindingKey="short_link.add.link.*.routing.key";
  35.     /**
  36.      * topic类型的binding key,用于绑定队列和交换机,是用于 mapping 消费者
  37.      */
  38.     private String shortLinkAddMappingBindingKey="short_link.add.*.mapping.routing.key";
  39.     /**
  40.      * 新增短链api队列和交换机的绑定关系建立
  41.      */
  42.     @Bean
  43.     public Binding shortLinkAddApiBinding(){
  44.         return new Binding(shortLinkAddLinkQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddLinkBindingKey,null);
  45.     }
  46.     /**
  47.      * 新增短链mapping队列和交换机的绑定关系建立
  48.      */
  49.     @Bean
  50.     public Binding shortLinkAddMappingBinding(){
  51.         return new Binding(shortLinkAddMappingQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddMappingBindingKey,null);
  52.     }
  53.     /**
  54.      * 新增短链Link普通队列,用于被监听
  55.      */
  56.     @Bean
  57.     public Queue shortLinkAddLinkQueue(){
  58.         return new Queue(shortLinkAddLinkQueue,true,false,false);
  59.     }
  60.     /**
  61.      * 新增短链mapping 普通队列,用于被监听
  62.      */
  63.     @Bean
  64.     public Queue shortLinkAddMappingQueue(){
  65.         return new Queue(shortLinkAddMappingQueue,true,false,false);
  66.     }
复制代码
对应的两个消费者
  1. @Component
  2. @Slf4j
  3. //@RabbitListener(queues = "short_link.add.link.queue")
  4. @RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })
  5. public class ShortLinkAddLinkMQListener {
  6.     @Autowired
  7.     private ShortLinkService shortLinkService;
  8.     @RabbitHandler
  9.     public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
  10.         log.info("ShortLinkAddLinkMQListener message:{}",message);
  11.         try{
  12.             eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_LINK.name());
  13.             shortLinkService.handlerAddShortLink(eventMessage);
  14.         }catch (Exception e){
  15.             // 处理业务异常,等其他操作
  16.             log.error("消费失败:{}",eventMessage);
  17.             throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
  18.         }
  19.         log.info("消费成功:{}",eventMessage);
  20.         //确认消息消费成功
  21. //        channel.basicAck(tag,false);
  22.     }
  23. }
复制代码
  1. @Component
  2. @Slf4j
  3. //@RabbitListener(queues = "short_link.add.mapping.queue")
  4. @RabbitListener(queuesToDeclare = { @Queue("short_link.add.mapping.queue") })
  5. public class ShortLinkAddMappingMQListener {
  6.     @Autowired
  7.     private ShortLinkService shortLinkService;
  8.     @RabbitHandler
  9.     public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
  10.         log.info("ShortLinkAddMappingMQListener message:{}",message);
  11.         try{
  12.             eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_MAPPING.name());
  13.             shortLinkService.handlerAddShortLink(eventMessage);
  14.         }catch (Exception e){
  15.             // 处理业务异常,等其他操作
  16.             log.error("消费失败:{}",eventMessage);
  17.             throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
  18.         }
  19.         log.info("消费成功:{}",eventMessage);
  20.         //确认消息消费成功
  21. //        channel.basicAck(tag,false);
  22.     }
  23. }
复制代码
实战,这里直接在controller层中编码。(应该在Service实现)
  1. @RestController
  2. @RequestMapping("/api/link/v1")
  3. public class ShortLinkController {
  4.     @Autowired
  5.     private ShortLinkService shortLinkService;
  6.    
  7.     @Autowired
  8.     private RabbitTemplate rabbitTemplate;
  9.    
  10.     /**
  11.      * 新增短链
  12.      * @param shortLinkAddRequest
  13.      * @return
  14.      */
  15.     @PostMapping("add")
  16.     public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest){
  17.         //参数:交换机、匹配规则Key、信息对象
  18.         EventMessage eventMessage = EventMessage.builder().[设置对象各字段信息]
  19.             .build();
  20.         //生成MQ。
  21.         rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(), rabbitMQConfig.getShortLinkAddRoutingKey(), eventMessage);
  22.         
  23.         return jsonData;
  24.     }
  25.    
  26. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表