1 RabbitMQ概述
RabbitMQ简易安装教程
- # 拉取镜像
- docker pull rabbitmq:3.13-management
- # -d 参数:后台运行 Docker 容器
- # --name 参数:设置容器名称
- # -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
- # -v 参数:卷映射目录
- # -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
- docker run -d \
- --name rabbitmq \
- -p 5672:5672 \
- -p 15672:15672 \
- -v rabbitmq-plugin:/plugins \
- -e RABBITMQ_DEFAULT_USER=guest \
- -e RABBITMQ_DEFAULT_PASS=123456 \
- rabbitmq:3.13-management
- docker exec -it 5129c41ad3d8 /bin/bash # 数字是rabbitmq的id,可以通过docker ps查看
- rabbitmq-plugins enable rabbitmq_management #启用 RabbitMQ Management 插件,使得你可以轻松地监控和管理 RabbitMQ 服务器
复制代码 访问登录:http://192.168.145.130:15672,账号密码就是上面指定的
1.1 消息队列
消息队列是实现应用程序之间通讯的中间件
消息队列的好处
- 消息的发送者和吸收者进行异步通讯
- 流量高峰保证服务稳定,消息队列可以暂存大量消息,到达流量削峰
- 扩展性高,可以水平扩展以支持更多的发送者和吸收者,相应地增加或减少资源处置惩罚(功能处置惩罚)
- 解耦:消息的发送者和吸收者只专注于消息,无需关系彼此细节
主流MQ对比
1.2 RabbitMQ体系结构
- Channel(信道):信道是生产者消费者和RabbitMQ服务器之间通讯的桥梁。全部的消息发布和消费都由信道来完成的
- 建立在TCP连接上的虚拟连接,答应在单个TCP连接上建立多个信道,从而实现多线程处置惩罚
- 每个线程对应一个信道,信道在RabbitMQ中具有唯一的ID,保证了信道的私有性
- 引入信道的概念是为了减少建立和烧毁TCP连接的开销,提高体系性能
- Exchange(交换机):负责吸收消息并根据路由键将消息转发到绑定的队列
- Queue(队列):队列是RabbitMQ中用于存储消息的容器,消息按照先辈先出的次序进行处置惩罚
- Virtual Host(虚拟主机):是RabbitMQ中的命名空间(明白为分组),用于隔离不同的环境或应用程序。每个虚拟主机都有自己的队列、交换机和绑定关系
- Broker(代理服务器):指RabbitMQ服务器本身,多个Broker组合成一个RabbitMQ集群
2 RabbitMQ工作模式
- 简单模式:生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
- 工作队列模式:生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
- 发布/订阅模式:扇出交换机吸收消息并将消息发送给全部订阅了该交换机的队列
- 消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
- 消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列
项目导入依赖:接纳原生的方式,开辟中都是集成框架的
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.20.0</version>
- </dependency>
- </dependencies>
复制代码 封装连接工具类:
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class ConnectionUtil {
- public static final String HOST_ADDRESS = "192.168.145.160";
-
- public static Connection getConnection() throws Exception {
-
- // 定义连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置服务地址
- factory.setHost(HOST_ADDRESS);
- // 端口
- factory.setPort(5672);
- //设置账号信息,用户名、密码、vhost
- factory.setVirtualHost("/");
- factory.setUsername("guest");
- factory.setPassword("123456");
- // 通过工程获取连接
- Connection connection = factory.newConnection();
- return connection;
- }
- }
复制代码 2.1 简单模式(Simple Queue)
生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
生产者:发送消息
- public class Producer {
-
- public static void main(String[] args) throws Exception {
- // 获取连接
- Connection connection = ConnectionUtil.getConnection();
-
- // 创建频道
- Channel channel = connection.createChannel();
-
- // 声明(创建)队列
- // queue 参数1:队列名称
- // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
- // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
- // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
- // arguments 参数5:队列其它参数
- channel.queueDeclare("simple_queue", true, false, false, null);
-
- // 要发送的信息
- String message = "你好;小兔子!";
-
- // 参数1:交换机名称,如果没有指定则使用默认Default Exchange
- // 参数2:路由key,简单模式可以传递队列名称
- // 参数3:配置信息
- // 参数4:消息内容
- channel.basicPublish("", "simple_queue", null, message.getBytes());
-
- System.out.println("已发送消息:" + message);
-
- // 关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 运行效果:新增队列:simple_queue


消费者
- public class Consumer {
-
- public static void main(String[] args) throws Exception {
- // 获取连接
- Connection connection = ConnectionUtil.getConnection();
- // 创建Channel
- Channel channel = connection.createChannel();
-
- // 创建队列
- // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
- // 参数1. queue:队列名称
- // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
- // 参数3. exclusive:是否独占。
- // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- // 参数5. arguments:其它参数。
- channel.queueDeclare("simple_queue",true,false,false,null);
-
- // 接收消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- // 回调方法,当收到消息后,会自动执行该方法
- // 参数1. consumerTag:标识
- // 参数2. envelope:获取一些信息,交换机,路由key...
- // 参数3. properties:配置信息
- // 参数4. body:数据
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("consumerTag:"+consumerTag);
- System.out.println("Exchange:"+envelope.getExchange());
- System.out.println("RoutingKey:"+envelope.getRoutingKey());
- System.out.println("properties:"+properties);
- System.out.println("body:"+new String(body));
- }
- };
- // 参数1. queue:队列名称
- // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
- // 参数3. callback:回调对象
- // 消费者类似一个监听程序,主要是用来监听消息
- channel.basicConsume("simple_queue",true,consumer);
- }
- }
复制代码 控制台打印:

消息被消费掉了,所以RabbitMQ服务器上没有了

2.2 工作队列模式(Work Queues)
生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
生产者
- public class Producer {
-
- public static final String QUEUE_NAME = "work_queue";
-
- public static void main(String[] args) throws Exception {
-
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
-
- for (int i = 1; i <= 10; i++) {
- String body = i+"hello rabbitmq~~~";
- channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
- }
-
- channel.close();
- connection.close();
- }
- }
复制代码 发送消息:

消费者1:
- public class Consumer1 {
-
- static final String QUEUE_NAME = "work_queue";
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("Consumer1 body:"+new String(body));
- }
- };
- channel.basicConsume(QUEUE_NAME,true,consumer);
- }
- }
复制代码 消费者2:
- public class Consumer2 {
- static final String QUEUE_NAME = "work_queue";
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("Consumer2 body:"+new String(body));
- }
- };
- channel.basicConsume(QUEUE_NAME,true,consumer);
- }
- }
复制代码 注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序
运行结果:两个消费者竞争消息队列中消息
2.3 发布/订阅模式(Publish/Subscribe)
rabbitmq消息通讯过程:消息生产者将消息发送给交换机,由交换机处置惩罚消息。Exchange(交换机)只负责转发消息,不存储消息,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
常见的交换机类型
- Fanout Exchange(扇出交换机),将消息发送给全部绑定到交换机的队列
- Direct Exchange(直连交换机),把消息交给符合指定routing key的队列
- Topic Exchange(主题交换机),把消息交给符合routing pattern(路由模式)的队列
- Default Exchange(默认交换机),把消息发送给指定队列
发布/订阅模式:扇出交换机吸收消息并将消息发送给全部订阅了该交换机的队列

生产者:
- public class Producer {
- public static void main(String[] args) throws Exception {
- // 1、获取连接
- Connection connection = ConnectionUtil.getConnection();
- // 2、创建频道
- Channel channel = connection.createChannel();
-
- // 参数1. exchange:交换机名称
- // 参数2. type:交换机类型
- // DIRECT("direct"):定向
- // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
- // TOPIC("topic"):通配符的方式
- // HEADERS("headers"):参数匹配
- // 参数3. durable:是否持久化
- // 参数4. autoDelete:自动删除
- // 参数5. internal:内部使用。一般false
- // 参数6. arguments:其它参数
- String exchangeName = "test_fanout";
-
- // 3、创建交换机
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
-
- // 4、创建队列
- String queue1Name = "test_fanout_queue1";
- String queue2Name = "test_fanout_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
-
- // 5、绑定队列和交换机
- // 参数1. queue:队列名称
- // 参数2. exchange:交换机名称
- // 参数3. routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout,routingKey设置为""
- channel.queueBind(queue1Name,exchangeName,"");
- channel.queueBind(queue2Name,exchangeName,"");
-
- String body = "日志信息:张三调用了findAll方法...日志级别:info...";
-
- // 6、发送消息
- channel.basicPublish(exchangeName,"",null,body.getBytes());
-
- // 7、释放资源
- channel.close();
- connection.close();
- }
- }
复制代码 消费者1:
- public class Consumer1 {
-
- public static void main(String[] args) throws Exception {
-
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue1Name = "test_fanout_queue1";
-
- channel.queueDeclare(queue1Name,true,false,false,null);
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
- }
-
- };
- channel.basicConsume(queue1Name,true,consumer);
- }
- }
复制代码 消费者2
- public class Consumer2 {
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue2Name = "test_fanout_queue2";
- channel.queueDeclare(queue2Name,true,false,false,null);
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
- }
- };
- channel.basicConsume(queue2Name,true,consumer);
- }
- }
复制代码 先启动两个消费者,再启动生产者发送消息
交换机和队列的绑定关系如下图所示:
发布订阅模式与工作队列模式的区别:
- 工作队列模式消息由默认交换机处置惩罚,发布订阅模式消息由指定交换机处置惩罚
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能吸收到消息
2.4 路由模式(Routing)
消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
当Direct交换机用相同的路由键routing key绑定多个队列,就会有广播效果(类似发布订阅)

生产者:
- public class Producer {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String exchangeName = "test_direct";
- // 创建交换机
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
- // 创建队列
- String queue1Name = "test_direct_queue1";
- String queue2Name = "test_direct_queue2";
- // 声明(创建)队列
- channel.queueDeclare(queue1Name, true, false, false, null);
- channel.queueDeclare(queue2Name, true, false, false, null);
- // 队列绑定交换机
- // 队列1绑定error
- channel.queueBind(queue1Name, exchangeName, "error");
- // 队列2绑定info error warning
- channel.queueBind(queue2Name, exchangeName, "info");
- channel.queueBind(queue2Name, exchangeName, "error");
- channel.queueBind(queue2Name, exchangeName, "warning");
- String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
- // 发送消息
- channel.basicPublish(exchangeName, "warning", null, message.getBytes());
- System.out.println(message);
- // 释放资源
- channel.close();
- connection.close();
- }
- }
复制代码 消费者1:
- public class Consumer1 {
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue1Name = "test_direct_queue1";
- channel.queueDeclare(queue1Name,true,false,false,null);
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- System.out.println("Consumer1 将日志信息打印到控制台.....");
- }
- };
- channel.basicConsume(queue1Name,true,consumer);
- }
- }
复制代码 消费者2:
- public class Consumer2 {
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String queue2Name = "test_direct_queue2";
- channel.queueDeclare(queue2Name,true,false,false,null);
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
- System.out.println("body:"+new String(body));
- System.out.println("Consumer2 将日志信息存储到数据库.....");
- }
- };
- channel.basicConsume(queue2Name,true,consumer);
- }
- }
复制代码 先启动两个消费者,再启动生产者
绑定关系:
消费者2担当到消息,消费者1没有消息
2.5 通配符模式(Topics)
消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列
(通配符规则:#:匹配零个或多个词,*:匹配一个词)
生产者:
- public class Producer {
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String exchangeName = "test_topic";
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
- String queue1Name = "test_topic_queue1";
- String queue2Name = "test_topic_queue2";
- channel.queueDeclare(queue1Name, true, false, false, null);
- channel.queueDeclare(queue2Name, true, false, false, null);
- // 绑定队列和交换机
- // 参数1. queue:队列名称
- // 参数2. exchange:交换机名称
- // 参数3. routingKey:路由键,绑定规则
- // 如果交换机的类型为fanout ,routingKey设置为""
- // routing key 常用格式:系统的名称.日志的级别。
- // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
- channel.queueBind(queue1Name, exchangeName, "#.error");
- channel.queueBind(queue1Name, exchangeName, "order.*");
- channel.queueBind(queue2Name, exchangeName, "*.*");
- // 分别发送消息到队列:order.info、goods.info、goods.error
- String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
- channel.basicPublish(exchangeName, "order.info", null, body.getBytes());
- body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
- channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());
- body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
- channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());
- channel.close();
- connection.close();
- }
- }
复制代码 消费者1监听消息队列1
- public class Consumer1 {
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String QUEUE_NAME = "test_topic_queue1";
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- }
- };
- channel.basicConsume(QUEUE_NAME,true,consumer);
- }
- }
复制代码 消费者2监听消息队列2
- public class Consumer2 {
-
- public static void main(String[] args) throws Exception {
- Connection connection = ConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- String QUEUE_NAME = "test_topic_queue2";
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("body:"+new String(body));
- }
- };
- channel.basicConsume(QUEUE_NAME,true,consumer);
- }
- }
复制代码 先启动两个消费者,接着启动生产者发送消息
3 RabbitMQ整合SpringBoot
项目根本四步骤根本步骤:建module,改POM,写YAML,主启动
3.1 @RabbitListener注解属性
- bindings属性:指定交换机和队列之间的绑定关系,指定当前方法要监听的队列
潜伏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们
- queues属性
- @RabbitListener(queues = {QUEUE_LINZHUOWEI})
复制代码
- 作用:指定当前方法要监听的队列
- 此时框架不会创建相关交换机和队列,必须提前创建好
3.2 消费者工程
- 建module:module06-boot-consumer
- 改POM
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.1.5</version>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <!--引入web模块为了保证项目一直运行,持久监听消息队列消息-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- </dependencies>
复制代码 - 写YAML
- spring:
- rabbitmq:
- host: 192.168.145.130
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
-
- logging:
- level:
- com.linzhuowei.mq.listener.MyMessageListener: info
复制代码 - 主启动:正常添加@SpringBootApplication
- 监听器:
- import lombok.extern.slf4j.Slf4j;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- @Slf4j
- public class MyMessageListener {
-
- public static final String EXCHANGE_DIRECT = "exchange.direct.order
- ";
- public static final String ROUTING_KEY = "order";
- public static final String QUEUE_NAME = "queue.order";
-
- @RabbitListener(bindings = @QueueBinding(
- //队列信息
- value = @Queue(value = QUEUE_NAME, durable = "true"),
- //交换机信息
- exchange = @Exchange(value = EXCHANGE_DIRECT),
- //路由键信息,赋值为字符串数组
- key = {ROUTING_KEY}
- ))
- public void processMessage(
- //对应消息数据本身,形参类型需要和发送消息的数据类型对应
- String data,
- //对应发送的消息本身,可以通过message获取消息数据,包括data
- Message message,
- //频道对象
- Channel channel) {
- log.info(data);
- }
-
- }
复制代码 运行查察后台管理:
 如图:交换机exchange.direct.order
通过order路由键绑定消息队列
题外话:
- 使用@RabbitListener的bindings属性能绑定交换机和队列的关系并监听队列消息,如果RabbitMQ服务中没有交换机和队列,则会自动创建该队列
- 使用@RabbitListener的queues属性,监听指定消息队列
所以如果只是单纯监听消息队列,不考虑交换机和队列的创建以及绑定(由于这些创建操纵可以在后台页面点击完成嘿嘿),消费者代码也可以这样写:
- @RabbitListener(queues = {QUEUE_LINZHUOWEI})
- public void processMessage( //对应消息数据本身,形参类型需要和发送消息的数据类型对应 String data, //对应发送的消息本身,可以通过message获取消息数据,包罗data Message message, //频道对象 Channel channel) { log.info(data);}
复制代码 但建议照旧写第一种
3.3 生产者工程
- 新建模块:module05-boot-producer
- 改POM
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.1.5</version>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- </dependencies>
复制代码 - 写YAML
- spring:
- rabbitmq:
- host: 192.168.145.130
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
复制代码 - 主启动
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class RabbitMQProducerMainType {
- public static void main(String[] args) {
- SpringApplication.run(RabbitMQProducerMainType.class, args);
- }
- }
复制代码 - 测试程序
- @SpringBootTest
- public class RabbitMQTest {
-
- public static final String EXCHANGE_DIRECT = "exchange.direct.order
- ";
- public static final String ROUTING_KEY = "order";
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessage() {
- rabbitTemplate.convertAndSend(
- EXCHANGE_DIRECT,
- ROUTING_KEY,
- "Hello linzhuowei");
- }
- }
复制代码 4 消息可靠性投递
4.1 什么是消息可靠投递?
消息可靠投递是确保消息从生产者发送到消息队列,再从消息队列消费到消费者的过程中,不丢失消息或重复处置惩罚消息
消息可靠投递主要三个方面:
- 消息的可靠发送(生产者 -> 消息队列)
- 消息的可靠存储(消息队列内部存储)
- 消息的可靠消费(消息队列 -> 消费者)
下面分别说这三个部分
4.2 消息的可靠发送
通过消息发送回调接口或备用交换机保证消息从生产者乐成发送到消息队列中
4.2.1 消息确认机制
应答确认+ 失败重试
生产者发送消息后等待消息队列的相应,确保消息乐成送达,如果发送失败可以尝试重新发送
①模块准备
- 新建模块:module07-confirm-producer
- 改POM
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.1.5</version>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- </dependencies>
复制代码 - 主启动
- 写YAML:启用消息确认机制
- spring:
- rabbitmq:
- host: 192.168.145.130
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
- publisher-confirm-type: CORRELATED # 交换机的确认
- publisher-returns: true # 队列的确认
- logging:
- level:
- com.linzhuowei.mq.config.MQProducerAckConfig: info
复制代码 ②配置类阐明
通过配置类设置RabbitTemplate的回调接口,通过回调方法获取RabbitMQ服务器返回的确认信息,实现消息确认机制
代码实现过程:配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中
两个接口两个回调方法,是否发送到交换机和是否发送到消息队列
方法名方法功能所属接口接口所属类confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplatereturnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate
- ConfirmCallback接口(RabbitTemplate内部的接口)
- /**
- * A callback for publisher confirmations.
- *
- */
- @FunctionalInterface
- public interface ConfirmCallback {
- /**
- * Confirmation callback.
- * @param correlationData correlation data for the callback.
- * @param ack true for ack, false for nack
- * @param cause An optional cause, for nack, when available, otherwise null.
- */
- void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
- }
复制代码 生产者端发送消息之后,回调confirm()方法
- ack参数值为true:表示消息乐成发送到了交换机
- ack参数值为false:表示消息没有发送到交换机
- ReturnCallback接口(RabbitTemplate内部的接口)
- /**
- * A callback for returned messages.
- *
- * @since 2.3
- */
- @FunctionalInterface
- public interface ReturnsCallback {
- /**
- * Returned message callback.
- * @param returned the returned message and metadata.
- */
- void returnedMessage(ReturnedMessage returned);
- }
复制代码 接口中的returnedMessage()方法仅在消息没有发送到队列时调用
ReturnedMessage类中主要属性寄义如下:
属性名类型寄义messageorg.springframework.amqp.core.Message消息以及消息相关数据replyCodeint应答码,类似于HTTP相应状态码replyTextString应答码阐明exchangeString交换机名称routingKeyString路由键名称 ③配置类示例
配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中
- import jakarta.annotation.PostConstruct;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ReturnedMessage;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- @Component
- @Slf4j
- public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @PostConstruct
- public void init() {
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- }
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- log.info("消息发送到交换机成功!数据:" + correlationData);
- } else {
- log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
- }
- }
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.info("消息发送到消息队列失败...");
- log.info("消息主体: " + new String(returned.getMessage().getBody()));
- log.info("应答码: " + returned.getReplyCode());
- log.info("描述:" + returned.getReplyText());
- log.info("消息使用的交换器 exchange : " + returned.getExchange());
- log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
- }
- }
复制代码 ④测试代码
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- @SpringBootTest
- public class RabbitMQTest {
-
- public static final String EXCHANGE_DIRECT = "exchange.direct.order
- ";
- public static final String ROUTING_KEY = "order";
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMessage() {
- rabbitTemplate.convertAndSend(
- EXCHANGE_DIRECT,
- ROUTING_KEY,
- "Hello atguigu");
- }
-
- }
复制代码 通过调整代码,测试如下三种环境:
- 交换机正确、路由键正确
- 交换机正确、路由键不正确,无法发送到队列
- 交换机不正确,无法发送到交换机
回顾交换机作用:吸收消息并路由到消息队列
4.2.2 备用交换机
①备用交换机配置
当消息在队列中未被处置惩罚时(如消息逾期、消息被拒绝或到达最大重试次数,无匹配队列等),这些消息就会转发到备用交换机
本次案例模拟交换机没有匹配的消息队列,消息转至备用交换机
- 首先创建备用交换机(扇出类型):
- exchange.direct.order
- .backup
复制代码
- 创建备用消息队列
- 将备用消息队列绑定备用交换机
\
- 重新创建原交换机(置顶备用交换机)
需要删除原来的直连交换机,重新创建直连交换机,并设置备用交换机
- exchange.direct.order
- .backup
复制代码
- 原交换机绑定原队列
②备用交换机测试
消息发送端:
- @SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order
- "; //exchange.direct.order
- 交换机绑定的路由键为order,这里order路由键错误,会转到备用交换机 public static final String ROUTING_KEY = "order1"; @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY+"11", "Hello 备用交换机"); } }
复制代码 结果:消息发送乐成,消息先发往直连交换机exchange.direct.order
,由于路由键无效,没有匹配的消息队列,所以消息发往备用的扇出交换机exchange.direct.order
.backup
,终极发送到消息队列queue.test.backup
4.3 消息的可靠存储
通过将消息持久化到硬盘上防止消息队列宕机导致内存中消息丢失(交换机默认持久化,消息队列有指定也是默认持久化)
4.3.1 非持久化交换机和队列
即消息在内存存储,重启消息丢失
- 创建非持久化交换机
- 创建非持久化消息队列
- 绑定交换机和消息队列的关系
测试:发送消息后,队列乐成收到消息。
重启rabbitmq,内存的消息丢失,内存掉电装备
4.3.2 持久化交换机和消息队列
先来看卡监听消息队列的写法
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = QUEUE_NAME, durable = "true"),
- exchange = @Exchange(value = EXCHANGE_DIRECT),
- key = {ROUTING_KEY}
- ))
- public void processMessage(String dateString,
- Message message,
- Channel channel) {
- log.info(dateString);
- }
复制代码 关注@RabbitListener中,@QueueBinding中的value和exchange两个注解,分别是Queue和Exchange类型
①@Queue注解分析
@Queue注解抽出关注的部分
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Queue {
- /**
- * Specifies if this queue should be durable.
- * By default if queue name is provided it is durable.
- * @return true if the queue is to be declared as durable.
- * @see org.springframework.amqp.core.Queue#isDurable()
- */
- String durable() default "";
- /**
- * Specifies if this queue should be auto deleted when not used.
- * By default if queue name is provided it is not auto-deleted.
- * @return true if the queue is to be declared as auto-delete.
- * @see org.springframework.amqp.core.Queue#isAutoDelete()
- */
- String autoDelete() default "";
- }
复制代码
- durable属性:By default if queue name is provided it is durable
- autoDelete属性:By default if queue name is provided it is not auto-deleted
翻译就是:只要消息队列指定,默认持久化且不自动删除
②@Exchange注解分析
@Exchange注解抽出有用的部分
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Exchange {
- /**
- * @return false if the exchange is to be declared as non-durable.
- */
- String durable() default TRUE;
- /**
- * @return true if the exchange is to be declared as auto-delete.
- */
- String autoDelete() default FALSE;
- }
复制代码
- durable属性默认true:false if the exchange is to be declared as non-durable
- autoDelete属性默认false:true if the exchange is to be declared as auto-delete
交换机默认持久化
4.4 消息的可靠消费
消息确认机制
- 自动确认:消费者吸收消息后自动返回ACK确认,RabbitMQ删除消息。自动确认机制,消息处置惩罚失败会导致消息丢失(由于消息已删)
- 手动确认:消费者处置惩罚消息乐成后,显式发送ACK给消息队列,通知RabbitMQ消息乐成消费删除消息,消费者处置惩罚消息失败后,显示发送NACK给消息队列,通知RabbitMQ消息消费失败,执行相应的失败计谋。手动确认机制保证消息的可靠消费
4.4.1 模块准备
- POM
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.1.5</version>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- </dependencies>
复制代码 - YAML:开启手动确认机制
- spring:
- rabbitmq:
- host: 192.168.145.130
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
- listener:
- simple:
- acknowledge-mode: manual # 把消息确认模式改为手动确认
复制代码 - 主启动
- 消息监听:其实durable和autoDelete可以不设置,默认值就是这样的
- @Componentpublic class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order
- "; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; // 修饰监听方法 @RabbitListener( // 设置绑定关系 bindings = @QueueBinding( // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"), // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"), // 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage(String dataString, Message message, Channel channel) { }}
复制代码 4.4.2 手动确认思路
- 步骤1:YAML配置文件把消息确认模式改为手动确认
- 步骤2:调用Channel对象的方法返复书息
- ACK:Acknowledgement,表示消息处置惩罚乐成
- NACK:Negative Acknowledgement,表示消息处置惩罚失败
- Reject:拒绝,同样表示消息处置惩罚失败
- 步骤3:拒绝或者消息处置惩罚失败的后续操纵
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
①basicAck()方法
- 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端乐成消费,这样Broker就可以把消息删除了
- 参数列表:
参数名称寄义long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识boolean multiple取值为true:为小于、即是deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息 ②basicNack()方法
- 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操纵取决于参数requeue的值
- 参数列表:
参数名称寄义long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识boolean multiple取值为true:为小于、即是deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列 ③basicReject()方法
- 方法功能:根据指定的deliveryTag,对该消息表示拒绝
- 参数列表:
参数名称寄义long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列 basicNack()和basicReject()有啥区别?
- basicNack()有批量操纵
- basicReject()没有批量操纵
Fanout交换机,同一个消息广播到不同的队列,deliveryTag会重复吗?不会,deliveryTag在Broker范围内唯一
4.4.3 可靠消费代码
- import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component@Slf4jpublic class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order
- "; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; // 修饰监听方法 @RabbitListener( // 设置绑定关系 bindings = @QueueBinding( // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"), // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"), // 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage(String dataString, Message message, Channel channel) throws IOException { // 1、获取当前消息的 deliveryTag 值备用 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 2、正常业务操纵 log.info("消费端吸收到消息内容:" + dataString); // System.out.println(10 / 0); // 3、给 RabbitMQ 服务器返回 ACK 确认信息 channel.basicAck(deliveryTag, false); } catch (Exception e) { // 4、获取信息,看当前消息是否曾经被投递过 Boolean redelivered = message.getMessageProperties().getRedelivered(); if (!redelivered) { // 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次 channel.basicNack(deliveryTag, false, true); } else { // 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列 channel.basicReject(deliveryTag, false); } } }}
复制代码 4.5 消息可靠性投递架构
MQ是体系解耦利器,能很好解除消息发送者和消息吸收者之间的耦合。怎样保证消息可靠性投递?通过前面我们知道主要分消息可靠发送,消息可靠存储,消息可靠消费。这一末节我们用另一个角度分析
要保证消息可靠性投递,我们分上下两个半场
- 上半场123分别对应:1发送方调用主动API发送消息,2MQ服务端收到消息并将消息落库(持久化),3发送方收到回调ACK(确认消息乐成投递到MQ服务器)
timer起作用:步骤 3,如果发送方没有收到回调确认(好比服务端由于网络问题或者其他缘故起因未能正确发送 ACK),则发送方会启动一个定时器,尝试重新发送消息。如果多次发送失败(超时),发送方会向业务方回调发送失败,这通常是在重试机制到达最大次数或超时后触发的
- 下半场456分别对应:4消费端吸收消息处置惩罚业务逻辑,5吸收方(消费者)发送 ACK 回应消息处置惩罚乐成,6MQ服务端收到ACK并将库中的消息删除
timer起作用:步骤 5,消费者没有实时发送 ACK(好比消费者处置惩罚超时或发生了异常),MQ 服务端会启动定时器等待 ACK
如果 MQ 服务端在规定时间内没有收到消费者的 ACK,timer 会触发重试机制,可能重新将消息投递到消费者,只到确认消息被处置惩罚并收到 ACK 后,消息才会从 MQ 服务端的持久化存储中删除,以确保消息的可靠性
上下半场均有重发,重发计谋有定时重发(如每个10s重发直到超出次数)和指数退避(X秒重发,2X秒重发,4X秒重发)
综合来看关键点在于怎样保证消息幂等
- 上半场消息幂等:发送方没有收到回调ACK,会重新发送消息到MQ服务器。上半场的消息幂等性有MQ服务器完成,MQ会为每条消息生玉成局唯一的message ID用作去重和幂等依据(上半场消息幂等由MQ服务器完成无需关注)
- 下半场消息幂等:MQ服务端超时未收到ACK,导致MQ重复投递消息。业务方会收到重复消息,业务方需要保证消息幂等性。好比消息携带全局唯一id用于保证幂等,再处置惩罚消息前判定即可
5 消费端限流
使用消息队列的削峰限流,平滑流量制止大量哀求涌入,限制哀求数目,制止对后端服务造成过大的压力
常见的削峰限流计谋有:
通过prefetch来设置消费者**同时吸收未确认的消息的数目**,每次预取的消息数目来实现流量削峰
5.1 未设置prefetch
首先向消息队列中发送100个消息
- public void testSendMessage() {
- for (int i = 0; i < 100; i++) {
- rabbitTemplate.convertAndSend(
- EXCHANGE_DIRECT,
- ROUTING_KEY,
- "Hello atguigu" + i);
- }
- }
复制代码
消息消费者监听对应的消息队列
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = QUEUE_NAME, durable = "true"),
- exchange = @Exchange(value = EXCHANGE_DIRECT),
- key = {ROUTING_KEY}
- ))
- public void processMessage(String dataString,
- Message message,
- Channel channel) throws InterruptedException, IOException {
- // 正常业务操作
- log.info("消费端接收到消息内容:" + dataString);
- //如果不睡1秒,瞬间为0
- TimeUnit.SECONDS.sleep(1);
- //手动确认ACK
- channel.basicAck(deliveryTag, false);
- }
复制代码 显示结果:Ready直接为0,Unack和Total徐徐减少直到0
5.2 设置prefetch
修改YAML
- spring:
- rabbitmq:
- host: 192.168.145.130
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
- listener:
- simple:
- acknowledge-mode: manual
- prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息(同时接收未确认的消息的数量)
复制代码 首先发送消息:
- public void testSendMessage() {
- for (int i = 0; i < 100; i++) {
- rabbitTemplate.convertAndSend(
- EXCHANGE_DIRECT,
- ROUTING_KEY,
- "Hello atguigu" + i);
- }
- }
复制代码
消息消费者监听对应的消息队列
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = QUEUE_NAME, durable = "true"),
- exchange = @Exchange(value = EXCHANGE_DIRECT),
- key = {ROUTING_KEY}
- ))
- public void processMessage(String dataString,
- Message message,
- Channel channel) throws InterruptedException, IOException {
- // 正常业务操作
- log.info("消费端接收到消息内容:" + dataString);
- //如果不睡1秒,瞬间为0
- TimeUnit.SECONDS.sleep(1);
- //手动确认ACK
- channel.basicAck(deliveryTag, false);
- }
复制代码 效果,监听者每次只取一个消息消费,同时未确认消息只有prefetch个
6 消息超时
设置逾期时间,消息高出逾期时间自动删除(更准确的说超时消息会变成死信)
可通过两个层面设置逾期时间
- 队列层面:设置队列的消息逾期时间,队列内的消息超出逾期时间自动删除
- 消息层面:设置详细某个消息的逾期时间,消息超出逾期时间自动删除
如果两个层面都有设置,以逾期时间短的为准
6.1 队列层面设置逾期时间
创建交换机
创建消息队列,并设置逾期时间10000毫秒
绑定交换机
发送消息,不启动消费端,等待消息逾期
6.2 消息层面设置逾期时间
MessagePostProcessor 是 Spring Framework 的接口,在消息发送前对消息进行处置惩罚和修改。通过接口MessagePostProcessor接口在消息层面设置逾期时间
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- @Test
- public void testSendMessageTTL() {
-
- // 1、创建消息后置处理器对象
- MessagePostProcessor messagePostProcessor = (Message message) -> {
- // 设定 TTL 时间,以毫秒为单位
- message.getMessageProperties().setExpiration("5000");
- return message;
- };
-
- // 2、发送消息
- rabbitTemplate.convertAndSend(
- EXCHANGE_DIRECT,
- ROUTING_KEY,
- "Hello linzhuowei", messagePostProcessor);
- }
复制代码 原来消息队列queue.test.timeout逾期时间10000毫秒,消息层面设置逾期时间5000毫秒,以短的逾期时间为尺度,发送消息,等待消息逾期
7 死信和死信队列
无法正常被消费的消息就称为死信
死信的缘故起因有三种(就是消息没有被正常消费):
- 拒绝:消费者拒绝消息,basicNack()/basicReject(),并且不把消息重新放回原目标队列(requeue=false)
- 超时:消息到达超时时间未被消费
- 溢出:队列中消息数目到达最大限制,根据队列先辈先出原理,后来再进入一条消息,队列中最早的消息会变成死信
死信的处置惩罚方式大抵三种:
- 丢弃:不处置惩罚,死信直接丢弃
- 入库:死信写入数据库,日后处置惩罚
- 监听:死信进入死信队列,消费端监听死信队列,做后序处置惩罚(通常接纳)
下面分别演示三种死信成因
7.1 准备工作
7.1.1 正常交换机和正常消息队列
- 正常交换机:exchange.normal.video
- 正常队列:queue.normal.video
- 正常路由键:routing.key.normal.video
- 创建正常交换机
- 创建正常队列,写好死信队列和死信交换机
- 绑定正常消息队列和正常交换机
完成设施后设置如下
7.1.2 死信交换机和死信队列
- 死信交换机:exchange.dead.letter.video
- 死信队列:queue.dead.letter.video
- 死信路由键:routing.key.dead.letter.video
- 创建死信交换机
- 创建死信队列
- 绑定死信队列和死信交换机
7.1.3 常量声明
- public static final String EXCHANGE_NORMAL = "exchange.normal.video";
- public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";
- public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
- public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";
- public static final String QUEUE_NORMAL = "queue.normal.video";
- public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
复制代码 7.2 死信–拒绝
- 发送端发送消息到正常交换机
- @Test
- public void testSendMessageButReject() {
- rabbitTemplate
- .convertAndSend(
- EXCHANGE_NORMAL,
- ROUTING_KEY_NORMAL,
- "★[normal]发送消息--正常交换机--正常消息队列...");
- }
复制代码 - 消费端监听正常消息队列和死信队列
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- @Component
- @Slf4j
- public class DeadLetterListener {
- public static final String QUEUE_NORMAL = "queue.normal.video";
- public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
- @RabbitListener(queues = {QUEUE_NORMAL})
- public void processMessageNormal(Message message, Channel channel) throws IOException {
- // 消费端监听正常消息队列,接收并拒绝消息
- log.info("★[normal]接收消息,但拒绝消息且不重新放入队列...");
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
- }
- @RabbitListener(queues = {QUEUE_DEAD_LETTER})
- public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
- // 消费端监听死信队列,接收并成功消费消息
- log.info("★[dead letter]监听死信队列,接收到死信消息...");
- log.info("★[dead letter]dataString = " + dataString);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
复制代码 先启动消费端监听死信队列和正常队列,再向正常消息队列发送消息
过程:发送端将消息发送到正常消息队列,监听正常消息队列的消费者吸收消息并拒绝,消息通过死信交换机路由到死信队列,监听死信队列的消费者吸收并乐成消费。
正常消息队列:queue.normal.video,由于消息刚到达就被消费者吸收,所以Queued messages没有变化

同一时间,死信队列也是刚吸收消息就被消费端消费,所以Queued messages没有变化

消费端控制台打印:
- ★[normal]接收消息,但拒绝消息且不重新放入队列...
- ★[dead letter]监听死信队列,接收到死信消息...
- ★[dead letter]dataString = ★[normal]发送消息--正常交换机--正常消息队列...
复制代码 7.3 死信–超时和溢出
前面创建正常消息队列时就置顶了正常消息队列最大消息数为10(x-max-length=10)且最大生存时间为10s(x-message-ttl=10000)
先关闭消费者,向正常消息队列发送20条消息
- @Test
- public void testSendMessageButReject() {
- for (int i = 0; i < 20; i++) {
- rabbitTemplate
- .convertAndSend(
- EXCHANGE_NORMAL,
- ROUTING_KEY_NORMAL,
- "★[normal]发送消息--正常交换机--正常消息队列...");
- }
- }
复制代码
- 发送者发送20条消息(m1,m2,…,m19,m20)
- 前十条消息(m1,m2,…,m9,m10)正常进入消息队列,到达最大消息数
- 后十条消息(m11,m12,…,m19,m20)进入消息队列,根据队列先辈先出,前十条消息(m1,m2,…,m9,m10)溢出
- 这十条消息(m1,m2,…,m9,m10)通过死信交换机进入死信队列(对应死信队列第一个上坡)
- 后十条消息(m11,m12,…,m19,m20)高出10s未被消费,超时,后十条消息(m11,m12,…,m19,m20)也进入死信队列(对应死信队列第二个上坡)
消费者端省略,就照旧监听然后消费…
8 延时队列
延时队列有两种实现思路
- 借助超时时间+死信队列来实现延时队列
- 通过RabbitMQ插件来完成延时队列
插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
需要将插件放入rabbitmq中容器的?/plugins目录,我们来看看该目录映射到宿主机的哪个目录?
运行结果:
- "Mounts": [
- {
- "Type": "volume",
- "Name": "rabbitmq-plugin",
- "Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data",
- "Destination": "/plugins",
- "Driver": "local",
- "Mode": "z",
- "RW": true,
- "Propagation": ""
- },
- {
- "Type": "volume",
- "Name": "0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4",
- "Source": "/var/lib/docker/volumes/0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4/_data",
- "Destination": "/var/lib/rabbitmq",
- "Driver": "local",
- "Mode": "",
- "RW": true,
- "Propagation": ""
- }
- ],
复制代码 和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data
## 8.1 下载延迟插件
RabbitMQ社区插件:https://www.rabbitmq.com/community-plugins.html
延迟插件:
下载插件安装文件,并移动到对应目录
- wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
- mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
复制代码 启用插件
- # 登录进入容器内部
- docker exec -it rabbitmq /bin/bash
- # rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- # 退出Docker容器
- exit
- # 重启Docker容器
- docker restart rabbitmq
复制代码 延迟插件启动乐成:
8.2 延迟插件的使用
8.2.1 生产者端
通过MessageProcessor来设置延迟时间
- @Test
- public void testSendDelayMessage() {
- rabbitTemplate.convertAndSend(
- EXCHANGE_DELAY,
- ROUTING_KEY_DELAY,
- "测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",
- messageProcessor -> {
- // 设置延迟时间:以毫秒为单位
- messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
- return messageProcessor;
- });
- }
复制代码 8.2.2 消费者端
① ui界面创建延迟交换机和队列
使用插件创建交换机exchange.delay.happy
使用rabbitmq_delayed_message_exchange插件要求交换机type=x-delayed-message,并通过x-delayed-type设置交换机的类型(direct、fanout、topic),创建方式如下:
创建消息队列queue.delay.video并绑定exchange.delay.happy交换机
- @Component
- @Slf4j
- public class MyDelayMessageListener {
-
- public static final String QUEUE_DELAY = "queue.delay.video";
-
- @RabbitListener(queues = {QUEUE_DELAY})
- public void process(String dataString, Message message, Channel channel) throws IOException {
- log.info("[生产者]" + dataString);
- log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
复制代码 ②代码创建延迟交换机和队列
- @Component
- @Slf4j
- public class MyDelayMessageListener {
-
- public static final String EXCHANGE_DELAY = "exchange.delay.video";
- public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
- public static final String QUEUE_DELAY = "queue.delay.video";
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),
- exchange = @Exchange(
- value = EXCHANGE_DELAY,
- durable = "true",
- autoDelete = "false",
- type = "x-delayed-message",
- arguments = @Argument(name = "x-delayed-type", value = "direct")),
- key = {ROUTING_KEY_DELAY}
- ))
- public void process(String dataString, Message message, Channel channel) throws IOException {
- log.info("[生产者]" + dataString);
- log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
复制代码 8.2.3 效果展示
前面消息可靠投递中说过,消息发送后回调confirm(),而returnMessage()只有在消息发送失败才会回调,但是使用rabbitmq_delayed_message_exchange插件后,即使消息乐成发送到队列上,也会导致returnedMessage()方法执行(问题不大嘛)
消费端效果:
- [生产者]测试基于插件的延迟消息 [12:41:29]
- [消费者]12:41:39
复制代码 9 变乱消息
9.1 什么是变乱消息?
将生产者发送消息的操纵打包成一个原子操纵,要么全部乐成要么全部失败,通过变乱消息保证消息发送的原子性
RabbitMQ 的变乱消息有点类似 Spring 的变乱,分为开始变乱、提交变乱、回滚变乱。
- txSelect():开始变乱,使用 txSelect() 开启变乱。
- txCommit():提交变乱,如果 txCommit() 提交变乱乐成了,则消息肯定会发送到 RabbitMQ。
- txRollback():回滚变乱,如果在执行 txCommit() 之前 RabbitMQ 发生了异常,txRollback() 会捕获异常进行回滚。
RabbitMQ 发送变乱消息流程:txSelect开启变乱,消息发送到 RabbitMQ 缓存,接着 txCommit 提交变乱,txCommit乐成后则消息肯定发送到了 RabbitMQ。
如果在 txCommit 完成前出现任何异常,我们就捕获这个异常然后执行 txRollback 进行回滚操纵,整个过程跟 Spring 的变乱机制没太大的区别。因此,我们可以通过 RabbitMQ 变乱机制保证消息肯定可以发送乐成。
了解了 RabbitMQ 的变乱消息机制,接下来我们就分享两种方式来实现 RabbitMQ 变乱消息
9.2 Springboot发送变乱消息
9.2.1 准备工作
- 改pom.xml
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.1.5</version>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- </dependencies>
复制代码 - 写yaml
- spring:
- rabbitmq:
- host: 192.168.200.100
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
复制代码 - 主启动
- 变乱配置
- @Configuration
- @Data
- public class RabbitConfig {
- @Bean
- public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
- return new RabbitTransactionManager(connectionFactory);
- }
- @Bean
- public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setChannelTransacted(true);
- return rabbitTemplate;
- }
- }
复制代码 9.2.2 没有变乱消息
没有变乱消息,无法保证消息发送原子性
- @SpringBootTest
- @Slf4j
- public class RabbitMQTest {
- public static final String EXCHANGE_NAME = "exchange.tx.dragon";
- public static final String ROUTING_KEY = "routing.key.tx.dragon";
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendMessageInTx() {
- // 1、发送第一条消息
- rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
- // 2、抛出异常
- log.info("do bad:" + 10 / 0);
- // 3、发送第二条消息
- rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
- }
- }
复制代码 抛出异常前的消息发送了,抛异常后的消息没有发送:
9.2.3 使用变乱消息
由于在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操纵需要使用@RollBack注解操控
- @Test
- @Transactional
- @Rollback(value = false)
- public void testSendMessageInTx() {
- // 1、发送第一条消息
- rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
- log.info("do bad:" + 10 / 0);
-
- // 2、发送第二条消息
- rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
- }
复制代码
9.3 Channel发送变乱消息
工具类
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class RabbitMqUtil {
- public static Channel getChannel() {
- // 创建一个连接工厂,并设置MQ的相关信息
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("xxxxxx");
- factory.setUsername("xxx");
- factory.setPassword("xxx");
- factory.setVirtualHost("/xxx");
- Channel channel = null;
- try {
- // 创建连接
- Connection connection = factory.newConnection();
- // 获取信道
- channel = connection.createChannel();
- } catch (Exception e) {
- log.error("创建 RabbitMQ Channel 失败", e);
- e.printStackTrace();
- }
- return channel;
- }
- }
复制代码 Channel发送变乱消息
- import com.rabbitmq.client.Channel;
- import com.user.service.util.RabbitMqUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- @Slf4j
- @Component
- public class RabbitTransactionChannelProducer {
- @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
- public void sendTransactionaChannelMessage(String message) {
- //获取 Channel
- Channel channel = RabbitMqUtil.getChannel();
- try {
- //开启事务
- channel.txSelect();
- //发送消息
- channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "1").getBytes(StandardCharsets.UTF_8));
- channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "2").getBytes(StandardCharsets.UTF_8));
-
- //发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了
- //int a = 1 / 0;
-
- channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "3").getBytes(StandardCharsets.UTF_8));
- //提交事务
- channel.txCommit();
- } catch (Exception e) {
- //回滚事务
- try {
- channel.txRollback();
- } catch (IOException ex) {
- log.error("txRollback error", e);
- ex.printStackTrace();
- }
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (Exception e) {
- log.error("channel close error", e);
- e.printStackTrace();
- }
- }
- }
- }
复制代码 10 惰性队列
创建队列分两种:
- default默认消息队列:消息存储在内存,当队列内存限制触发才会将部分消息移到磁盘
- lazy惰性消息队列:消息尽可能地生存在磁盘,内存中只保持须要的元数据
惰性队列,将消息尽可能地生存在磁盘,减少内存的使用。有用防止由于队列消息过多导致的内存溢出,是处置惩罚需要处置惩罚大量消息但内存有限的场景。但是由于消息存于磁盘,生产者发送消息和消费者消费比普通队列慢,尤其在高吞吐场景
队列创建置顶模式方式有:使用队列计谋(建议)和设置queue.declare参数
如果计谋和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
10.1 队列计谋设定
- # 登录Docker容器
- docker exec -it rabbitmq /bin/bash
- # 运行rabbitmqctl命令
- rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
复制代码 下令解读:
- rabbitmqctl下令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
- set_policy是子下令,表示设置计谋
- Lazy是当前要设置的计谋名称,是我们自己自定义的,不是体系定义的
- "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
- '{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
- –-apply-to参数指定该计谋将应用于队列(queues)级别
- 下令执行后,全部名称符合正则表达式的队列都会应用指定计谋,包罗将来新创建的队列
如果需要修改队列模式可以执行如下下令(不必删除队列再重修):
- rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
复制代码 10.3 queue.declare参数设定
参数x-queue-mode设定队列创建模式,lazy和default(默认)
Java代码原生API设置方式:
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-queue-mode", "lazy");
- channel.queueDeclare("myqueue", false, false, false, args);
复制代码 Java代码注解设置方式:
- @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
- @Argument(name = "x-queue-mode", value = "lazy")
- })
复制代码 11 优先级队列
优先级队列答应你根据消息的优先级来处置惩罚消息。消息默认先辈先出,通过设置不同的优先级值,消费者可以优先处置惩罚重要或紧急的消息,而延迟处置惩罚优先级较低的消息
11.1 准备工作
- 创建交换机:exchange.test.priority
- 创建消息队列:queue.test.priority
RabbitMQ消息优先级范围 1到255 ,建议使用 1到5(数字越大优先级越高)
通过设置 x-max-priority来指定消息队列的最大优先级,默以为0。而消息的优先级不能大于x-max-priority,所以使用优先级队列肯定要指定x-max-priority,这里指定为x-max-priority=10
- 改POM
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.1.5</version>
- </parent>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- </dependencies>
复制代码 - YAML
- spring:
- rabbitmq:
- host: 192.168.200.100
- port: 5672
- username: guest
- password: 123456
- virtual-host: /
复制代码 - 主启动
11.2 使用优先级队列
不要启动消费者程序,让多条不同优先级的消息滞留在队列中
- 第一次发送优先级为1的消息
- 第二次发送优先级为2的消息
- 第三次发送优先级为3的消息
先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息
消息生产者:
- import jakarta.annotation.Resource;
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.boot.test.context.SpringBootTest;
- @SpringBootTest
- public class RabbitMQTest {
- public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
- public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendMessage() {
- //第一次发送优先级为1的消息
- rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
- message.getMessageProperties().setPriority(1);
- return message;
- });
-
- //第二次发送优先级为2的消息
- //rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{
- // message.getMessageProperties().setPriority(2);
- // return message;
- //});
-
- //第三次发送优先级为3的消息
- //rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{
- // message.getMessageProperties().setPriority(3);
- // return message;
- //});
- }
- }
复制代码
消费端:
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.*;
- import org.springframework.stereotype.Component;
- @Slf4j
- @Component
- public class MyMessageProcessor {
- public static final String QUEUE_PRIORITY = "queue.test.priority";
- @RabbitListener(queues = {QUEUE_PRIORITY})
- public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
- log.info(data);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
复制代码 效果:
- I am a message with priority 3.
- I am a message with priority 2.
- I am a message with priority 1.
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |