RabbitMQ安装
- docker run -d --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3.8.15-management
- #网络安全组记得开放端口
- 4369 erlang 发现口
- 5672 client 端通信口
- 15672 管理界面 ui 端口
- 25672 server 间内部通信口
- 访问管理界面
- ip:15672
复制代码 依赖引入
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 application.xml
- spring:
- ##----------rabbit配置--------------
- rabbitmq:
- host: 192.168.75.146
- port: 5672
- virtual-host: dev
- username: admin
- password: password
- listener:
- simple:
- #消息确认方式,manual(手动ack) 和auto(自动ack) 。消息队列重试到达次数进入异常交换机--为实现,该策略要为auto
- acknowledge-mode: auto
- retry:
- #开启重试,消费者代码不能添加try catch捕获不往外抛异常
- enabled: true
- #最大重试次数
- max-attempts: 4
- # 重试消息的时间间隔,5秒
- max-interval: 5000
复制代码 RabbitMQ配置文件 (一个交换机,两个队列,routingKey匹配规则适用于两队列)
- @Configuration
- @Data
- public class RabbitMQConfig {
- /**
- * 交换机
- */
- private String shortLinkEventExchange="short_link.event.exchange";
- /**
- * 创建交换机 Topic类型
- * 一般一个微服务一个交换机
- * @return
- */
- @Bean
- public Exchange shortLinkEventExchange(){
- return new TopicExchange(shortLinkEventExchange,true,false);
- //return new FanoutExchange(shortLinkEventExchange,true,false);
- }
- //新增短链相关配置====================================
- /**
- * 新增短链 队列
- */
- private String shortLinkAddLinkQueue="short_link.add.link.queue";
- /**
- * 新增短链映射 队列
- */
- private String shortLinkAddMappingQueue="short_link.add.mapping.queue";
- /**
- * 新增短链具体的routingKey,【发送消息使用】
- */
- private String shortLinkAddRoutingKey="short_link.add.link.mapping.routing.key";
- /**
- * topic类型的binding key,用于绑定队列和交换机,是用于 link 消费者
- */
- private String shortLinkAddLinkBindingKey="short_link.add.link.*.routing.key";
- /**
- * topic类型的binding key,用于绑定队列和交换机,是用于 mapping 消费者
- */
- private String shortLinkAddMappingBindingKey="short_link.add.*.mapping.routing.key";
- /**
- * 新增短链api队列和交换机的绑定关系建立
- */
- @Bean
- public Binding shortLinkAddApiBinding(){
- return new Binding(shortLinkAddLinkQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddLinkBindingKey,null);
- }
- /**
- * 新增短链mapping队列和交换机的绑定关系建立
- */
- @Bean
- public Binding shortLinkAddMappingBinding(){
- return new Binding(shortLinkAddMappingQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddMappingBindingKey,null);
- }
- /**
- * 新增短链Link普通队列,用于被监听
- */
- @Bean
- public Queue shortLinkAddLinkQueue(){
- return new Queue(shortLinkAddLinkQueue,true,false,false);
- }
- /**
- * 新增短链mapping 普通队列,用于被监听
- */
- @Bean
- public Queue shortLinkAddMappingQueue(){
- return new Queue(shortLinkAddMappingQueue,true,false,false);
- }
复制代码 对应的两个消费者
- @Component
- @Slf4j
- //@RabbitListener(queues = "short_link.add.link.queue")
- @RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })
- public class ShortLinkAddLinkMQListener {
- @Autowired
- private ShortLinkService shortLinkService;
- @RabbitHandler
- public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
- log.info("ShortLinkAddLinkMQListener message:{}",message);
- try{
- eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_LINK.name());
- shortLinkService.handlerAddShortLink(eventMessage);
- }catch (Exception e){
- // 处理业务异常,等其他操作
- log.error("消费失败:{}",eventMessage);
- throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
- }
- log.info("消费成功:{}",eventMessage);
- //确认消息消费成功
- // channel.basicAck(tag,false);
- }
- }
复制代码- @Component
- @Slf4j
- //@RabbitListener(queues = "short_link.add.mapping.queue")
- @RabbitListener(queuesToDeclare = { @Queue("short_link.add.mapping.queue") })
- public class ShortLinkAddMappingMQListener {
- @Autowired
- private ShortLinkService shortLinkService;
- @RabbitHandler
- public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
- log.info("ShortLinkAddMappingMQListener message:{}",message);
- try{
- eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_MAPPING.name());
- shortLinkService.handlerAddShortLink(eventMessage);
- }catch (Exception e){
- // 处理业务异常,等其他操作
- log.error("消费失败:{}",eventMessage);
- throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
- }
- log.info("消费成功:{}",eventMessage);
- //确认消息消费成功
- // channel.basicAck(tag,false);
- }
- }
复制代码 实战,这里直接在controller层中编码。(应该在Service实现)
- @RestController
- @RequestMapping("/api/link/v1")
- public class ShortLinkController {
- @Autowired
- private ShortLinkService shortLinkService;
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 新增短链
- * @param shortLinkAddRequest
- * @return
- */
- @PostMapping("add")
- public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest){
- //参数:交换机、匹配规则Key、信息对象
- EventMessage eventMessage = EventMessage.builder().[设置对象各字段信息]
- .build();
- //生成MQ。
- rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(), rabbitMQConfig.getShortLinkAddRoutingKey(), eventMessage);
-
- return jsonData;
- }
-
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |