Java_RabbitMQ
RabbitMQ是一个高性能的异步通讯组件。(同步通讯就像两个人打视频电话,实时传输数据,还不能有其他人再加入,异步通讯像微信发消息,不具备实时性,也能有其他人加入。)内容概述:
https://i-blog.csdnimg.cn/direct/71cb1bab3a86480d9f852281591a493b.png
初识MQ:
同步调用:
以余额付出服务为例:
https://i-blog.csdnimg.cn/direct/13973128afdf44bb8a762df30b1b2616.png
优势:
时效性强,等待到效果才返回
标题:
也存在一些标题,比如拓展性差、性能下降、级联失败标题(雪崩)。
异步调用:
https://i-blog.csdnimg.cn/direct/8e8731619cb046faba8ae1f93855f9fd.png
以余额付出服务为例:
https://i-blog.csdnimg.cn/direct/516f8b76471449ad9493a433fb0a0582.png
优势:
耦合度低,拓展性强
异步调用,无需等待,性能好
故障隔离,下游服务故障不影响上游服务
缓存消息,流量削峰填谷
标题:
不能立即得到调用效果,时效性差
不确定下游业务实行是否乐成
业务安全依赖于Broker的可靠性
技术选型:
了解常用消息队列(MQ):
https://i-blog.csdnimg.cn/direct/7bfaaa2148144534a3b7a125a485a3cc.png
本文主要解说RabbitMQ。
RabbitMQ:
RabbitMQ是基于Erlang语言开发的开源消息通讯中心件,官网地址:
RabbitMQ: One broker to queue them all | RabbitMQ
介绍:
https://i-blog.csdnimg.cn/direct/7b8891b31877453ab21b42c6c942646d.png
安装:
docker run \
-e RABBITMQ_DEFAULT_USER=han\
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network my-net\
-d \
rabbitmq:3.8-management利用:
打开RabbitMQ管理界面(通过ip与定义的端标语),可以让交换机绑定队列,可以利用交换机模拟发送消息,也可以在绑定的队列中查看发送的消息。
消息发送的注意事项:
https://i-blog.csdnimg.cn/direct/850257dc95314d8eb59c1816201a3a9f.png
数据隔离:
在RabbitMQ管理界面,点击Admin按钮,右侧选择Users,先新增一个用户:
https://i-blog.csdnimg.cn/direct/991bd9e822b7439f8ec41b7ea5ef2cb3.png
新增的用户是没有绑定虚拟主机的,也不能查看别的虚拟主机内的队列中的信息。
接下来新建虚拟主机,注意切换到刚刚新建的用户登录然后创建,这样用户直接绑定登录用户。
https://i-blog.csdnimg.cn/direct/194ab8091561445cac3350d1c6d12be3.png
Java客户端:
协议与规范:
https://i-blog.csdnimg.cn/direct/d16632fe03e94ee0a0792d6a31a75ed1.png
SpringAmqp官方地址:Spring AMQP
快速入门:
https://i-blog.csdnimg.cn/direct/aece78b6ee4a465898211aee95745600.png
1.创建队列:
https://i-blog.csdnimg.cn/direct/82bb103f27c34dd8ae9e62430999b950.png
2.引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.设置信息:
spring:
rabbitmq:
host: ***.***.***.*** # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码4.发送消息:
利用RabbitTemplate发送消息
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
//1.队列名
String queueName = "simple.queue";
//2.消息
String message = "Hello,Spring Amqp!";
//3.发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}5.吸收消息:
利用注解@RabbitListener声明要监听的队列,监听消息。
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
log.info("监听到simple.queue发送的消息: {}", message);
}
}
WorkQueue:
如果我们只简朴的定义两个consumer监听一个queue,那么当publisher发送大量数据时,这两个consumer是不能吸收到发送的全部消息的,比如consumer1会吸收到1,3,5……,consumer2会吸收到2,4,6……,即使两个consumer处置惩罚消息速度不同等,它们也会各自完成各自要吸收的消息,然后快的会等待慢的,服从很低
https://i-blog.csdnimg.cn/direct/06491d1872ad4bdd9c7a4965fe3a186b.png
消费者消息推送限定:
https://i-blog.csdnimg.cn/direct/e336943f4ec049ffa7ada5fcc7b4cbd1.png
这样设置完成之后,发送大量消息,两个consumer就会能者多劳,会一直处置惩罚消息不绝。
Work模子的利用:
https://i-blog.csdnimg.cn/direct/22b88a1d3ef14692bcce7e92d24adb0a.png
交换机:
https://i-blog.csdnimg.cn/direct/47c39791a68f46fcb4e1cc15e8ba630a.png
Fanout交换机:
https://i-blog.csdnimg.cn/direct/da3c34933ad34e44b69b26e515b7c657.png
演示:
定义两个消费者绑定不同队列
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
log.info("消费者1监听到fanout.queue1发送的消息: {}", message);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
log.info("消费者2监听到fanout.queue2发送的消息: {}", message);
}
}
编写测试类,通过fanout交换机发消息
@Test
public void testFanoutQueue() {
//1.交换机名
String exchangeName = "hmall.fanout";
//2.消息
String message = "Hello,everyone!";
//3.发送消息
rabbitTemplate.convertAndSend(exchangeName, null, message);
}效果:
https://i-blog.csdnimg.cn/direct/86b4b356737e424c80e24c60734ed55a.png
Direct交换机:
https://i-blog.csdnimg.cn/direct/23e144f54ff142d98e5127b66a3e8152.png
演示:
定义两个消费者绑定不同队列,绑定队列1的bindingkey为red和blue,队列2的bindingkey为red和yellow。
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {
log.info("消费者1监听到direct.queue1发送的消息: {}", message);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {
log.info("消费者2监听到direct.queue2发送的消息: {}", message);
}
}
编写测试类,通过direct交换机发消息
@Test
public void testDirectQueue() {
//1.交换机名
String exchangeName = "hmall.direct";
//2.消息
String message = "Hello,blue!";
//3.发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}效果:
https://i-blog.csdnimg.cn/direct/818e38db6abd4f4a847d3b832c3a7273.png
与Fanout差异:
https://i-blog.csdnimg.cn/direct/8f249eb2014e459fbfaf3d754f6e0049.png
Topic交换机:
https://i-blog.csdnimg.cn/direct/b3f8095b704443878a2f0c0f7f35ddc7.png
演示:
定义两个消费者绑定不同队列,绑定队列1的bindingkey为china.#,队列2的bindingkey为#.news。
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {
log.info("消费者1监听到topic.queue1发送的消息: {}", message);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {
log.info("消费者2监听到topic.queue2发送的消息: {}", message);
}
}
编写测试类,通过direct交换机发消息
@Test
public void testTopicQueue() {
//1.交换机名
String exchangeName = "hmall.topic";
//2.消息
String message = "Hello,Chinese news!";
//3.发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}效果:
https://i-blog.csdnimg.cn/direct/ef11d644aff7426b9c4fc9a2624b055f.png
与Direct差异:
https://i-blog.csdnimg.cn/direct/d341b91fad094d13b19759b40765c74a.png
代码声明队列交换机:
https://i-blog.csdnimg.cn/direct/9578190379ea4ce2a7fad67c50edb737.png
new方式:
https://i-blog.csdnimg.cn/direct/4caf2016fe45484f884ac9d79c041092.png
bulider方式:
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
}
@Bean
public Queue fanoutQueue() {
return QueueBuilder.durable("hmall.fanout").build();
}
@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
}注解方式:
https://i-blog.csdnimg.cn/direct/449c968a8f76443d82d9b9ba7ef58438.png
消息转换器:
https://i-blog.csdnimg.cn/direct/b35ba0e2a163455890040a9912974c47.png
https://i-blog.csdnimg.cn/direct/612b44d83c77422092521a5f046a3f49.png
这样通报的其他集合等复杂Object类对象信息可以转化为json范例,更简洁、易懂。
业务实例改造:
https://i-blog.csdnimg.cn/direct/b52e032c4f044181be472e6dae47177b.png
1.引入依赖:
在消费者和发送者的pom.xml文件都引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2.设置RabbitMQ信息:
在消费者和发送者中都需要设置,这里我把它设置到了nacos注册中心的共享设置中。
spring:
rabbitmq:
host: ***.***.***.***# 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码3.设置消息转换器
在消费者和发送者中都需要设置,我将它设置到了common模块下:
@Configuration
public class MqConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
} 由于在其他模块扫描不到该设置类,所以利用Spring自动装配原理,将这个类放入spring.factories文件中:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.hmall.common.config.MyBatisConfig,\
com.hmall.common.config.MvcConfig,\
com.hmall.common.config.MqConfig,\
com.hmall.common.config.JsonConfig4.编写消费者:
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.direct", type = "direct"),
key = "pay.success"
))
public void listenPaySuccess(Long orderId) {
orderService.markOrderPaySuccess(orderId);
}
}5.编写发送者:
利用try,catch发送消息,这样不管消息发送是否乐成,都不会对原代码造成影响。
@Override
@Transactional
public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
// 1.查询支付单
PayOrder po = getById(payOrderFormDTO.getId());
// 2.判断状态
if (!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())) {
// 订单不是未支付,状态异常
throw new BizIllegalException("交易已支付或关闭!");
}
// 3.尝试扣减余额
userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());
// 4.修改支付单状态
boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());
if (!success) {
throw new BizIllegalException("交易已支付或关闭!");
}
// TODO 5.修改订单状态
// tradeClient.markOrderPaySuccess(po.getBizOrderNo());
try {
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
} catch (Exception e) {
log.error("发送支付状态通知失败,订单id:{}", po.getBizOrderNo(), e);
}
}消息可靠性标题:
发送者的可靠性:
发送者重连:
https://i-blog.csdnimg.cn/direct/df3f6a94bae343ec8c961f1d2a150472.png
注意:
https://i-blog.csdnimg.cn/direct/489891e87ba34920b8b2979b5d3122bc.png
发送者确认:
https://i-blog.csdnimg.cn/direct/38b8013842b1482698b97067f6c15e61.png
如何实现:
https://i-blog.csdnimg.cn/direct/a1302341ebb4443e8278f6cb5445757b.png
https://i-blog.csdnimg.cn/direct/d60533976b4d44b19a12c236c9ad7b44.png
(利用@PostConstruct注解的方法必须是void范例,且不能有参数。这个方法会在对象创建并完成依赖注入后被自动调用。通常用于实行一些资源初始化、设置加载等操纵。)
https://i-blog.csdnimg.cn/direct/76d37293115b4812868429255681eae7.png
注意:
由于发送者确认需要与MQ举行通讯与确认,所以会大大的影响消息发送的服从,不保举打开。如果要利用,也不要让消息重发无限的重试,要限定重发次数。
MQ的可靠性:
https://i-blog.csdnimg.cn/direct/bb9bae919f584641b55d57f806648c66.png
数据长期化:
交换机长期化:
https://i-blog.csdnimg.cn/direct/7f97a21f3dac4b988abc25b2042d9d95.png
默认是长期的
队列长期化:
https://i-blog.csdnimg.cn/direct/2f935749c9294d549afdcbf9ef664dc2.png
和交换机长期化类似,而且默认也是长期的
消息长期化:
https://i-blog.csdnimg.cn/direct/332977ec28ff4c3599c4068dd4155b88.png
发送长期化的消息,不仅会生存到内存,还会写入磁盘中,即使MQ重启也不会消失,但是重启后非长期化的消息会消失。写代码利用Spring AMQP发送消息默认也是长期化的。
由于发送非长期化消息时,消息过多会写入磁盘,导致吸收消息的速度呈现波浪形,服从很低。而发送长期化消息会对每条消息都长期化,而且吸收速度可以一直处于大概峰值的位置,服从更高。
注意:
保举打开它们的长期化,可以大大提高服从和可靠性。
Lazy Queue:
由于发送长期化的消息,不仅会生存到内存,还会写入磁盘中,耗时较长,就会导致整体的并发能力下降,为了解决这个标题,MQ引入了Lazy Queue。
https://i-blog.csdnimg.cn/direct/51d1d9a12ec94d5bb8a48d9a61ce155b.png
如何利用:
https://i-blog.csdnimg.cn/direct/16732692dd864dcea802ab1a42eda923.png
总结:
RabbitMQ自身如何保证消息的可靠性:
https://i-blog.csdnimg.cn/direct/1c53af62d1fc458eafb68ca5d3d9bfaf.png
消费者的可靠性:
消费者确认机制:
https://i-blog.csdnimg.cn/direct/23b96e33312b43ad88713fc4036f6603.png
如何利用:
https://i-blog.csdnimg.cn/direct/2a43c2ac02024efeb78a6150e6d05f5e.png
注意如果是吸收数据后处置惩罚数据导致了业务异常,那么SpringAMQP是不会抛异常的,这种情况一般需要程序员自己编写代码让MQ返回nack或者reject。
失败重试计谋:
https://i-blog.csdnimg.cn/direct/ec4a0d056ee949c69137da3330959344.png
失败消息处置惩罚计谋:
https://i-blog.csdnimg.cn/direct/6f7c64c7264f47dea256b0666776f69f.png
https://i-blog.csdnimg.cn/direct/fac2308bcc3f42e9b82f768c0754f5b4.png
业务幂等性:
幂等:
https://i-blog.csdnimg.cn/direct/bf432ee913f74a008d646513518a08f7.png
解决方案:
1.唯一消息id:
https://i-blog.csdnimg.cn/direct/c8b07eb16f024a86a8090aa6e708e23c.png
2.业务判断:
https://i-blog.csdnimg.cn/direct/f2b7a9dcd3a7481ca20404ac68c6043e.png
耽误消息:
介绍:
耽误消息:发送者发送消息时指定一个时间,消费者不会立即收到消息,而是在指定时间之后才收到消息。
耽误任务:设置在肯定时间之后才实行的任务
https://i-blog.csdnimg.cn/direct/1ac4031cedf5453a84a7522729b18905.png
死信交换机:
https://i-blog.csdnimg.cn/direct/77888c2e399f43c4a449cf9658cd8ac9.png
代码声明死信交换机:
https://i-blog.csdnimg.cn/direct/45b909bfc4fc403db95059ad401f00e7.png
代码设置发送消息逾期时间:
https://i-blog.csdnimg.cn/direct/a97d3bd0f9574297bc7fc7359ad097f6.png
耽误消息插件:
https://i-blog.csdnimg.cn/direct/ad76d849cd764d1fb12dd7799179b2cc.png
下载网址:
rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
安装:
由于我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins效果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
] 插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来实行下令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange利用:
起首将交换机的delay属性设为true
https://i-blog.csdnimg.cn/direct/0bc9cbd32367430093aa1fd546ed40e8.png
发送消息时需要通过x-delay来设置逾期时间
https://i-blog.csdnimg.cn/direct/7c3e7267159c4c259b5b283211b9a09a.png
注意:
由于每个耽误消息都有自己的计时器时钟,所以耽误消息过多对CPU压力会很大,我们应该尽量制止同一时间出现过多耽误消息,可以接纳镌汰耽误等待时间的方式,一般接纳10或15秒即10000或15000毫秒。
面试大概出现标题:
1.如何保证付出服务与交易服务之间的订单同等性?
https://i-blog.csdnimg.cn/direct/1bea4f47e1fa47ff84dff01df265eefd.png
2.如果交易服务消息处置惩罚失败,有什么兜底方案?
①消息重试:一种常见的方法是在消息处置惩罚失败后举行重试。您可以设置一个重试机制,例如在消息处置惩罚失败后将消息重新放回队列,让消费者再次实验处置惩罚。您可以设置最大重试次数,制止无限重试导致死循环。
②死信队列(Dead Letter Exchange):通过设置死信队列,可以将处置惩罚失败的消息路由到一个专门的队列中,以便进一步处置惩罚或分析失败的消息。当消息处置惩罚失败时,可以将消息发送到死信队列,然后根据需要举行处置惩罚。
③消息长期化:确保消息是长期化的,这样即使消费者在处置惩罚消息时发生故障,消息也不会丢失。消息长期化可以通过将消息标记为长期化并设置队列为长期化来实现。
④监控和报警:设置监控和报警体系来及时发现消息处置惩罚失败的情况。通过监控消息队列的状态、消费者的运行状况以及消息处置惩罚失败的次数,可以及时发现标题并采取步伐。
⑤人工干预:在极度情况下,如果消息处置惩罚失败的情况无法通过自动化本领解决,大概需要人工干预。在这种情况下,可以设置警报,关照相关人员参与处置惩罚。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! 更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]