RabbitMQ中间件

打印 上一主题 下一主题

主题 518|帖子 518|积分 1554

RabbitMQ

配置环境

安装 erlang环境以及RabbitMQ

RabbitMQ端口号: 5672
去官网下载 https://www.rabbitmq.com
然后重启RabbitMQ服务 RabbitMQ安装教程
开放端口15672
这里,通过http://IP地址:15672 进行Web页面登录,输入账号密码(默认都是guest),完成页面访问。至此,全部安装结束。
导入依赖
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
配置相关信息

RabbitMQ的端口号是什么?

5672 :这是rabbitMQ的端口号;
15672 :这是那个RabbitMQ的web页面的端口号;
  1. spring.application.name=spirng-boot-rabbitmq
  2. spring.rabbitmq.host=127.0.0.1 ##主机ip
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.virtual-host=MmHost
  5. spring.rabbitmq.username=root
  6. spring.rabbitmq.password=root
复制代码
  1. spring:
  2.     rabbitmq:
  3.         username: root
  4.         password: root
  5.         addresses: 127.0.0.1:5672
  6.         cache:
  7.         connection:
  8.         #Cache connection mode, with default connections and multiple channels
  9.         mode: channel
  10.         #Multiple connections, multiple channels
  11.         # mode: connection
复制代码
  1. # rabbitmq
  2. server:
  3.   port: 8080
  4. spring:
  5.   #给项目来个名字
  6.   application:
  7.     name: rabbitmq-consumer
  8.   #配置rabbitMq 服务器
  9.   rabbitmq:
  10.     host: 127.0.0.1
  11.     port: 5672
  12.     username: root
  13.     password: root
  14.     #虚拟host 可以不设置,使用server默认host
  15.     virtual-host: MmHost
复制代码
发送消息
  1. @Component
  2. public class SenderTest{
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void Send() {
  7.         // 队列名称
  8.         String queueName = "ThisKey";
  9.         // 消息
  10.         String message = "Hello, Spring AMQP!";
  11.         // 发送消息
  12.         rabbitTemplate.convertAndSend(queueName, message);
  13.     }
  14. }
复制代码
rabbitTemplate.convertAndSend(key, message);
接收消息
  1. @Component
  2. public class SpringRabbitMQListener {
  3.     @RabbitListener(queues = "ThisKey")
  4.     public void listenSimpleQueueMsg(String msg){
  5.         System.out.println(msg);
  6.     }
  7. }
复制代码
  1. @Component
  2. //指定所监听的队列
  3. @RabbitListener(queues = "ThisKey")
  4. public class SpringRabbitMQListener {
  5.     //指定用来处理接收消息的方法
  6.     @RabbitHandler
  7.     public void listenSimpleQueueMsg(String msg){
  8.         System.out.println(msg);
  9.     }
  10. }
复制代码
注意:此处消息被消费后,对应的ThisKey中的消息就消失了。
原文链接 去的去看看

RabbitMQ-基础使用(Spring AMQP) - 简书 (jianshu.com)
如果使用其他交换机,则需要进行相关配置

可以看这篇文章:SpringBoot整合RabbitMQ
1、创建对应的配置文件
例如Direct交换机
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @Author : JCccc
  9. * @CreateTime : 2019/9/3
  10. * @Description :
  11. **/
  12. @Configuration
  13. public class DirectRabbitConfig {
  14.     //队列 起名:TestDirectQueue
  15.     @Bean
  16.     public Queue TestDirectQueue() {
  17.         return new Queue("TestDirectQueue",true);
  18.     }
  19.     //Direct交换机 起名:TestDirectExchange
  20.     @Bean
  21.     DirectExchange TestDirectExchange() {
  22.         return new DirectExchange("TestDirectExchange");
  23.     }
  24.     //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  25.     @Bean
  26.     Binding bindingDirect() {
  27.         return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  28.     }
  29. }
复制代码
如何保证消息的可靠?

ack应答
消息应答

概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会导致消息丢失。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
各种消息模型实例

五种交换机类型

Direct Exchange

直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
其他两种:WorkQueue(通过监听队列名称)
基本消息队列BasicQueue即为上方的代码,此处不再重复。
- 1、WorkQueue(通过监听队列名称)


WorkQueue.png
WorkQueue与BasicQueue不同之处,就是WorkQueue支持一对多发布消息(不是一个消息发给多个消费者,一个消息只会被一个消费者消费),多个消费者可以提高消息消费速度,当然相同之处也是消息消费后就会从Queue中消失(后续的几种模型都是如此)。
① 模拟消息堆积
[code]    // 队列名称    String queueName = "simple.queue";    // 消息    String message = "Message_";    for (int i = 1; i  积分折扣 ----> 消费券 ----> 短信验证 ----->支付完成,用户需要等待每个业务执行完毕才能支付成功!假设我们从点击支付 -----> 支付成功消耗时间为100/ms,后面我们每新增一个业务就会多耗时50/ms,上述的流程大概会耗时250/ms!如果说以后业务更多的话,那么用户支付订单的时间会越来越长,这样大大影响了用户的体验!参照下图理解
</p>我们使用消息中间件进行异步处理,当用户下单支付同时我们创建消息队列进行异步的处理其它业务,在我们支付模块中最重要的是用户支付,我们可以将一些不重要的业务放入消息队列执行,这样可以大大添加我们程序运行的速度,用户支付模块中也大大减少了支付时间,为用户添加了更好的体验。其它模块与其思想一致,就比如说用户注册!
2、流量削峰

假设我们有一个订单系统,我们的订单系统最大承受访问量是每秒1万次,如果说某天访问量过大我们的系统承受不住了,会对服务器造成宕机,这样的话我们的系统就瘫痪了,为了解决该问题我们可以使用中间件对流量进行消峰
未加入中间件之前,用户直接访问的是订单系统

加入中间件之后,用户直接访问的是中间件,通过中间件对用户进行消峰,好处是可以避免系统的宕机瘫痪,坏处是系统速度变慢,但是总比不能使用好

3、应用解耦

我们以商城项目为例,订单系统耦合调用支付、库存、物流系统,如果某天其中一个系统出现了异常就会造成订单系统故障!使用中间件后订单系统通过队列去访问支付、库存、物流系统就不会造成上述的问题,因为订单系统执行完成才会发消息给队列,接下来的任务就交给队列完成,队列会监督各个系统完成,如果完不成队列会一直监督,直到完成为止!所以说使用中间件后不会造成一个子系统出现故障而造成整个系统故障


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天津储鑫盛钢材现货供应商

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表