RabbitMQ快速使用代码手册

曹旭辉  金牌会员 | 2023-6-16 17:42:33 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 835|帖子 835|积分 2505

本篇博客的内容为RabbitMQ在开发过程中的快速上手使用,侧重于代码部分,几乎没有相关概念的介绍,相关概念请参考以下csdn博客,两篇都是我找的精华帖,供大家学习。本篇博客也持续更新~~~
内容代码部分由于word转md格式有些问题,可以直接查看我的有道云笔记,链接:https://note.youdao.com/s/Ab7Cjiu
参考文档

csdn博客:
基础部分:https://blog.csdn.net/qq_35387940/article/details/100514134
高级部分:https://blog.csdn.net/weixin_49076273/article/details/124991012
application.yml
  1. server:
  2. port: 8021
  3. spring:
  4. #给项目来个名字
  5. application:
  6. name: rabbitmq-provider
  7. #配置rabbitMq 服务器
  8. rabbitmq:
  9. host: 127.0.0.1
  10. port: 5672
  11. username: root
  12. password: root
  13. #虚拟host 可以不设置,使用server默认host
  14. virtual-host: JCcccHost
  15. #确认消息已发送到交换机(Exchange)
  16. #publisher-confirms: true
  17. publisher-confirm-type: correlated
  18. #确认消息已发送到队列(Queue)
  19. publisher-returns: true
复制代码
完善更多信息
  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. port: 5672
  5. virtual-host: /
  6. username: guest
  7. password: guest
  8. publisher-confirm-type: correlated
  9. publisher-returns: true
  10. template:
  11. mandatory: true
  12. retry:
  13. #发布重试,默认false
  14. enabled: true
  15. #重试时间 默认1000ms
  16. initial-interval: 1000
  17. #重试最大次数 最大3
  18. max-attempts: 3
  19. #重试最大间隔时间
  20. max-interval: 10000
  21. #重试的时间隔乘数,比如配2,0
  22. 第一次等于10s,第二次等于20s,第三次等于40s
  23. multiplier: 1
  24. listener:
  25. \# 默认配置是simple
  26. type: simple
  27. simple:
  28. \# 手动ack Acknowledge mode of container. auto none
  29. acknowledge-mode: manual
  30. #消费者调用程序线程的最小数量
  31. concurrency: 10
  32. #消费者最大数量
  33. max-concurrency: 10
  34. #限制消费者每次只处理一条信息,处理完在继续下一条
  35. prefetch: 1
  36. #启动时是否默认启动容器
  37. auto-startup: true
  38. #被拒绝时重新进入队列
  39. default-requeue-rejected: true
复制代码
相关注解说明

@RabbitListener 注解是指定某方法作为消息消费的方法,例如监听某 Queue
里面的消息。
@RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致。
  1. \@Component
  2. public class PointConsumer {
  3. //监听的队列名
  4. \@RabbitListener(queues = "point.to.point")
  5. public void processOne(String name) {
  6. System.out.println("point.to.point:" + name);
  7. }
  8. }
复制代码
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给
@RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
  1. \@Component
  2. \@RabbitListener(queues = "consumer_queue")
  3. public class Receiver {
  4. \@RabbitHandler
  5. public void processMessage1(String message) {
  6. System.out.println(message);
  7. }
  8. \@RabbitHandler
  9. public void processMessage2(byte\[\] message) {
  10. System.out.println(new String(message));
  11. }
  12. }
复制代码
@Payload
可以获取消息中的 body 信息
  1. \@RabbitListener(queues = "debug")
  2. public void processMessage1(@Payload String body) {
  3. System.out.println("body:"+body);
  4. }
复制代码
@Header,@Headers
可以获得消息中的 headers 信息
  1. \@RabbitListener(queues = "debug")
  2. public void processMessage1(@Payload String body, \@Header String token)
  3. {
  4. System.out.println("body:"+body);
  5. System.out.println("token:"+token);
  6. }
  7. \@RabbitListener(queues = "debug")
  8. public void processMessage1(@Payload String body, \@Headers
  9. Map\<String,Object\> headers) {
  10. System.out.println("body:"+body);
  11. System.out.println("Headers:"+headers);
  12. }
复制代码
快速使用

配置xml文件
  1. <dependency\>
  2. \<groupId\>org.springframework.boot\</groupId\>
  3. \<artifactId\>spring-boot-starter-amqp\</artifactId\>
  4. \</dependency\>
复制代码
配置exchange、queue

注解快速创建版本
  1. \@Configuration
  2. public class RabbitmqConfig {
  3. //创建交换机
  4. //通过ExchangeBuilder能创建direct、topic、Fanout类型的交换机
  5. \@Bean("bootExchange")
  6. public Exchange bootExchange() {
  7. return
  8. ExchangeBuilder.topicExchange("zx_topic_exchange").durable(true).build();
  9. }
  10. //创建队列
  11. \@Bean("bootQueue")
  12. public Queue bootQueue() {
  13. return QueueBuilder.durable("zx_queue").build();
  14. }
  15. /\*\*
  16. \* 将队列与交换机绑定
  17. \*
  18. \* \@param queue
  19. \* \@param exchange
  20. \* \@return
  21. \*/
  22. \@Bean
  23. public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
  24. \@Qualifier("bootExchange") Exchange exchange) {
  25. return
  26. BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
  27. }
  28. }
复制代码
Direct
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /\*\*
  8. \* \@Author : JCccc
  9. \* \@CreateTime : 2019/9/3
  10. \* \@Description :
  11. \*\*/
  12. \@Configuration
  13. public class DirectRabbitConfig {
  14. //队列 起名:TestDirectQueue
  15. \@Bean
  16. public Queue TestDirectQueue() {
  17. //
  18. durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  19. //
  20. exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  21. //
  22. autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  23. // return new Queue("TestDirectQueue",true,true,false);
  24. //一般设置一下队列的持久化就好,其余两个就是默认false
  25. return new Queue("TestDirectQueue",true);
  26. }
  27. //Direct交换机 起名:TestDirectExchange
  28. \@Bean
  29. DirectExchange TestDirectExchange() {
  30. // return new DirectExchange("TestDirectExchange",true,true);
  31. return new DirectExchange("TestDirectExchange",true,false);
  32. }
  33. //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  34. \@Bean
  35. Binding bindingDirect() {
  36. return
  37. BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  38. }
  39. \@Bean
  40. DirectExchange lonelyDirectExchange() {
  41. return new DirectExchange("lonelyDirectExchange");
  42. }
  43. }
复制代码
Fanout
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /\*\*
  8. \* \@Author : JCccc
  9. \* \@CreateTime : 2019/9/3
  10. \* \@Description :
  11. \*\*/
  12. \@Configuration
  13. public class FanoutRabbitConfig {
  14. /\*\*
  15. \* 创建三个队列 :fanout.A fanout.B fanout.C
  16. \* 将三个队列都绑定在交换机 fanoutExchange 上
  17. \* 因为是扇型交换机, 路由键无需配置,配置也不起作用
  18. \*/
  19. \@Bean
  20. public Queue queueA() {
  21. return new Queue("fanout.A");
  22. }
  23. \@Bean
  24. public Queue queueB() {
  25. return new Queue("fanout.B");
  26. }
  27. \@Bean
  28. public Queue queueC() {
  29. return new Queue("fanout.C");
  30. }
  31. \@Bean
  32. FanoutExchange fanoutExchange() {
  33. return new FanoutExchange("fanoutExchange");
  34. }
  35. \@Bean
  36. Binding bindingExchangeA() {
  37. return BindingBuilder.bind(queueA()).to(fanoutExchange());
  38. }
  39. \@Bean
  40. Binding bindingExchangeB() {
  41. return BindingBuilder.bind(queueB()).to(fanoutExchange());
  42. }
  43. \@Bean
  44. Binding bindingExchangeC() {
  45. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  46. }
  47. }
复制代码
Topic
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.TopicExchange;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /\*\*
  8. \* \@Author : JCccc
  9. \* \@CreateTime : 2019/9/3
  10. \* \@Description :
  11. \*\*/
  12. \@Configuration
  13. public class TopicRabbitConfig {
  14. //绑定键
  15. public final static String man = "topic.man";
  16. public final static String woman = "topic.woman";
  17. \@Bean
  18. public Queue firstQueue() {
  19. return new Queue(TopicRabbitConfig.man);
  20. }
  21. \@Bean
  22. public Queue secondQueue() {
  23. return new Queue(TopicRabbitConfig.woman);
  24. }
  25. \@Bean
  26. TopicExchange exchange() {
  27. return new TopicExchange("topicExchange");
  28. }
  29. //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
  30. //这样只要是消息携带的路由键是topic.man,才会分发到该队列
  31. \@Bean
  32. Binding bindingExchangeMessage() {
  33. return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
  34. }
  35. //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
  36. // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
  37. \@Bean
  38. Binding bindingExchangeMessage2() {
  39. return
  40. BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
  41. }
  42. }
复制代码
生产者发送消息

直接发送给队列
  1. //指定消息队列的名字,直接发送消息到消息队列中
  2. \@Test
  3. public void testSimpleQueue() {
  4. // 队列名称
  5. String queueName = "simple.queue";
  6. // 消息
  7. String message = "hello, spring amqp!";
  8. // 发送消息
  9. rabbitTemplate.convertAndSend(queueName, message);
  10. }
复制代码
发送给交换机,然后走不同的模式
  1. ////指定交换机的名字,将消息发送给交换机,然后不同模式下,消息队列根据key得到消息
  2. \@Test
  3. public void testSendDirectExchange() {
  4. // 交换机名称,有三种类型
  5. String exchangeName = "itcast.direct";
  6. // 消息
  7. String message =
  8. "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
  9. // 发送消息,red为队列的key,因此此队列会得到消息
  10. rabbitTemplate.convertAndSend(exchangeName, "red", message);
  11. }
复制代码
也可以将发送的消息封装到HashMap中然后发送给交换机
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import java.time.LocalDateTime;
  6. import java.time.format.DateTimeFormatter;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. import java.util.UUID;
  10. /\*\*
  11. \* \@Author : JCccc
  12. \* \@CreateTime : 2019/9/3
  13. \* \@Description :
  14. \*\*/
  15. \@RestController
  16. public class SendMessageController {
  17. \@Autowired
  18. RabbitTemplate rabbitTemplate;
  19. //使用RabbitTemplate,这提供了接收/发送等等方法
  20. \@GetMapping("/sendDirectMessage")
  21. public String sendDirectMessage() {
  22. String messageId = String.valueOf(UUID.randomUUID());
  23. String messageData = "test message, hello!";
  24. String createTime =
  25. LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd
  26. HH:mm:ss"));
  27. Map\<String,Object\> map=new HashMap\<\>();
  28. map.put("messageId",messageId);
  29. map.put("messageData",messageData);
  30. map.put("createTime",createTime);
  31. //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
  32. rabbitTemplate.convertAndSend("TestDirectExchange",
  33. "TestDirectRouting", map);
  34. return "ok";
  35. }
  36. }
复制代码
消费者接收消息
  1. //使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
  2. \@Component
  3. public class MessageListener {
  4. \@RabbitListener(queues = "direct_queue")
  5. public void receive(String id){
  6. System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
  7. }
  8. }
  9. 参数用Map接收也可以
  10. \@Component
  11. \@RabbitListener(queues = "TestDirectQueue")//监听的队列名称
  12. TestDirectQueue
  13. public class DirectReceiver {
  14. \@RabbitHandler
  15. public void process(Map testMessage) {
  16. System.out.println("DirectReceiver消费者收到消息 : " +
  17. testMessage.toString());
  18. }
  19. }
复制代码
高级特性

消息可靠性传递

有confirm和return两种
在application.yml中添加以下配置项:
  1. server:
  2. port: 8021
  3. spring:
  4. #给项目来个名字
  5. application:
  6. name: rabbitmq-provider
  7. #配置rabbitMq 服务器
  8. rabbitmq:
  9. host: 127.0.0.1
  10. port: 5672
  11. username: root
  12. password: root
  13. #虚拟host 可以不设置,使用server默认host
  14. virtual-host: JCcccHost
  15. #确认消息已发送到交换机(Exchange)
  16. #publisher-confirms: true
  17. publisher-confirm-type: correlated
  18. #确认消息已发送到队列(Queue)
  19. publisher-returns: true
复制代码
有两种配置方法:
写到配置类中
写到工具类或者普通类中,但是这个类得实现那两个接口
写法一

编写消息确认回调函数
  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. \@Configuration
  8. public class RabbitConfig {
  9. \@Bean
  10. public RabbitTemplate createRabbitTemplate(ConnectionFactory
  11. connectionFactory){
  12. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  13. rabbitTemplate.setConnectionFactory(connectionFactory);
  14. //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
  15. rabbitTemplate.setMandatory(true);
  16. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  17. \@Override
  18. public void confirm(CorrelationData correlationData, boolean ack, String
  19. cause) {
  20. System.out.println("ConfirmCallback:
  21. "+"相关数据:"+correlationData);
  22. System.out.println("ConfirmCallback: "+"确认情况:"+ack);
  23. System.out.println("ConfirmCallback: "+"原因:"+cause);
  24. }
  25. });
  26. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  27. \@Override
  28. public void returnedMessage(Message message, int replyCode, String
  29. replyText, String exchange, String routingKey) {
  30. System.out.println("ReturnCallback: "+"消息:"+message);
  31. System.out.println("ReturnCallback: "+"回应码:"+replyCode);
  32. System.out.println("ReturnCallback: "+"回应信息:"+replyText);
  33. System.out.println("ReturnCallback: "+"交换机:"+exchange);
  34. System.out.println("ReturnCallback: "+"路由键:"+routingKey);
  35. }
  36. });
  37. return rabbitTemplate;
  38. }
  39. }
复制代码
写法二
  1. \@Component
  2. \@Slf4j
  3. public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
  4. RabbitTemplate.ReturnsCallback {
  5. \@Resource
  6. private RedisTemplate\<String, String\> redisTemplate;
  7. \@Resource
  8. private RabbitTemplate rabbitTemplate;
  9. private String finalId = null;
  10. private SmsDTO smsDTO = null;
  11. /\*\*
  12. \* 发布者确认的回调
  13. \*
  14. \* \@param correlationData 回调的相关数据。
  15. \* \@param b ack为真,nack为假
  16. \* \@param s 一个可选的原因,用于nack,如果可用,否则为空。
  17. \*/
  18. \@Override
  19. public void confirm(CorrelationData correlationData, boolean b, String
  20. s) {
  21. // 消息发送成功,将redis中消息的状态(status)修改为1
  22. if (b) {
  23. redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
  24. finalId, "status", 1);
  25. } else {
  26. // 发送失败,放入redis失败集合中,并删除集合数据
  27. log.error("短信消息投送失败:{}\--\>{}", correlationData, s);
  28. redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
  29. redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
  30. this.smsDTO);
  31. }
  32. }
  33. /\*\*
  34. \* 发生异常时的消息返回提醒
  35. \*
  36. \* \@param returnedMessage
  37. \*/
  38. \@Override
  39. public void returnedMessage(ReturnedMessage returnedMessage) {
  40. log.error("发生异常,返回消息回调:{}", returnedMessage);
  41. // 发送失败,放入redis失败集合中,并删除集合数据
  42. redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
  43. redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
  44. this.smsDTO);
  45. }
  46. \@PostConstruct
  47. public void init() {
  48. rabbitTemplate.setConfirmCallback(this);
  49. rabbitTemplate.setReturnsCallback(this);
  50. }
  51. }
复制代码
消息确认机制

手动确认
  1. yml配置
  2. #手动确认 manual
  3. listener:
  4. simple:
  5. acknowledge-mode: manual
复制代码
写法一

首先在消费者项目中创建MessageListenerConfig
  1. import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
  2. import org.springframework.amqp.core.AcknowledgeMode;
  3. import org.springframework.amqp.core.Queue;
  4. import
  5. org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  6. import
  7. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. \@Configuration
  12. public class MessageListenerConfig {
  13. \@Autowired
  14. private CachingConnectionFactory connectionFactory;
  15. \@Autowired
  16. private MyAckReceiver myAckReceiver;//消息接收处理类
  17. \@Bean
  18. public SimpleMessageListenerContainer simpleMessageListenerContainer() {
  19. SimpleMessageListenerContainer container = new
  20. SimpleMessageListenerContainer(connectionFactory);
  21. container.setConcurrentConsumers(1);
  22. container.setMaxConcurrentConsumers(1);
  23. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
  24. RabbitMQ默认是自动确认,这里改为手动确认消息
  25. //设置一个队列
  26. container.setQueueNames("TestDirectQueue");
  27. //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
  28. //
  29. container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
  30. //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
  31. //container.setQueues(new Queue("TestDirectQueue",true));
  32. //container.addQueues(new Queue("TestDirectQueue2",true));
  33. //container.addQueues(new Queue("TestDirectQueue3",true));
  34. container.setMessageListener(myAckReceiver);
  35. return container;
  36. }
  37. }
复制代码
然后创建手动确认监听类MyAckReceiver(手动确认模式需要实现ChannelAwareMessageListener)
  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import
  4. org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.ByteArrayInputStream;
  7. import java.io.ObjectInputStream;
  8. import java.util.Map;
  9. \@Component
  10. public class MyAckReceiver implements ChannelAwareMessageListener {
  11. \@Override
  12. public void onMessage(Message message, Channel channel) throws Exception
  13. {
  14. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  15. try {
  16. byte\[\] body = message.getBody();
  17. ObjectInputStream ois = new ObjectInputStream(new
  18. ByteArrayInputStream(body));
  19. Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
  20. String messageId = msgMap.get("messageId");
  21. String messageData = msgMap.get("messageData");
  22. String createTime = msgMap.get("createTime");
  23. ois.close();
  24. System.out.println(" MyAckReceiver messageId:"+messageId+"
  25. messageData:"+messageData+" createTime:"+createTime);
  26. System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
  27. channel.basicAck(deliveryTag, true);
  28. //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认
  29. delivery_tag 小于等于传入值的所有消息
  30. //channel.basicReject(deliveryTag,
  31. true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
  32. } catch (Exception e) {
  33. channel.basicReject(deliveryTag, false);
  34. e.printStackTrace();
  35. }
  36. }
  37. }
复制代码
如果想实现不同的队列,有不同的监听确认处理机制,做不同的业务处理,那么这样做:
首先需要在配置类中绑定队列,然后只需要根据消息来自不同的队列名进行区分处理即可
  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import
  4. org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.ByteArrayInputStream;
  7. import java.io.ObjectInputStream;
  8. import java.util.Map;
  9. \@Component
  10. public class MyAckReceiver implements ChannelAwareMessageListener {
  11. \@Override
  12. public void onMessage(Message message, Channel channel) throws Exception
  13. {
  14. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  15. try {
  16. byte\[\] body = message.getBody();
  17. ObjectInputStream ois = new ObjectInputStream(new
  18. ByteArrayInputStream(body));
  19. Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
  20. String messageId = msgMap.get("messageId");
  21. String messageData = msgMap.get("messageData");
  22. String createTime = msgMap.get("createTime");
  23. ois.close();
  24. if
  25. ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
  26. System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
  27. System.out.println("消息成功消费到 messageId:"+messageId+"
  28. messageData:"+messageData+" createTime:"+createTime);
  29. System.out.println("执行TestDirectQueue中的消息的业务处理流程\...\...");
  30. }
  31. if
  32. ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
  33. System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
  34. System.out.println("消息成功消费到 messageId:"+messageId+"
  35. messageData:"+messageData+" createTime:"+createTime);
  36. System.out.println("执行fanout.A中的消息的业务处理流程\...\...");
  37. }
  38. channel.basicAck(deliveryTag, true);
  39. //channel.basicReject(deliveryTag, true);//为true会重新放回队列
  40. } catch (Exception e) {
  41. channel.basicReject(deliveryTag, false);
  42. e.printStackTrace();
  43. }
  44. }
  45. }
复制代码
写法二
  1. \@Component
  2. \@Slf4j
  3. public class SendSmsListener {
  4. \@Resource
  5. private RedisTemplate\<String, String\> redisTemplate;
  6. \@Resource
  7. private SendSmsUtils sendSmsUtils;
  8. /\*\*
  9. \* 监听发送短信普通队列
  10. \* \@param smsDTO
  11. \* \@param message
  12. \* \@param channel
  13. \* \@throws IOException
  14. \*/
  15. \@RabbitListener(queues = SMS_QUEUE_NAME)
  16. public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
  17. channel) throws IOException {
  18. String messageId = message.getMessageProperties().getMessageId();
  19. int retryCount = (int)
  20. redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
  21. messageId, "retryCount");
  22. if (retryCount \> 3) {
  23. //重试次数大于3,直接放到死信队列
  24. log.error("短信消息重试超过3次:{}", messageId);
  25. //basicReject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
  26. //该方法reject后,该消费者还是会消费到该条被reject的消息。
  27. channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
  28. redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
  29. return;
  30. }
  31. try {
  32. String phoneNum = smsDTO.getPhoneNum();
  33. String code = smsDTO.getCode();
  34. if(StringUtils.isAnyBlank(phoneNum,code)){
  35. throw new RuntimeException("sendSmsListener参数为空");
  36. }
  37. // 发送消息
  38. SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
  39. code);
  40. SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();
  41. SendStatus sendStatus = sendStatusSet\[0\];
  42. if(!"Ok".equals(sendStatus.getCode()) \|\|!"send
  43. success".equals(sendStatus.getMessage())){
  44. throw new RuntimeException("发送验证码失败");
  45. }
  46. //手动确认消息
  47. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  48. log.info("短信发送成功:{}",smsDTO);
  49. redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
  50. } catch (Exception e) {
  51. redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,"retryCount",retryCount+1);
  52. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  53. }
  54. }
  55. /\*\*
  56. \* 监听到发送短信死信队列
  57. \* \@param sms
  58. \* \@param message
  59. \* \@param channel
  60. \* \@throws IOException
  61. \*/
  62. \@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)
  63. public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
  64. channel) throws IOException {
  65. try{
  66. log.error("监听到死信队列消息==\>{}",sms);
  67. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  68. }catch (Exception e){
  69. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  70. }
  71. }
  72. }
复制代码
消费端限流
  1. #配置RabbitMQ
  2. spring:
  3. rabbitmq:
  4. host: 192.168.126.3
  5. port: 5672
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. #开启自动确认 none 手动确认 manual
  10. listener:
  11. simple:
  12. #消费端限流机制必须开启手动确认
  13. acknowledge-mode: manual
  14. #消费端最多拉取的消息条数,签收后不满该条数才会继续拉取
  15. prefetch: 5
复制代码
消息存活时间TTL

可以设置队列的存活时间,也可以设置具体消息的存活时间
设置队列中所有消息的存活时间
return QueueBuilder
.durable(QUEUE_NAME)//队列持久化
.ttl(10000)//设置队列的所有消息存活10s
.build();
即在创建队列时,设置存活时间
设置某条消息的存活时间
//发送消息,并设置该消息的存活时间
  1. \@Test
  2. public void testSendMessage()
  3. {
  4. //1.创建消息属性
  5. MessageProperties messageProperties = new MessageProperties();
  6. //2.设置存活时间
  7. messageProperties.setExpiration("10000");
  8. //3.创建消息对象
  9. Message message = new
  10. Message("sendMessage\...".getBytes(),messageProperties);
  11. //4.发送消息
  12. rabbitTemplate.convertAndSend("my_topic_exchange1","my_routing",message);
  13. }
复制代码
若设置中间的消息的存活时间,当过期时,该消息不会被移除,但是该消息已经不会被消费了,需要等到该消息到队里顶端才会被移除。因为队列是头出,尾进,故而要移除它需要等到它在顶端时才可以。
在队列设置存活时间,也在单条消息设置存活时间,则以时间短的为准
死信队列

死信队列和普通队列没有任何区别,只需要将普通队列需要绑定死信交换机和死信队列就能够实现功能
  1. import org.springframework.amqp.core.\*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. \@Configuration//Rabbit配置类
  6. public class RabbitConfig4 {
  7. private final String DEAD_EXCHANGE = "dead_exchange";
  8. private final String DEAD_QUEUE = "dead_queue";
  9. private final String NORMAL_EXCHANGE = "normal_exchange";
  10. private final String NORMAL_QUEUE = "normal_queue";
  11. //创建死信交换机
  12. \@Bean(DEAD_EXCHANGE)
  13. public Exchange deadExchange()
  14. {
  15. return ExchangeBuilder
  16. .topicExchange(DEAD_EXCHANGE)//交换机类型 ;参数为名字
  17. topic为通配符模式的交换机
  18. .durable(true)//是否持久化,true即存到磁盘,false只在内存上
  19. .build();
  20. }
  21. //创建死信队列
  22. \@Bean(DEAD_QUEUE)
  23. public Queue deadQueue()
  24. {
  25. return QueueBuilder
  26. .durable(DEAD_QUEUE)//队列持久化
  27. //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
  28. .build();
  29. }
  30. //死信交换机绑定死信队列
  31. \@Bean
  32. //@Qualifier注解,使用名称装配进行使用
  33. public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange
  34. exchange, \@Qualifier(DEAD_QUEUE) Queue queue)
  35. {
  36. return BindingBuilder
  37. .bind(queue)
  38. .to(exchange)
  39. .with("dead_routing")
  40. .noargs();
  41. }
  42. //创建普通交换机
  43. \@Bean(NORMAL_EXCHANGE)
  44. public Exchange normalExchange()
  45. {
  46. return ExchangeBuilder
  47. .topicExchange(NORMAL_EXCHANGE)//交换机类型 ;参数为名字
  48. topic为通配符模式的交换机
  49. .durable(true)//是否持久化,true即存到磁盘,false只在内存上
  50. .build();
  51. }
  52. //创建普通队列
  53. \@Bean(NORMAL_QUEUE)
  54. public Queue normalQueue()
  55. {
  56. return QueueBuilder
  57. .durable(NORMAL_QUEUE)//队列持久化
  58. //.maxPriority(10)//设置队列的最大优先级,最大可以设置255,但官网推荐不超过10,太高比较浪费资源
  59. .deadLetterExchange(DEAD_EXCHANGE)//绑定死信交换机
  60. .deadLetterRoutingKey("dead_routing")//死信队列路由关键字
  61. .ttl(10000)//消息存活10s
  62. .maxLength(10)//队列最大长度为10
  63. .build();
  64. }
  65. //普通交换机绑定普通队列
  66. \@Bean
  67. //@Qualifier注解,使用名称装配进行使用
  68. public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange
  69. exchange, \@Qualifier(NORMAL_QUEUE) Queue queue)
  70. {
  71. return BindingBuilder
  72. .bind(queue)
  73. .to(exchange)
  74. .with("my_routing")
  75. .noargs();
  76. }
  77. }
复制代码
延迟队列

RabbitMQ并未实现延迟队列功能,所以可以通过死信队列实现延迟队列的功能
即给普通队列设置存活时间30分钟,过期后发送至死信队列,在死信消费者监听死信队列消息,查看订单状态,是否支付,未支付则取消订单,回退库存即可。
消费者监听延迟队列
  1. \@Component
  2. public class ExpireOrderConsumer {
  3. //监听过期订单队列
  4. \@RabbitListener(queues = "expire_queue")
  5. public void listenMessage(String orderId)
  6. {
  7. //模拟处理数据库等业务
  8. System.out.println("查询"+orderId+"号订单的状态,如果已支付无需处理,如果未支付则回退库存");
  9. }
  10. }
  11. 控制层代码
  12. \@RestController
  13. public class OrderController {
  14. \@Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. \@RequestMapping(value = "/place/{orderId}",method =
  17. RequestMethod.GET)
  18. public String placeOrder(@PathVariable String orderId)
  19. {
  20. //模拟service层处理
  21. System.out.println("处理订单数据\...");
  22. //将订单id发送到订单队列
  23. rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);
  24. return "下单成功,修改库存";
  25. }
  26. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表