如何使用RabbitMq来实现死信队列
[*]起首什么是MQ消息中心件
全称MessageQueue,主要是⽤于步伐和步伐直接通讯,异步+解耦 使⽤场景: 核⼼应⽤ 解耦:订单系统-》物流系统 异步:⽤户注册-》发送邮件,初始化信息 削峰:秒杀、⽇志处理 跨平台 、多语⾔ 分布式事务、最终⼀致性 RPC调⽤上卑鄙对接,数据源变动->关照部属 总而言之 可以对各种场景进行异步校验,应用广泛可以保持链路的完备性
[*]交换机类型
[*]Direct Exchange 定向 将⼀个队列绑定到交换机上,要求该消息与⼀个特 定的路由键完全匹配 例⼦:假如⼀个队列绑定到该交换机上要求路由键 “aabb” ,则只有被标记为 “aabb” 的消息才被转发, 不会转发 aabb.cc ,也不会转发 gg.aabb ,只会转发 aabb 处理路由健
[*] Fanout Exchange ⼴播 只必要简单的将队列绑定到交换机上,⼀个发送到 交换机的消息都会被转发到与该交换机绑定的所有 队列上。很像⼦⽹⼴播,每台⼦⽹内的主机都获得 了⼀份复制的消息 Fanout 交换机转发消息是最快的,⽤于发布订阅, ⼴播形式,中⽂是扇形 不处理路由健
[*] Topic Exchange 通配符 主题交换机是⼀种发布 / 订阅的模式,结合了直连交 换机与扇形交换机的特点 将路由键和某模式进⾏匹配。此时队列必要绑定要 ⼀个模式上 符号 “#” 匹配⼀个或多个词,符号 “*” 匹配不多不少⼀ 个词 例⼦:因此 “abc.#” 可以大概匹配到 “abc.def.ghi” ,但是 “abc.*” 只会匹配到 “abc.def” 。
[*] Headers Exchanges(少⽤) 根据发送的消息内容中的 headers 属性进⾏匹配 , 在 绑定 Queue 与 Exchange 时指定⼀组键值对 当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进⾏匹配; 假如完全匹配则消息会路由到该队列,否则不会路 由到该队列 不处理路由键
[*] 什么是 TTL time to live 消息存活时间 假如消息在存活时间内未被消耗,则会别扫除 RabbitMQ ⽀持两种 ttl 设置 单独消息进⾏配置 ttl 整个队列进⾏配置 ttl (居多)
[*] 什么是rabbitmq 的死信队列 没有被实时消耗的消息存放的队列
[*] 什么是rabbitmq 的死信交换机 Dead Letter Exchange (死信交换机,缩写: DLX )当 消息成为死信后,会被重新发送到另⼀个交换机,这个 交换机就是 DLX死信交换机。
[*] 下面就是实现的大概流程: https://i-blog.csdnimg.cn/direct/03ae7cf2329440178fe78886925715d2.png
例子:
RabbitMq不自带死信队列,那么我们创建好交换机,延迟队列设置过期时间为15秒,对延迟队列和交换机以及死信队列(平凡队列)和交换机进行绑定 ,同时设置交换机为Topic主题交换机。随后消耗者通过RabbitMqListener进行监听,同时使用ACK进行标记的确认。
@Configuration
@Data
public class RabbitMqConfig {
/**
* 交换机
*/
@Value("${mqconfig.coupon_event_exchange}")
private String eventExchange;
/**
* 第⼀个队列延迟队列,
*/
@Value("${mqconfig.coupon_release_delay_queue}")
private String couponReleaseDelayQueue;
/**
* 第⼀个队列的路由key
* 进⼊队列的路由key
*/
@Value("${mqconfig.coupon_release_delay_routing_key}")
private String couponReleaseDelayRoutingKey;
/**
* 第⼆个队列,被监听恢复库存的队列
*/
@Value("${mqconfig.coupon_release_queue}")
private String couponReleaseQueue;
/**
* 第⼆个队列的路由key
* <p>
* 即进⼊死信队列的路由key
*/
@Value("${mqconfig.coupon_release_routing_key}")
private String couponReleaseRoutingKey;
/**
* 过期时间
*/
@Value("${mqconfig.ttl}")
private Integer ttl;
/**
* 消息转换器
*
* @return
*/
@Bean
public MessageConverter messageConvertor() {
return new Jackson2JsonMessageConverter();
}
/**
* 交换机
*
* @return
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange(eventExchange, true, false);
}
/**
* 创建延迟队列
*
* @return
*/
@Bean
public Queue couponReleaseDelayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", ttl);
args.put("x-dead-letter-exchange", eventExchange);
args.put("x-dead-letter-routing-key", couponReleaseRoutingKey);
return new Queue(couponReleaseDelayQueue, true, false, false, args);
}
/**
* 创建死信队列 普通队列用于监听
*
* @return
*/
@Bean
public Queue couponReleaseQueue() {
return new Queue(couponReleaseQueue, true, false, false);
}
/**
* 绑定延迟队列和交换机
*
* @return
*/
@Bean
public Binding couponReleaseDelayBinding() {
return new Binding(couponReleaseDelayQueue, Binding.DestinationType.QUEUE,
eventExchange,couponReleaseDelayRoutingKey,null);
}
/**
* 绑定死信队列队列和交换机
* @return
*/
@Bean
public Binding couponReleaseDeadBinding() {
return new Binding(couponReleaseQueue, Binding.DestinationType.QUEUE,
eventExchange, couponReleaseRoutingKey, null);
}
}
新建测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CouponApplication.class)
@Slf4j
public class MQTest {
@Autowired
private RabbitTemplate rabbitTemplate1;
@Test
public void send(){
rabbitTemplate1.convertAndSend("coupon.event.exchange","coupon.release.delay.routing.key","5qeqweqw");
}
}
http://127.0.0.1:15672/#进入RabbitMq后台进行查察
配置文件配置如下:
#消息队列
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
password: guest
username: guest
#开启⼿动确认消息
listener:
simple:
acknowledge-mode: manual
mqconfig:
#延迟队列,不能被监听消费
coupon_release_delay_queue: coupon.release.delay.queue
#延迟队列的消息过期后转发的队列
coupon_release_queue: coupon.release.queue
#交换机
coupon_event_exchange: coupon.event.exchange
#进⼊延迟队列的路由key
coupon_release_delay_routing_key: coupon.release.delay.routing.key
#消息过期,进⼊释放死信队列的key
coupon_release_routing_key: coupon.release.routing.key
#消息过期时间,毫秒,测试改为15秒
ttl: 15000监听者进行监听,同时使用ACK进行标记的确认。代码如下:
@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.coupon_release_queue}")
public class CouponMQListener {
@Autowired
private CouponRecordService couponRecordService;
@Autowired
private RedissonClient redissonClient;
@RabbitHandler
public void ReleaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {
log.info("收到传递的消息:{}",recordMessage);
long tag = message.getMessageProperties().getDeliveryTag();
boolean flag =couponRecordService.ReleaseCouponRecord(recordMessage);
// RLock lock = redissonClient.getLock("coupon:lock:release:" + recordMessage.getTaskId());
// lock.lock();
try{
if(flag){
channel.basicAck(tag,false);
log.info("释放优惠券成功:{}",recordMessage);
}else {
log.error("释放优惠券失败:{}",recordMessage);
channel.basicReject(tag,true);
}
} catch (Exception e) {
log.error("释放优惠券异常:{}",e.getMessage());
channel.basicReject(tag,true);
}
// finally {
// lock.unlock();
// }
}tag后面的参数是是否重入队列。可以通过下面的网页使用配置的账号和密码进行查察
http://127.0.0.1:15672/#/
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]