笑看天下无敌手 发表于 2024-12-4 01:38:14

如何使用RabbitMq来实现死信队列


[*]起首什么是MQ消息中心件
           全称MessageQueue,主要是⽤于步伐和步伐直接通讯,异步+解耦                使⽤场景:                   核⼼应⽤                   解耦:订单系统-》物流系统                   异步:⽤户注册-》发送邮件,初始化信息                   削峰:秒杀、⽇志处理                   跨平台 、多语⾔                   分布式事务、最终⼀致性                   RPC调⽤上卑鄙对接,数据源变动->关照部属            总而言之 可以对各种场景进行异步校验,应用广泛可以保持链路的完备性   

[*]交换机类型
[*]Direct Exchange 定向       将⼀个队列绑定到交换机上,要求该消息与⼀个特         定的路由键完全匹配         例⼦:假如⼀个队列绑定到该交换机上要求路由键         “aabb”    ,则只有被标记为    “aabb”    的消息才被转发,         不会转发    aabb.cc    ,也不会转发    gg.aabb    ,只会转发         aabb         处理路由健
[*]       Fanout Exchange ⼴播         只必要简单的将队列绑定到交换机上,⼀个发送到         交换机的消息都会被转发到与该交换机绑定的所有         队列上。很像⼦⽹⼴播,每台⼦⽹内的主机都获得         了⼀份复制的消息         Fanout    交换机转发消息是最快的,⽤于发布订阅,         ⼴播形式,中⽂是扇形    不处理路由健
[*]       Topic Exchange 通配符         主题交换机是⼀种发布    /    订阅的模式,结合了直连交         换机与扇形交换机的特点         将路由键和某模式进⾏匹配。此时队列必要绑定要         ⼀个模式上         符号    “#”    匹配⼀个或多个词,符号    “*”    匹配不多不少⼀         个词         例⼦:因此    “abc.#”    可以大概匹配到    “abc.def.ghi”    ,但是         “abc.*”   只会匹配到    “abc.def”    。
[*]       Headers Exchanges(少⽤)         根据发送的消息内容中的    headers    属性进⾏匹配    ,   在         绑定    Queue    与    Exchange    时指定⼀组键值对         当消息发送到    RabbitMQ    时会取到该消息的    headers         与    Exchange    绑定时指定的键值对进⾏匹配;         假如完全匹配则消息会路由到该队列,否则不会路         由到该队列         不处理路由键
[*]       什么是    TTL         time to live   消息存活时间         假如消息在存活时间内未被消耗,则会别扫除         RabbitMQ    ⽀持两种    ttl    设置         单独消息进⾏配置    ttl         整个队列进⾏配置    ttl    (居多)
[*]       什么是rabbitmq    的死信队列         没有被实时消耗的消息存放的队列
[*]       什么是rabbitmq    的死信交换机         Dead Letter Exchange    (死信交换机,缩写:    DLX    )当         消息成为死信后,会被重新发送到另⼀个交换机,这个         交换机就是    DLX死信交换机。
[*]       下面就是实现的大概流程:    https://i-blog.csdnimg.cn/direct/03ae7cf2329440178fe78886925715d2.png
   例子:
RabbitMq不自带死信队列,那么我们创建好交换机,延迟队列设置过期时间为15秒,对延迟队列和交换机以及死信队列(平凡队列)和交换机进行绑定 ,同时设置交换机为Topic主题交换机。随后消耗者通过RabbitMqListener进行监听,同时使用ACK进行标记的确认。

@Configuration
@Data
public class RabbitMqConfig {

    /**
   * 交换机
   */
    @Value("${mqconfig.coupon_event_exchange}")
    private String eventExchange;
    /**
   * 第⼀个队列延迟队列,
   */

    @Value("${mqconfig.coupon_release_delay_queue}")
    private String couponReleaseDelayQueue;
    /**
   * 第⼀个队列的路由key
   * 进⼊队列的路由key
   */

    @Value("${mqconfig.coupon_release_delay_routing_key}")
    private String couponReleaseDelayRoutingKey;
    /**
   * 第⼆个队列,被监听恢复库存的队列
   */
    @Value("${mqconfig.coupon_release_queue}")
    private String couponReleaseQueue;
    /**
   * 第⼆个队列的路由key
   * <p>
   * 即进⼊死信队列的路由key
   */

    @Value("${mqconfig.coupon_release_routing_key}")
    private String couponReleaseRoutingKey;
    /**
   * 过期时间
   */
    @Value("${mqconfig.ttl}")
    private Integer ttl;

    /**
   * 消息转换器
   *
   * @return
   */
    @Bean
    public MessageConverter messageConvertor() {
      return new Jackson2JsonMessageConverter();
    }

    /**
   * 交换机
   *
   * @return
   */
    @Bean
    public TopicExchange exchange() {
      return new TopicExchange(eventExchange, true, false);
    }

    /**
   * 创建延迟队列
   *
   * @return
   */
    @Bean
    public Queue couponReleaseDelayQueue() {
      Map<String, Object> args = new HashMap<>();
      args.put("x-message-ttl", ttl);
      args.put("x-dead-letter-exchange", eventExchange);
      args.put("x-dead-letter-routing-key", couponReleaseRoutingKey);

      return new Queue(couponReleaseDelayQueue, true, false, false, args);
    }

    /**
   * 创建死信队列   普通队列用于监听
   *
   * @return
   */
    @Bean
    public Queue couponReleaseQueue() {
      return new Queue(couponReleaseQueue, true, false, false);
    }


    /**
   * 绑定延迟队列和交换机
   *
   * @return
   */
    @Bean
    public Binding couponReleaseDelayBinding() {
      return new Binding(couponReleaseDelayQueue, Binding.DestinationType.QUEUE,
                eventExchange,couponReleaseDelayRoutingKey,null);
    }

    /**
   * 绑定死信队列队列和交换机
   * @return
   */
    @Bean
    public Binding couponReleaseDeadBinding() {
      return new Binding(couponReleaseQueue, Binding.DestinationType.QUEUE,
                eventExchange, couponReleaseRoutingKey, null);
    }

}
新建测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CouponApplication.class)
@Slf4j
public class MQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate1;
    @Test
    public void send(){

      rabbitTemplate1.convertAndSend("coupon.event.exchange","coupon.release.delay.routing.key","5qeqweqw");

    }
}
http://127.0.0.1:15672/#进入RabbitMq后台进行查察 
配置文件配置如下:
#消息队列
rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    password: guest
    username: guest
    #开启⼿动确认消息
    listener:
      simple:
      acknowledge-mode: manual
mqconfig:
   #延迟队列,不能被监听消费
    coupon_release_delay_queue: coupon.release.delay.queue
    #延迟队列的消息过期后转发的队列
    coupon_release_queue: coupon.release.queue
    #交换机
    coupon_event_exchange: coupon.event.exchange

    #进⼊延迟队列的路由key
    coupon_release_delay_routing_key: coupon.release.delay.routing.key

    #消息过期,进⼊释放死信队列的key
    coupon_release_routing_key: coupon.release.routing.key

    #消息过期时间,毫秒,测试改为15秒
    ttl: 15000监听者进行监听,同时使用ACK进行标记的确认。代码如下:
@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.coupon_release_queue}")
public class CouponMQListener {

    @Autowired
    private CouponRecordService couponRecordService;
    @Autowired
    private RedissonClient redissonClient;

    @RabbitHandler
    public void ReleaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {
    log.info("收到传递的消息:{}",recordMessage);
      long tag = message.getMessageProperties().getDeliveryTag();
      boolean flag =couponRecordService.ReleaseCouponRecord(recordMessage);
//      RLock lock = redissonClient.getLock("coupon:lock:release:" + recordMessage.getTaskId());
//      lock.lock();
      try{
            if(flag){
                channel.basicAck(tag,false);
                log.info("释放优惠券成功:{}",recordMessage);
            }else {
                log.error("释放优惠券失败:{}",recordMessage);
                channel.basicReject(tag,true);
            }
      } catch (Exception e) {
            log.error("释放优惠券异常:{}",e.getMessage());
            channel.basicReject(tag,true);
      }
//      finally {
//            lock.unlock();
//      }
    }tag后面的参数是是否重入队列。可以通过下面的网页使用配置的账号和密码进行查察
http://127.0.0.1:15672/#/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 如何使用RabbitMq来实现死信队列