题记:在Java微服务开辟中,对于一个功能须要调用另一个服务下的功能才华实现的情况,我们通常会使用异步调用取代同步调用,进而实现增强业务的可拓展性和实现故障隔离以及流量削峰填谷的目的。而消息队列就是异步调用的解决方案之一。不外在使用消息队列实现异步调用的时间,可能会出现消息无法传递到位进而导致业务信息出现差别的情况,因此消息的传递的可靠性就显得尤为紧张。
传递消息的流程:
要保障消息传递的可靠性,我们可以从消息队列的每一步分析,以RabbitMQ为例,其发送消息的流程大致如下图所示:
不难发现消息传递一共会履历三名角色,分别是消息发送者,MQ和消息接收者。因此我们要包管消息乐成传递并被精确处理就须要包管这三者的可靠性。
发送者可靠性:
1.1生产者重连
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的毗连停止。为了解决这个题目,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ毗连超时后,多次重试。
我们可以在消息发送者的yml设置文件中加入如下信息:
- spring:
- rabbitmq:
- connection-timeout: 1s # 设置MQ的连接超时时间
- template:
- retry:
- enabled: true # 开启超时重试机制,默认时关闭的
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 2 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
- max-attempts: 3 # 最大重试次数
复制代码 我们可以在消息发送者工程下创建一个Test模拟向rabbitMQ发送消息(在此之前我们须要把rabbiitMQ关闭或是断掉网络)
- @Test
- public void TestTimeOutPublish(){
- rabbitTemplate.convertAndSend("simple.queue","Hello,world");
- }
复制代码 代码执行后会是这样的效果:
通过日记我们可以观察到一共重连了三次,与我们在yml文件中设置的max-attempt属性一致。须要留意的是,这里等候的时间还须要再加上connect-timeout这一判断毗连超时的设置。
1.2生产者确认机制
在包管网络畅通而且RabbitMQ服务精确启动了的前提下,我们就可以乐成将消息发送到MQ中了。
不外仍有少数情况下会出现消息进入mq后丢失的情况:
- 无法找到指定的exchange,大部分情况下就是交换机名称堕落
- exchange无法精确路由到queue,可能是routeKey错误导致的
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
当我们在yml文件中增长了相应的设置后流程就变得如下图所示:
设置完成之后:
- 对于全部乐成投递如Mq中的消息都会返回Ack,体现投递乐成。
- 对于投递乐成但路由失败的消息,会返回Publish Return并返回Ack
- 除此之外的全部消息都返回NAck,体现消息投递失败
须要留意的是,对于暂时消息而言是进入队列Publisher Comfirm就是返回Ack,如果是长期消息则须要写入磁盘后才是返回Ack。
实现方式如下:
publisher消息发送者yml设置文件中加入:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
- publisher-returns: true # 开启publisher return机制
复制代码 pubulisher-confirm-type一共有三种属性提供选择:
- none:默认选项,关闭
- correlated:异步回调返回回执
- simple:同步阻塞等候mq回执
而且我们还须要在设置类中设置confrim Return报文:
- public class MqConfig {
- private final RabbitTemplate rabbitTemplate;
- @PostConstruct //在注入rabbitTemplate依赖后执行
- public void init(){
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.error("触发return callback,");
- log.debug("exchange: {}", returned.getExchange());
- log.debug("routingKey: {}", returned.getRoutingKey());
- log.debug("message: {}", returned.getMessage());
- log.debug("replyCode: {}", returned.getReplyCode());
- log.debug("replyText: {}", returned.getReplyText());
- }
- });
- }
- }
复制代码 对于不同的业务我们有不同的处理方案,因此CallbackComfirm须要在每次发送方法前界说,而且由作为convertMessage方法的参数,如下所示:
- @Test
- void testPublisherConfirm() {
- // 1.创建CorrelationData
- CorrelationData cd = new CorrelationData();
- // 2.给Future添加ConfirmCallback
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- // 2.1.Future发生异常时的处理逻辑,基本不会触发
- log.error("send message fail", ex);
- }
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
- if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
- log.debug("发送消息成功,收到 ack!");
- }else{ // result.getReason(),String类型,返回nack时的异常描述
- log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
- }
- }
- });
- // 3.发送消息
- rabbitTemplate.convertAndSend("simple.direct", "simple", "hello,RabbiteMQ", cd);
- }
复制代码 我们可以在onSuccess和onFailure中编写我们对这一消息发送乐成以及失败情况的对应处理。
如果关于上述有其他更好的发起以及疑问,接待留言,我将尽快回复。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |