RabbitMQ 学习笔记

种地  金牌会员 | 2024-5-10 08:03:50 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 942|帖子 942|积分 2826

为什么使用消息队列?

以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务,传统的调用方式是同步调用,这会存在一定的性能问题

使用消息队列可以实现异步的通信方式,相比于同步的通信方式,异步的方式可以让上游快速成功,极大提高系统的吞吐量

消息队列的使用场景有如下:

  • 异步处理:以上述用户下单购买商品为例,将多个不关联的任务放进消息队列,提高系统性能
  • 应用解耦:以上述用户下单购买商品为例,订单系统通知库存系统减库存,传统的做法是订单系统调用库存系统的接口,订单系统和库存系统高耦合,当库存系统出现故障时,订单就会失败。使用消息队列,用户下单后,订单系统完成持久化,将消息写入消息队列,返回用户下单成功。库存系统订阅下单消息,获取下单消息,进行减库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致系统异常
  • 流量削峰:举行秒杀活动时,为防止流量过大导致应用挂掉,服务器收到用户请求后,先写入消息队列,如果超过了消息队列长度的最大值,则直接抛弃或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理,缓解短时间的流量高峰

RabbitMQ 安装

以 ubuntu 22.04.3 为例,参考 RabbitMQ 官网提供的安装脚本
  1. #!/bin/sh
  2. ## 要想安装最新版本的 rabbitmq,可以选择 Cloudsmith 存储库下载,为此我们必须安装 apt-transport https 包
  3. sudo apt-get install curl gnupg apt-transport-https -y
  4. ## 获取 Cloudsmith 存储库提供的签名密钥并添加到系统中,这样这样才能使用 Cloudsmith 仓库下载包
  5. curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
  6. curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
  7. curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null
  8. ## 将描述 RabbitMQ 和 Erlang 包存储库的文件放在 /etc/apt/sources.list.d/ 目录下
  9. sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
  10. ## Provides modern Erlang/OTP releases
  11. ##
  12. deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
  13. deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
  14. # another mirror for redundancy
  15. deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
  16. deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
  17. ## Provides RabbitMQ
  18. ##
  19. deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
  20. deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
  21. # another mirror for redundancy
  22. deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
  23. deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
  24. EOF
  25. ## 更新存储库索引
  26. sudo apt-get update -y
  27. ## 安装 erlang
  28. sudo apt-get install -y erlang-base \
  29.                         erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
  30.                         erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
  31.                         erlang-runtime-tools erlang-snmp erlang-ssl \
  32.                         erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
  33. ## 安装 rabbitmq
  34. sudo apt-get install rabbitmq-server -y --fix-missing
复制代码
生产者发送消息设置消息唯一标识
  1. sudo systemctl stop rabbitmq-server
  2. sudo systemctl start rabbitmq-server
  3. sudo systemctl status rabbitmq-server
复制代码
2. 消费者确认

RabbitMQ 支持消息确定 ACK 机制,消费者从 RabbitMQ 收到消息并处理完成后,返回给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列删除
RabbitMQ 的消息确认方式有两种:自动确认和手动确认
RabbitMQ 默认是自动确认,即消息推送给消费者后,马上确认并销毁,但假如消费消息的过程中发生了异常,由于消息已经销毁,这样就会造成消息丢失
手动确认又分为肯定确认和否定确认
肯定确认:
  1. rabbitmq-plugins enable rabbitmq_management
复制代码
否定确认:
  1. # rabbitmqctl add_user {username} {password}
  2. # 设置账号和密码
  3. rabbitmqctl add_user admin 123
  4. # rabbitmqctl set_user_tags {username} {role}
  5. # 设置角色,administrator 是管理员角色
  6. rabbitmqctl set_user_tags admin
  7. # rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
  8. # 设置权限:
  9. # {vhost} 表示待授权用户访问的 vhost 名称,默认为 "/"
  10. # {user} 表示待授权访问特定 vhost 的用户名称
  11. # {conf} 表示待授权用户的配置权限,是一个匹配资源名称的正则表达式
  12. # {write} 表示待授权用户的写权限,是一个匹配资源名称的正则表达式
  13. # {read} 表示待授权用户的读权限,是一个资源名称的正则表达式
  14. rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
复制代码
Springboot 提供了三种确认模式,配置如下:
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
整合 SpringBoot 代码如下:
  1. spring:
  2.     rabbitmq:
  3.         host: 127.0.0.1
  4.         port: 5672
  5.         username: root
  6.         password: 123
复制代码
RabbitMQ 推拉模型

RabbitMQ 有两种消息处理模式:推模式和拉模式
推模式下,生产者发布消息到队列时,会立即将这条消息发送给所有订阅该队列的消费者,优点:实现实时通信,缺点:如果消费者的处理能力跟不上生产者的速度,就会在消费者处造成消息堆积,因此需要根据消费能力做流控(比如 RabbitMQ 用 QOS 来限制),RabbitMQ 默认使用推消息
拉模式下,生产者发布消息到队列时,不会立即发送消息给消费者,而是等待消费者请求消息后才发送,优点:消费端可以按照自己的处理速度来消费,缺点:消息传递存在延迟,当处理速度小于发布速度时,容易造成消息堆积在队列
SpringBoot 实现拉消息代码如下:
  1. @Configuration
  2. public class RabbitMqConfig {
  3.   @Bean
  4.   public Queue rabbitmqTestDirectQueue() {
  5.       // Direct 队列
  6.       // name:队列名称
  7.       // durable:是否持久化
  8.       // exclusive:是否独享,如果设置 true,则只有创建者可以使用此队列
  9.       // autoDelete: 是否自动删除,也就是临时队列,当最后一个消费者断开连接就会自动删除
  10.       return new Queue("test_direct_queue", true, false, false);
  11.   }
  12.   @Bean
  13.   public DirectExchange rabbitmqTestDirectExchange() {
  14.       // Direct 交换机
  15.       return new DirectExchange("test_direct_exchange", true, false);
  16.   }
  17.   @Bean
  18.   public Binding bindDirect() {
  19.       return BindingBuilder
  20.               // // 绑定 Direct 队列
  21.               .bind(rabbitmqTestDirectQueue())
  22.               //到 Direct 交换机
  23.               .to(rabbitmqTestDirectExchange())
  24.               // 并设置匹配键
  25.               .with("test_direct_routing");
  26.   }
  27.   @Bean
  28.   public Queue rabbitmqTestFanoutQueueA() {
  29.       // fanout 队列 a
  30.       return new Queue("test_fanout_queue_a", true, false, false);
  31.   }
  32.   @Bean
  33.   public Queue rabbitmqTestFanoutQueueB() {
  34.       // fanout 队列 b
  35.       return new Queue("test_fanout_queue_b", true, false, false);
  36.   }
  37.   @Bean
  38.   public FanoutExchange rabbitmqTestFanoutExchange() {
  39.       // Fanout 交换机
  40.       return new FanoutExchange("test_fanout_exchange", true, false);
  41.   }
  42.   @Bean
  43.   public Binding bindFanoutA() {
  44.       return BindingBuilder
  45.               // 绑定 Fanout 队列 a
  46.               .bind(rabbitmqTestFanoutQueueA())
  47.               //到 Fanout 交换机
  48.               .to(rabbitmqTestFanoutExchange());
  49.   }
  50.   @Bean
  51.   public Binding bindFanoutB() {
  52.       return BindingBuilder
  53.               // 绑定 Fanout 队列 b
  54.               .bind(rabbitmqTestFanoutQueueB())
  55.               //到 Fanout 交换机
  56.               .to(rabbitmqTestFanoutExchange());
  57.   }
  58.   @Bean
  59.   public Queue rabbitmqTestTopicQueueA() {
  60.       // topic 队列 a
  61.       return new Queue("test_topic_queue_a", true, false, false);
  62.   }
  63.   @Bean
  64.   public Queue rabbitmqTestTopicQueueB() {
  65.       // topic 队列 b
  66.       return new Queue("test_topic_queue_b", true, false, false);
  67.   }
  68.   @Bean
  69.   public TopicExchange rabbitmqTestTopicExchange() {
  70.       // Topic 交换机
  71.       return new TopicExchange("test_topic_exchange", true, false);
  72.   }
  73.   @Bean
  74.   public Binding bindTopicA() {
  75.       return BindingBuilder
  76.               // 绑定 Topic 队列 a
  77.               .bind(rabbitmqTestTopicQueueA())
  78.               //到 Topic 交换机
  79.               .to(rabbitmqTestTopicExchange())
  80.               // 并设置匹配键
  81.               .with("a.*");
  82.   }
  83.   @Bean
  84.   public Binding bindTopicB() {
  85.       return BindingBuilder
  86.               // 绑定 Topic 队列 b
  87.               .bind(rabbitmqTestTopicQueueB())
  88.               //到 Topic 交换机
  89.               .to(rabbitmqTestTopicExchange())
  90.               // 并设置匹配键
  91.               .with("b.*");
  92.   }
  93.   @Bean
  94.   public Queue rabbitmqTestHeadersQueueA() {
  95.       // headers 队列 a
  96.       return new Queue("test_headers_queue_a", true, false, false);
  97.   }
  98.   @Bean
  99.   public Queue rabbitmqTestHeadersQueueB() {
  100.       // headers 队列 b
  101.       return new Queue("test_headers_queue_b", true, false, false);
  102.   }
  103.   @Bean
  104.   public HeadersExchange rabbitmqTestHeadersExchange() {
  105.       // Headers 交换机
  106.       return new HeadersExchange("test_headers_exchange", true, false);
  107.   }
  108.   @Bean
  109.   public Binding bindHeadersA() {
  110.       Map<String, Object> map = new HashMap<>();
  111.       map.put("key_a1", "a1");
  112.       map.put("key_a2", "a2");
  113.       return BindingBuilder
  114.               // 绑定 Headers 队列 a
  115.               .bind(rabbitmqTestHeadersQueueA())
  116.               //到 Headers 交换机
  117.               .to(rabbitmqTestHeadersExchange())
  118.               // 全部匹配
  119.               .whereAll(map).match();
  120.   }
  121.   @Bean
  122.   public Binding bindHeadersB() {
  123.       Map<String, Object> map = new HashMap<>();
  124.       map.put("key_b1", "b1");
  125.       map.put("key_b2", "b2");
  126.       return BindingBuilder
  127.               // 绑定 Headers 队列 b
  128.               .bind(rabbitmqTestHeadersQueueB())
  129.               //到 Headers 交换机
  130.               .to(rabbitmqTestHeadersExchange())
  131.               // 部分匹配
  132.               .whereAny(map).match();
  133.   }
  134. }
复制代码
RabbitMQ 的 Channel 提供了 basicGet 方法用于拉取消息,第二个参数为是否自动 ack。这里我们需要手动调用 process 方法来拉取消息,一般来说会让一个线程负责循环拉取消息,存入一个长度有限的阻塞队列,另一个线程从阻塞队列取出消息,处理完一条则手动 Ack 一条。如果想批量拉取消息,可以连续调用 basicGet 方法拉取多条消息,处理完成之后一次性 ACK

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

种地

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表