本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu
参考文档
csdn博客:
基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134
高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012
application.yml
- server:
- port: 8021
- spring:
- #给项目来个名字
- application:
- name: rabbitmq-provider
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: root
- password: root
- #虚拟host 可以不设置,使用server默认host
- virtual-host: JCcccHost
- #确认消息已发送到交换机(Exchange)
- #publisher-confirms: true
- publisher-confirm-type: correlated
- #确认消息已发送到队列(Queue)
- publisher-returns: true
复制代码 完善更多信息- spring:
- rabbitmq:
- host: localhost
- port: 5672
- virtual-host: /
- username: guest
- password: guest
- publisher-confirm-type: correlated
- publisher-returns: true
- template:
- mandatory: true
- retry:
- #发布重试,默认false
- enabled: true
- #重试时间 默认1000ms
- initial-interval: 1000
- #重试最大次数 最大3
- max-attempts: 3
- #重试最大间隔时间
- max-interval: 10000
- #重试的时间隔乘数,比如配2,0
- 第一次等于10s,第二次等于20s,第三次等于40s
- multiplier: 1
- listener:
- \# 默认配置是simple
- type: simple
- simple:
- \# 手动ack Acknowledge mode of container. auto none
- acknowledge-mode: manual
- #消费者调用程序线程的最小数量
- concurrency: 10
- #消费者最大数量
- max-concurrency: 10
- #限制消费者每次只处理一条信息,处理完在继续下一条
- prefetch: 1
- #启动时是否默认启动容器
- auto-startup: true
- #被拒绝时重新进入队列
- default-requeue-rejected: true
复制代码 相关注解说明
@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue
里面的消息。
@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。- \@Component
- public class PointConsumer {
- //监听的队列名
- \@RabbitListener(queues = "point.to.point")
- public void processOne(String name) {
- System.out.println("point.to.point:" + name);
- }
- }
复制代码 @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给
@RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。- \@Component
- \@RabbitListener(queues = "consumer_queue")
- public class Receiver {
- \@RabbitHandler
- public void processMessage1(String message) {
- System.out.println(message);
- }
- \@RabbitHandler
- public void processMessage2(byte\[\] message) {
- System.out.println(new String(message));
- }
- }
复制代码 @Payload
可以获取消息中的 body 信息- \@RabbitListener(queues = "debug")
- public void processMessage1(@Payload String body) {
- System.out.println("body:"+body);
- }
复制代码 @Header,@Headers
可以获得消息中的 headers 信息- \@RabbitListener(queues = "debug")
- public void processMessage1(@Payload String body, \@Header String token)
- {
- System.out.println("body:"+body);
- System.out.println("token:"+token);
- }
- \@RabbitListener(queues = "debug")
- public void processMessage1(@Payload String body, \@Headers
- Map\<String,Object\> headers) {
- System.out.println("body:"+body);
- System.out.println("Headers:"+headers);
- }
复制代码 快速使用
配置xml文件
- <dependency\>
- \<groupId\>org.springframework.boot\</groupId\>
- \<artifactId\>spring-boot-starter-amqp\</artifactId\>
- \</dependency\>
复制代码 配置exchange、queue
注解快速创建版本
- \@Configuration
- public class RabbitmqConfig {
- //创建交换机
- //通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机
- \@Bean("bootExchange")
- public Exchange bootExchange() {
- return
- ExchangeBuilder.topicExchange("zx_topic_exchange").durable(true).build();
- }
- //创建队列
- \@Bean("bootQueue")
- public Queue bootQueue() {
- return QueueBuilder.durable("zx_queue").build();
- }
- /\*\*
- \* 将队列与交换机绑定
- \*
- \* \@param queue
- \* \@param exchange
- \* \@return
- \*/
- \@Bean
- public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
- \@Qualifier("bootExchange") Exchange exchange) {
- return
- BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
- }
- }
复制代码 Direct
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /\*\*
- \* \@Author : JCccc
- \* \@CreateTime : 2019/9/3
- \* \@Description :
- \*\*/
- \@Configuration
- public class DirectRabbitConfig {
- //队列 起名:TestDirectQueue
- \@Bean
- public Queue TestDirectQueue() {
- //
- durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- //
- exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- //
- autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("TestDirectQueue",true);
- }
- //Direct交换机 起名:TestDirectExchange
- \@Bean
- DirectExchange TestDirectExchange() {
- // return new DirectExchange("TestDirectExchange",true,true);
- return new DirectExchange("TestDirectExchange",true,false);
- }
- //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
- \@Bean
- Binding bindingDirect() {
- return
- BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
- }
- \@Bean
- DirectExchange lonelyDirectExchange() {
- return new DirectExchange("lonelyDirectExchange");
- }
- }
复制代码 Fanout
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /\*\*
- \* \@Author : JCccc
- \* \@CreateTime : 2019/9/3
- \* \@Description :
- \*\*/
- \@Configuration
- public class FanoutRabbitConfig {
- /\*\*
- \* 创建三个队列 :fanout.A fanout.B fanout.C
- \* 将三个队列都绑定在交换机 fanoutExchange 上
- \* 因为是扇型交换机, 路由键无需配置,配置也不起作用
- \*/
- \@Bean
- public Queue queueA() {
- return new Queue("fanout.A");
- }
- \@Bean
- public Queue queueB() {
- return new Queue("fanout.B");
- }
- \@Bean
- public Queue queueC() {
- return new Queue("fanout.C");
- }
- \@Bean
- FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanoutExchange");
- }
- \@Bean
- Binding bindingExchangeA() {
- return BindingBuilder.bind(queueA()).to(fanoutExchange());
- }
- \@Bean
- Binding bindingExchangeB() {
- return BindingBuilder.bind(queueB()).to(fanoutExchange());
- }
- \@Bean
- Binding bindingExchangeC() {
- return BindingBuilder.bind(queueC()).to(fanoutExchange());
- }
- }
复制代码 Topic
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /\*\*
- \* \@Author : JCccc
- \* \@CreateTime : 2019/9/3
- \* \@Description :
- \*\*/
- \@Configuration
- public class TopicRabbitConfig {
- //绑定键
- public final static String man = "topic.man";
- public final static String woman = "topic.woman";
- \@Bean
- public Queue firstQueue() {
- return new Queue(TopicRabbitConfig.man);
- }
- \@Bean
- public Queue secondQueue() {
- return new Queue(TopicRabbitConfig.woman);
- }
- \@Bean
- TopicExchange exchange() {
- return new TopicExchange("topicExchange");
- }
- //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
- //这样只要是消息携带的路由键是topic.man,才会分发到该队列
- \@Bean
- Binding bindingExchangeMessage() {
- return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
- }
- //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
- // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
- \@Bean
- Binding bindingExchangeMessage2() {
- return
- BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
- }
- }
复制代码 生产者发送消息
直接发送给队列- //指定消息队列的名字,直接发送消息到消息队列中
- \@Test
- public void testSimpleQueue() {
- // 队列名称
- String queueName = "simple.queue";
- // 消息
- String message = "hello, spring amqp!";
- // 发送消息
- rabbitTemplate.convertAndSend(queueName, message);
- }
复制代码 发送给交换机,然后走不同的模式- ////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息
- \@Test
- public void testSendDirectExchange() {
- // 交换机名称,有三种类型
- String exchangeName = "itcast.direct";
- // 消息
- String message =
- "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
- // 发送消息,red为队列的key,因此此队列会得到消息
- rabbitTemplate.convertAndSend(exchangeName, "red", message);
- }
复制代码 也可以将发送的消息封装到HashMap中然后发送给交换机- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.UUID;
- /\*\*
- \* \@Author : JCccc
- \* \@CreateTime : 2019/9/3
- \* \@Description :
- \*\*/
- \@RestController
- public class SendMessageController {
- \@Autowired
- RabbitTemplate rabbitTemplate;
- //使用RabbitTemplate,这提供了接收/发送等等方法
- \@GetMapping("/sendDirectMessage")
- public String sendDirectMessage() {
- String messageId = String.valueOf(UUID.randomUUID());
- String messageData = "test message, hello!";
- String createTime =
- LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
- HH:mm:ss"));
- Map\<String,Object\> map=new HashMap\<\>();
- map.put("messageId",messageId);
- map.put("messageData",messageData);
- map.put("createTime",createTime);
- //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
- rabbitTemplate.convertAndSend("TestDirectExchange",
- "TestDirectRouting", map);
- return "ok";
- }
- }
复制代码 消费者接收消息
- //使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
- \@Component
- public class MessageListener {
- \@RabbitListener(queues = "direct_queue")
- public void receive(String id){
- System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
- }
- }
- 参数用Map接收也可以
- \@Component
- \@RabbitListener(queues = "TestDirectQueue")//监听的队列名称
- TestDirectQueue
- public class DirectReceiver {
- \@RabbitHandler
- public void process(Map testMessage) {
- System.out.println("DirectReceiver消费者收到消息 : " +
- testMessage.toString());
- }
- }
复制代码 高级特性
消息可靠性传递
有confirm和return两种
在application.yml中添加以下配置项:- server:
- port: 8021
- spring:
- #给项目来个名字
- application:
- name: rabbitmq-provider
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: root
- password: root
- #虚拟host 可以不设置,使用server默认host
- virtual-host: JCcccHost
- #确认消息已发送到交换机(Exchange)
- #publisher-confirms: true
- publisher-confirm-type: correlated
- #确认消息已发送到队列(Queue)
- publisher-returns: true
复制代码 有两种配置方法:
写到配置类中
写到工具类或者普通类中,但是这个类得实现那两个接口
写法一
编写消息确认回调函数- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- \@Configuration
- public class RabbitConfig {
- \@Bean
- public RabbitTemplate createRabbitTemplate(ConnectionFactory
- connectionFactory){
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
- rabbitTemplate.setConnectionFactory(connectionFactory);
- //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- \@Override
- public void confirm(CorrelationData correlationData, boolean ack, String
- cause) {
- System.out.println("ConfirmCallback:
- "+"相关数据:"+correlationData);
- System.out.println("ConfirmCallback: "+"确认情况:"+ack);
- System.out.println("ConfirmCallback: "+"原因:"+cause);
- }
- });
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- \@Override
- public void returnedMessage(Message message, int replyCode, String
- replyText, String exchange, String routingKey) {
- System.out.println("ReturnCallback: "+"消息:"+message);
- System.out.println("ReturnCallback: "+"回应码:"+replyCode);
- System.out.println("ReturnCallback: "+"回应信息:"+replyText);
- System.out.println("ReturnCallback: "+"交换机:"+exchange);
- System.out.println("ReturnCallback: "+"路由键:"+routingKey);
- }
- });
- return rabbitTemplate;
- }
- }
复制代码 写法二
- \@Component
- \@Slf4j
- public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
- RabbitTemplate.ReturnsCallback {
- \@Resource
- private RedisTemplate\<String, String\> redisTemplate;
- \@Resource
- private RabbitTemplate rabbitTemplate;
- private String finalId = null;
- private SmsDTO smsDTO = null;
- /\*\*
- \* 发布者确认的回调
- \*
- \* \@param correlationData 回调的相关数据。
- \* \@param b ack为真,nack为假
- \* \@param s 一个可选的原因,用于nack,如果可用,否则为空。
- \*/
- \@Override
- public void confirm(CorrelationData correlationData, boolean b, String
- s) {
- // 消息发送成功,将redis中消息的状态(status)修改为1
- if (b) {
- redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
- finalId, "status", 1);
- } else {
- // 发送失败,放入redis失败集合中,并删除集合数据
- log.error("短信消息投送失败:{}\--\>{}", correlationData, s);
- redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
- redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
- this.smsDTO);
- }
- }
- /\*\*
- \* 发生异常时的消息返回提醒
- \*
- \* \@param returnedMessage
- \*/
- \@Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- log.error("发生异常,返回消息回调:{}", returnedMessage);
- // 发送失败,放入redis失败集合中,并删除集合数据
- redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
- redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
- this.smsDTO);
- }
- \@PostConstruct
- public void init() {
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- }
- }
复制代码 消息确认机制
手动确认- yml配置
- #手动确认 manual
- listener:
- simple:
- acknowledge-mode: manual
复制代码 写法一
首先在消费者项目中创建MessageListenerConfig- import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
- import org.springframework.amqp.core.AcknowledgeMode;
- import org.springframework.amqp.core.Queue;
- import
- org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import
- org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- \@Configuration
- public class MessageListenerConfig {
- \@Autowired
- private CachingConnectionFactory connectionFactory;
- \@Autowired
- private MyAckReceiver myAckReceiver;//消息接收处理类
- \@Bean
- public SimpleMessageListenerContainer simpleMessageListenerContainer() {
- SimpleMessageListenerContainer container = new
- SimpleMessageListenerContainer(connectionFactory);
- container.setConcurrentConsumers(1);
- container.setMaxConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
- RabbitMQ默认是自动确认,这里改为手动确认消息
- //设置一个队列
- container.setQueueNames("TestDirectQueue");
- //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
- //
- container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
- //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
- //container.setQueues(new Queue("TestDirectQueue",true));
- //container.addQueues(new Queue("TestDirectQueue2",true));
- //container.addQueues(new Queue("TestDirectQueue3",true));
- container.setMessageListener(myAckReceiver);
- return container;
- }
- }
复制代码 然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import
- org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
- import java.io.ByteArrayInputStream;
- import java.io.ObjectInputStream;
- import java.util.Map;
- \@Component
- public class MyAckReceiver implements ChannelAwareMessageListener {
- \@Override
- public void onMessage(Message message, Channel channel) throws Exception
- {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- byte\[\] body = message.getBody();
- ObjectInputStream ois = new ObjectInputStream(new
- ByteArrayInputStream(body));
- Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
- String messageId = msgMap.get("messageId");
- String messageData = msgMap.get("messageData");
- String createTime = msgMap.get("createTime");
- ois.close();
- System.out.println(" MyAckReceiver messageId:"+messageId+"
- messageData:"+messageData+" createTime:"+createTime);
- System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
- channel.basicAck(deliveryTag, true);
- //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认
- delivery_tag 小于等于传入值的所有消息
- //channel.basicReject(deliveryTag,
- true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
- } catch (Exception e) {
- channel.basicReject(deliveryTag, false);
- e.printStackTrace();
- }
- }
- }
复制代码 如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:
首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import
- org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
- import org.springframework.stereotype.Component;
- import java.io.ByteArrayInputStream;
- import java.io.ObjectInputStream;
- import java.util.Map;
- \@Component
- public class MyAckReceiver implements ChannelAwareMessageListener {
- \@Override
- public void onMessage(Message message, Channel channel) throws Exception
- {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- byte\[\] body = message.getBody();
- ObjectInputStream ois = new ObjectInputStream(new
- ByteArrayInputStream(body));
- Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
- String messageId = msgMap.get("messageId");
- String messageData = msgMap.get("messageData");
- String createTime = msgMap.get("createTime");
- ois.close();
- if
- ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
- System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
- System.out.println("消息成功消费到 messageId:"+messageId+"
- messageData:"+messageData+" createTime:"+createTime);
- System.out.println("执行TestDirectQueue中的消息的业务处理流程\...\...");
- }
- if
- ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
- System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
- System.out.println("消息成功消费到 messageId:"+messageId+"
- messageData:"+messageData+" createTime:"+createTime);
- System.out.println("执行fanout.A中的消息的业务处理流程\...\...");
- }
- channel.basicAck(deliveryTag, true);
- //channel.basicReject(deliveryTag, true);//为true会重新放回队列
- } catch (Exception e) {
- channel.basicReject(deliveryTag, false);
- e.printStackTrace();
- }
- }
- }
复制代码 写法二
- \@Component
- \@Slf4j
- public class SendSmsListener {
- \@Resource
- private RedisTemplate\<String, String\> redisTemplate;
- \@Resource
- private SendSmsUtils sendSmsUtils;
- /\*\*
- \* 监听发送短信普通队列
- \* \@param smsDTO
- \* \@param message
- \* \@param channel
- \* \@throws IOException
- \*/
- \@RabbitListener(queues = SMS_QUEUE_NAME)
- public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
- channel) throws IOException {
- String messageId = message.getMessageProperties().getMessageId();
- int retryCount = (int)
- redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
- messageId, "retryCount");
- if (retryCount \> 3) {
- //重试次数大于3,直接放到死信队列
- log.error("短信消息重试超过3次:{}", messageId);
- //basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
- //该方法reject后,该消费者还是会消费到该条被reject的消息。
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
- redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
- return;
- }
- try {
- String phoneNum = smsDTO.getPhoneNum();
- String code = smsDTO.getCode();
- if(StringUtils.isAnyBlank(phoneNum,code)){
- throw new RuntimeException("sendSmsListener参数为空");
- }
- // 发送消息
- SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
- code);
- SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();
- SendStatus sendStatus = sendStatusSet\[0\];
- if(!"Ok".equals(sendStatus.getCode()) \|\|!"send
- success".equals(sendStatus.getMessage())){
- throw new RuntimeException("发送验证码失败");
- }
- //手动确认消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- log.info("短信发送成功:{}",smsDTO);
- redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
- } catch (Exception e) {
- redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,"retryCount",retryCount+1);
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
- }
- }
- /\*\*
- \* 监听到发送短信死信队列
- \* \@param sms
- \* \@param message
- \* \@param channel
- \* \@throws IOException
- \*/
- \@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)
- public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
- channel) throws IOException {
- try{
- log.error("监听到死信队列消息==\>{}",sms);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }catch (Exception e){
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
- }
- }
- }
复制代码 消费端限流
- #配置RabbitMQ
- spring:
- rabbitmq:
- host: 192.168.126.3
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- #开启自动确认 none 手动确认 manual
- listener:
- simple:
- #消费端限流机制必须开启手动确认
- acknowledge-mode: manual
- #消费端最多拉取的消息条数,签收后不满该条数才会继续拉取
- prefetch: 5
复制代码 消息存活时间TTL
可以设置队列的存活时间,也可以设置具体消息的存活时间
设置队列中所有消息的存活时间
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.ttl(10000)//设置队列的所有消息存活10s
.build();
即在创建队列时,设置存活时间
设置某条消息的存活时间
//发送消息,并设置该消息的存活时间- \@Test
- public void testSendMessage()
- {
- //1.创建消息属性
- MessageProperties messageProperties = new MessageProperties();
- //2.设置存活时间
- messageProperties.setExpiration("10000");
- //3.创建消息对象
- Message message = new
- Message("sendMessage\...".getBytes(),messageProperties);
- //4.发送消息
- rabbitTemplate.convertAndSend("my_topic_exchange1","my_routing",message);
- }
复制代码 若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。
在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准
死信队列
死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能延迟队列
RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能
即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。
消费者监听延迟队列- \@Component
- public class ExpireOrderConsumer {
- //监听过期订单队列
- \@RabbitListener(queues = "expire_queue")
- public void listenMessage(String orderId)
- {
- //模拟处理数据库等业务
- System.out.println("查询"+orderId+"号订单的状态,如果已支付无需处理,如果未支付则回退库存");
- }
- }
- 控制层代码
- \@RestController
- public class OrderController {
- \@Autowired
- private RabbitTemplate rabbitTemplate;
- \@RequestMapping(value = "/place/{orderId}",method =
- RequestMethod.GET)
- public String placeOrder(@PathVariable String orderId)
- {
- //模拟service层处理
- System.out.println("处理订单数据\...");
- //将订单id发送到订单队列
- rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);
- return "下单成功,修改库存";
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |