RabbitMQ详解-06RabbitMQ高级

打印 上一主题 下一主题

主题 822|帖子 822|积分 2466

1. 过期时间TTL

可以对消息设置预期的时间,在这个时间内都可以被消耗者接收获取;过了之后消息自动被删除。RabbitMQ可以对消息和队列设置TTL。有以下两种设置方法:


  • 通过队列属性设置,队列中所有消息都有类似的过期时间。
  • 对消息举行单独设置,每条消息TTL可以不同。
若两种方法同时使用,则消息的过期时间以两者之间TTL较小的谁人数值为准。消息在队列的生存时间一旦高出设置的TTL值,就称为dead message被投递到死信队列, 消耗者将无法再收到该消息。
1.1. 设置队列TTL

设置类中设置:
  1. args.put("x-message-ttl",5000);
  2. return QueueBuilder.durable(ITEM_QUEUE).withArguments(args).build();
复制代码
  参数 x-message-ttl 的值必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单元表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前消息将最多只存活 6 秒钟。
  如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消耗者,否则该消息会被立刻丢弃。

1.2. 设置消息TTL

在发送消息(可以发送到任何队列,不管该队列是否属于某个互换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:
  1. /**
  2.      * 过期消息
  3.      * 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
  4.      */
  5.     @Test
  6.     public void ttlMessageTest(){
  7.         MessageProperties messageProperties = new MessageProperties();
  8.         //设置消息的过期时间,5秒
  9.         messageProperties.setExpiration("5000");
  10.         Message message = new Message("测试过期消息,5秒钟过期".getBytes(), messageProperties);
  11.         //路由键与队列同名
  12.         rabbitTemplate.convertAndSend("my_ttl_queue", message);
  13.     }
复制代码
  expiration 字段以毫秒为单元表示 TTL 值。且与 x-message-ttl 具有类似的约束条件。由于 expiration 字段必须为字符串范例,broker 将只会接受以字符串情势表达的数字。
当同时指定了 queue 和 message 的 TTL 值,则两者中较小的谁人才会起作用。
  2. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信互换机,也称为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个互换机中,这个互换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的缘故原由:


  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度
DLX也是一个正常的互换机,和一样平常的互换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定互换机即可。
2.1消息TTL过期

2.1.1演示

生产者:
  1. public class Producer {
  2.         private static final String NORMAL_EXCHANGE = "normal_exchange";
  3.                 public static void main(String[] argv) throws Exception {
  4.                         try (Channel channel = RabbitMqUtils.getChannel()) {
  5.                                 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  6.                                 //设置消息的 TTL 时间
  7.                                 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
  8.                                 //该信息是用作演示队列个数限制
  9.                                 for (int i = 1; i <11 ; i++) {
  10.                                         String message="info"+i;
  11.                                         channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties, message.getBytes());
  12.                                         System.out.println("生产者发送消息:"+message);
  13.                                 }
  14.                         }
  15.                 }
  16.         }
  17. }
复制代码
消耗者1代码(启动之后关闭该消耗者 模拟其接收不到消息)
  1. public class Consumer01 {
  2.         //普通交换机名称
  3.         private static final String NORMAL_EXCHANGE = "normal_exchange";
  4.         //死信交换机名称
  5.         private static final String DEAD_EXCHANGE = "dead_exchange";
  6.         public static void main(String[] argv) throws Exception {
  7.                 Channel channel = RabbitUtils.getChannel();
  8.                 //声明死信和普通交换机 类型为 direct
  9.                 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  10.                 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  11.                 //声明死信队列
  12.                 String deadQueue = "dead-queue";
  13.                 channel.queueDeclare(deadQueue, false, false, false, null);
  14.                 //死信队列绑定死信交换机与 routingkey
  15.                 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  16.                 //正常队列绑定死信队列信息
  17.                 Map<String, Object> params = new HashMap<>();
  18.                 //正常队列设置死信交换机 参数 key 是固定值
  19.                 params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  20.                 //正常队列设置死信 routing-key 参数 key 是固定值
  21.                 params.put("x-dead-letter-routing-key", "lisi");
  22.                 String normalQueue = "normal-queue";
  23.                 channel.queueDeclare(normalQueue, false, false, false, params);
  24.                 channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
  25.                 System.out.println("等待接收消息.....");
  26.                 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  27.                         String message = new String(delivery.getBody(), "UTF-8");
  28.                         System.out.println("Consumer01 接收到消息"+message);
  29.                 };
  30.                 channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
  31.                 });
  32.         }
  33. }
复制代码

消耗者2代码(以上步调完成后 启动 C2 消耗者 它消耗死信队列里面的消息)
  1. public class Consumer02 {
  2.         private static final String DEAD_EXCHANGE = "dead_exchange";
  3.         public static void main(String[] argv) throws Exception {
  4.                 Channel channel = RabbitUtils.getChannel();
  5.                 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  6.                 String deadQueue = "dead-queue";
  7.                 channel.queueDeclare(deadQueue, false, false, false, null);
  8.                 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  9.                 System.out.println("等待接收死信队列消息.....");
  10.                 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  11.                         String message = new String(delivery.getBody(), "UTF-8");
  12.                         System.out.println("Consumer02 接收死信队列的消息" + message);
  13.                 };
  14.                 channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
  15.                 });
  16.         }
  17. }
复制代码

2.1.2流程

详细由于队列消息过期而被投递到死信队列的流程:

2.2 队列达到最大长度

2.2.1演示

生产者:
  1. public class Producer {
  2.         private static final String NORMAL_EXCHANGE = "normal_exchange";
  3.                 public static void main(String[] argv) throws Exception {
  4.                         try (Channel channel = RabbitMqUtils.getChannel()) {
  5.                                 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  6.                                 //该信息是用作演示队列个数限制
  7.                                 for (int i = 1; i <11 ; i++) {
  8.                                         String message="info"+i;
  9.                                         channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
  10.                                         System.out.println("生产者发送消息:"+message);
  11.                                 }
  12.                         }
  13.                 }
  14.         }
  15. }
复制代码
消耗者1修改以下代码(启动之后关闭该消耗者 模拟其接收不到消息)

注意此时需要把原先队列删除 由于参数改变了
消耗者2代码稳定(启动 C2 消耗者)

2.2.2流程

消息高出队列最大消息长度而被投递到死信队列的流程在前面的图中已包含。
2.3消息被拒

消息生产者代码同上生产者同等
消耗者1代码(启动之后关闭该消耗者 模拟其接收不到消息)
  1. public class Consumer01 {
  2.         //普通交换机名称
  3.         private static final String NORMAL_EXCHANGE = "normal_exchange";
  4.         //死信交换机名称
  5.         private static final String DEAD_EXCHANGE = "dead_exchange";
  6.         public static void main(String[] argv) throws Exception {
  7.                 Channel channel = RabbitUtils.getChannel();
  8.                 //声明死信和普通交换机 类型为 direct
  9.                 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  10.                 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  11.                 //声明死信队列
  12.                 String deadQueue = "dead-queue";
  13.                 channel.queueDeclare(deadQueue, false, false, false, null);
  14.                 //死信队列绑定死信交换机与 routingkey
  15.                 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
  16.                 //正常队列绑定死信队列信息
  17.                 Map<String, Object> params = new HashMap<>();
  18.                 //正常队列设置死信交换机 参数 key 是固定值
  19.                 params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  20.                 //正常队列设置死信 routing-key 参数 key 是固定值
  21.                 params.put("x-dead-letter-routing-key", "lisi");
  22.                 String normalQueue = "normal-queue";
  23.                 channel.queueDeclare(normalQueue, false, false, false, params);
  24.                 channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
  25.                 System.out.println("等待接收消息.....");
  26.                 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  27.                         String message = new String(delivery.getBody(), "UTF-8");
  28.                         if(message.equals("info5")){
  29.                                 System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
  30.                                 //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
  31.                                 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
  32.                         }else {
  33.                                 System.out.println("Consumer01 接收到消息"+message);
  34.                                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  35.                         }
  36.                 };
  37.                 boolean autoAck = false;
  38.                 channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
  39.                 });
  40.         }
  41. }
复制代码

消耗者代码稳定
启动消耗者 1 然后再启动消耗者 2

3. 耽误队列

耽误队列存储的对象是对应的耽误消息;所谓“耽误消息” 是指当消息被发送以后,并不想让消耗者立刻拿到消息,而是等待特定时间后,消耗者才能拿到这个消息举行消耗。
在RabbitMQ中耽误队列可以通过 过期时间 + 死信队列 来实现;详细如下流程图所示:

在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消耗者再从这些死信队列接收消息就可以实现消息的耽误接收。
耽误队列的应用场景;如:


  • 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要举行支付失败的非常处理(将库存加归去),这时候可以通过使用耽误队列来处理。
  • 在系统中如有需要在指定的某个时间之后执行的任务都可以通过耽误队列处理。
4. 消息确认机制

确认并且保证消息被送达,提供了两种方式:发布确认和事件。(两者不可同时使用)在channel为事件时,不可引入确认模式;同样channel为确认模式下,不可使用事件。
4.1 发布确认

有两种方式:消息发送成功确认和消息发送失败回调。
4.1.1消息发送成功确认

在设置文件当中需要添加:
  1. spring.rabbitmq.publisher-confirm-type=correlated
复制代码
⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到互换器后会触发回调方法
⚫ SIMPLE
经测试有两种效果:
其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
消息确认回调方法:
  1. public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
  2.     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  3.         if (ack) {
  4.             System.out.println("消息确认成功....");
  5.         } else {
  6.             //处理丢失的消息
  7.             System.out.println("消息确认失败," + cause);
  8.         }
  9.     }
  10. }
复制代码
发送消息:
  1. @Test
  2.     public void queueTest(){
  3.         //路由键与队列同名
  4.         rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
  5.     }
复制代码
管理界面确认消息发送成功:

消息确认回调:

4.1.2消息发送失败回调

消息失败回调方法:
  1. @Component
  2. public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
  3.         @Autowired
  4.         private RabbitTemplate rabbitTemplate;
  5.         //rabbitTemplate 注入之后就设置该值
  6.         @PostConstruct
  7.         private void init() {
  8.                 rabbitTemplate.setConfirmCallback(this);
  9.                 /**
  10.                 * true:
  11.                 * 交换机无法将消息进行路由时,会将该消息返回给生产者
  12.                 * false:
  13.                 * 如果发现消息无法进行路由,则直接丢弃
  14.                 */
  15.                 rabbitTemplate.setMandatory(true);
  16.                 //设置回退消息交给谁处理
  17.                 rabbitTemplate.setReturnCallback(this);
  18.         }
  19.     public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  20.         String msgJson  = new String(message.getBody());
  21.         System.out.println("Returned Message:"+msgJson);
  22.     }
  23. }
复制代码
模拟消息发送失败:
  1. @Test
  2. public void testFailQueueTest() throws InterruptedException {
  3.     //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
  4.     amqpTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认应答。");
  5. }
复制代码

4.2 事件支持

场景:业务处理伴随消息的发送,业务处理失败(事件回滚)后要求消息不发送。rabbitmq 使用调用者的外部事件,通常是首选,由于它是非侵入性的(低耦合)。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张国伟

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表