MQ的作用
MQ主要是接收并转发消息,在差别的应用场景下可以展现差别的作用
- 1.异步解耦
- 一些操作比较耗时,但是不影响整体程序的执行,不需要及时返回的结果,就可以使用MQ把这些异步化。
- 2.流量削峰
- 在某些特殊情况下,流量突然增大,为了防止服务器的崩溃,就会使用MQ进行流量削峰进行流量控制,例如限时秒杀活动等
- 3.消息分发
- 当多个系统需要同一个数据时,可以将数据放入到MQ中,其他哪个服务器需要则直接进行订阅即可,无需多次访问数据库
- 4.延迟通知
- 在需要特定时间后发送通知的场景中,可以使用MQ延迟发送,例如用户下单长时间没有支付,到时间点之后就自动取消订单
复制代码 RabbitMQ焦点概念
- 端口号信息
- 5672 : 客户端和服务器建立连接时的端口号
- 15672 : 管理界面端口号
- 25672 : 集群使用的端口号
复制代码 - rabbitMQ工作流程
AMQP
AMQP是一个高级消息队列协议,AMQP界说了一套确定的消息互换功能,包罗互换器(Exchange)、队列(Queue)等,这些组件共同工作使得生产者能将消息发送到互换机(Exchange),然后由互换机准确的分发到队列(Queue)中。
RabbiteMQ是遵循了AMQP协议的,换句话说,RabbiteMQ就是AMQP协议的Erlang的实现,以是他们的结构是一样的。
RabbitMQ快速上手
引入依赖
- 中央仓库搜索amqp
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.21.0</version>
- </dependency>
复制代码 生产者(Producer)
- 1.建立连接
- 2.创建Channel
- 3.声明队列Queue
- 4.发送消息
- 5.释放资源
复制代码- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class ProducerDemo {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("47.122.121.191");
- factory.setPort(5672); // 需要提前开放端口号
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setVirtualHost("new virtual");
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明交换机 使用内置的交换机
- // 4.声明队列
- /**
- * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
- * Map<String, Object> arguments)
- * 参数说明 :
- * queue : 队列名称
- * durable : 是否持久化
- * exclusive : 是否独占
- * autoDelete : 是否自动删除
- * arguments : 参数
- */
- channel.queueDeclare("hello2",true,false,false,null);
- // 5.发送消息
- /**
- * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- * 参数说明 :
- * exchange : 交换机的名称
- * routingKey : 内置交换机,routingKey和队列名称保持一致,表示发送给哪个队列
- * props : 属性配置
- * body : 消息
- */
- String mes = "hello rabbitmq";
- channel.basicPublish("","hello2",null,mes.getBytes());
- // 6.资源释放
- channel.close();
- connection.close();
- }
- }
复制代码 斲丧者(Consumer)
- 1. 建立连接
- 2. 创建信道(Channel)
- 3. 声明队列(Queue)
- 4. 接收消息
- 5. 释放资源
复制代码- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class ConsumerDemo {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1.创建连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("47.122.121.191");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setVirtualHost("new virtual");
- Connection connection = factory.newConnection();
- // 2.创建Channel
- Channel channel = connection.createChannel();
- // 3.声明队列(如果队列已经存在可省略,如果不存在省略此声明的话会报错,建议不要省略)
- channel.queueDeclare("hello2",true,false,false,null);
- // 4.消费消息
- /**
- * basicConsume(String queue, boolean autoAck, Consumer callback)
- * 参数说明:
- * queue:队列名称
- * autoAck:是否自动确认
- * callback:接收到消息之后,执行的逻辑
- */
- Consumer consumer = new DefaultConsumer(channel){
- // 从队列中收到消息,就会执行的方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // TODO
- System.out.println("接收到消息:" + new String(body));
- }
- };
- channel.basicConsume("hello2",true,consumer);
- // 等待程序执行完成
- Thread.sleep(2000);
- // 5.释放资源
- channel.close();
- connection.close();
- }
- }
复制代码 RabbitMQ模式先容
- 简单模式(Simple)
- 一个生产者和一个消费者,也称作点对点的模式,即消息只能被单个消费者处理
复制代码 - 工作队列(Work Queue)
- 一个生产者,多个消费者,工作队列会将消息分派给不同的消费者,每个消费者都会收到不同的消费信息
- 特点 : 消息不会重复,分配给不同消费者,常用于集群做异步处理
复制代码 - 发布/订阅模式(Publish/Subscribe)
- 发布/订阅模式多了一个交换机`Exchange`,特点是一个生产者多个消费者,并且每个消费者都会拿到所有的消费信息,而不是评分这些信息,常用于广播消息的通知
复制代码 - 路由模式(Routing)
- 路由模式也是多了一个交换机,特点是在发布/订阅模式基础上增加路由key(Routing key),只有和消息队列的key(Binding key)相互匹配的key才进行消息的转发,适用于需要根据特定规则分发消息的场景
复制代码 - 通配符模式(Topics)
- 通配符模式是路由模式的一个升级,和路由模式一样可以进行key的筛选,但是它支持通配符,只要是满足响应通配符的条件都可进行队列消息的转发,他比路由模式更加灵活
复制代码 - RPC通讯
- RPC通信没有生产者消费者,只有client和server,通过两个消息队列实现了一个可以回调的过程
复制代码 - 发布确认模式(Publisher Confirms)
- 发布确认模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制。通过此模式,生产者每发布一条消息都会获得一个唯一的ID,生产者可以将这些序号与消息关联起来,方便跟踪。当RabbitMQ服务器收到消息之后,服务器会异步发送一个确认(ACK)给生产者,其中就包含这个ID,表明此消息已送达服务器。
- 注意:发布确认模式只能确保生产者和服务器的可靠性,而不是生产者和消费者的可靠性
复制代码 代码案例
- 工作队列
生产者
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明交换机 使用内置的交换机
- // 4.声明队列
- channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
- // 5.发送消息
- String msg = null;
- for (int i = 0; i < 20; i++) {
- msg = "hello work queue " + i;
- channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());
- }
- // 6.资源释放
- channel.close();
- connection.close();
- System.out.println("发送完毕~");
- }
- }
复制代码 斲丧者1
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1.创建连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.创建Channel
- Channel channel = connection.createChannel();
- // 3.声明队列(如果队列已经存在可省略,如果不存在省略此声明的话会报错,建议不要省略)
- channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
- // 4.消费消息
- Consumer consumer = new DefaultConsumer(channel){
- // 从队列中收到消息,就会执行的方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("Consumer1接收到消息:" + new String(body));
- }
- };
- channel.basicConsume(Constants.WORK_QUEUE,true,consumer);
- // // 5.释放资源(先启动消费者不进行资源释放,否则消费者无法长时间运行)
- // channel.close();
- // connection.close();
- }
- }
复制代码 斲丧者2
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1.创建连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.创建Channel
- Channel channel = connection.createChannel();
- // 3.声明队列(如果队列已经存在可省略,如果不存在省略此声明的话会报错,建议不要省略)
- channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
- // 4.消费消息
- Consumer consumer = new DefaultConsumer(channel){
- // 从队列中收到消息,就会执行的方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("Consumer2接收到消息:" + new String(body));
- }
- };
- channel.basicConsume(Constants.WORK_QUEUE,true,consumer);
- // // 5.释放资源(先启动消费者不进行资源释放,否则消费者无法长时间运行)
- // channel.close();
- // connection.close();
- }
- }
复制代码 - 发布/订阅
生产者
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明交换机
- /**
- * exchangeDeclare(String exchange, 交换机名称
- * BuiltinExchangeType type, 交换机类型
- * boolean durable, 是否可持久化
- * boolean autoDelete, 是否自动删除
- * boolean internal, 是否内部使用,内部使用不能用客户端发送
- * Map<String, Object> arguments) 参数
- */
- channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true,false,false,null);
- // 4.声明队列
- channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
- channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
- // 5.绑定交换机(routingKey为空表示,接收所有的消息)
- channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
- channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
- // 6.发布消息
- String msg = "hello fanout ";
- for (int i = 0; i < 20; i++) {
- msg = msg + i;
- channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());
- }
- System.out.println("消息发送成功~");
- // 7.释放资源
- channel.close();
- connection.close();
- }
- }
复制代码 消息队列客户端表现,每一个绑定的消息队列都有20条消息

斲丧者1
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明队列
- channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
- // 4.消费信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消费者1消费: " + new String(body));
- }
- };
- channel.basicConsume(Constants.FANOUT_QUEUE1,true,consumer);
- // 消费者不用写资源释放
- }
- }
复制代码 斲丧者2
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明队列
- channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
- // 4.消费信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消费者2消费: " + new String(body));
- }
- };
- channel.basicConsume(Constants.FANOUT_QUEUE2,true,consumer);
- // 消费者不用写资源释放
- }
- }
复制代码 - 路由模式
生产者
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.创建信道
- Channel channel = connection.createChannel();
- // 3.声明交换机
- channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);
- // 4.声明队列
- channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
- channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
- // 5.关联交换机
- channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a",null);
- channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a",null);
- channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b",null);
- channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c",null);
- // 6.发送消息
- String msg = "hello direct, my routingKey is a...";
- channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());
- msg = "hello direct, my routingKey is b...";
- channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg.getBytes());
- msg = "hello direct, my routingKey is c...";
- channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,msg.getBytes());
- // 7.释放资源
- channel.close();
- connection.close();
- System.out.println("消息发送完毕~");
- }
- }
复制代码 运行上述代码,并查看客户端可以看到互换机和队列的绑定关系,右侧是两个队列收到的消息

斲丧者1
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明队列
- channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
- // 4.消费信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 打印接收消息
- System.out.println("接收到消息: " + new String(body));
- }
- };
- channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);
- // 消费者不需要释放资源,持续消费
- }
- }
复制代码 斲丧者2
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明队列
- channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
- // 4.消费信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 打印接收消息
- System.out.println("接收到消息: " + new String(body));
- }
- };
- channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);
- // 消费者不需要释放资源,持续消费
- }
- }
复制代码 运行结果

- 通配符模式
生产者
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.创建信道
- Channel channel = connection.createChannel();
- // 3.声明交换机
- channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);
- // 4.声明队列
- channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
- channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
- // 5.关联交换机
- channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*",null);
- channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b",null);
- channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#",null);
- // 6.发送消息
- String msg = "hello topic, my routingKey is '*.a.*'...";
- channel.basicPublish(Constants.TOPIC_EXCHANGE,"ee.a.c",null,msg.getBytes()); // 满足Q1
- msg = "hello topic, my routingKey is '*.*.b'...";
- channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b",null,msg.getBytes()); // 满足Q1/Q2
- msg = "hello topic, my routingKey 'is c.#'...";
- channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.dd",null,msg.getBytes()); // 满足Q2
- // 7.释放资源
- channel.close();
- connection.close();
- System.out.println("消息发送完毕~");
- }
- }
复制代码 运行生产者后,客户端向队列发送消息情况,互换机和队列关系情况如下

斲丧者1
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明队列
- channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
- // 4.消费信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 打印接收消息
- System.out.println("接收到消息: " + new String(body));
- }
- };
- channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);
- // 消费者不需要释放资源,持续消费
- }
- }
复制代码 斲丧者2
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.声明队列
- channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
- // 4.消费信息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 打印接收消息
- System.out.println("接收到消息: " + new String(body));
- }
- };
- channel.basicConsume(Constants.TOPIC_QUEUE2,true,consumer);
- // 消费者不需要释放资源,持续消费
- }
- }
复制代码 运行情况如下

- RPC 模式
客户端(Client)
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.UUID;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeoutException;
- /**
- * RPC client
- * 1. 发送请求
- * 2. 接收响应
- */
- public class RpcClient {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3.使用内置交换机
- // 4.声明队列
- channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
- channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
- // 5.发送请求
- String msg = "hello rpc...";
- // 设置请求的唯一标识
- String correlationID = UUID.randomUUID().toString();
- // 设置请求的相关属性
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
- .replyTo(Constants.RPC_RESPONSE_QUEUE)
- .correlationId(correlationID)
- .build();
- channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());
- // 5.接收响应
- // 使用阻塞队列来存储响应信息
- final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String resMeg = new String(body);
- System.out.println("接收到回调消息:" + resMeg);
- if (correlationID.equals(properties.getCorrelationId())) {
- // 如果两个correlationID相等,就表示是该请求返回的响应
- response.offer(resMeg);
- }
- }
- };
- channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
- String result = response.take();
- System.out.println("[RPC Client 响应结果]" + result);
- }
- }
复制代码 服务端(Server)
- import com.rabbitmq.client.*;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * RPC server
- * 1. 接收请求
- * 2. 发送发送响应
- */
- public class RpcServer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- Connection connection = factory.newConnection();
- // 2.开启信道
- Channel channel = connection.createChannel();
- // 3. 接收请求
- channel.basicQos(1);
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String request = new String(body);
- System.out.println("接收到请求:" + request);
- // 针对服务器构造响应内容进行响应
- String response = "针对请求" + request +", 响应成功.";
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
- .correlationId(properties.getCorrelationId())
- .build();
- channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,props,response.getBytes());
- // 手动确认
- channel.basicAck(envelope.getDeliveryTag(),false);
- }
- };
- channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);
- }
- }
复制代码 运行结果

- 发布确认模式
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConfirmListener;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import rabbitmq.constants.Constants;
- import java.io.IOException;
- import java.util.Collections;
- import java.util.SortedSet;
- import java.util.TreeSet;
- public class PublisherConfirms {
- private static final int MESSAGE_COUNT = 2000;
- public static Connection createConnection() throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(Constants.HOST);
- factory.setPort(Constants.PORT);
- factory.setUsername(Constants.USERNAME);
- factory.setPassword(Constants.PASSWORD);
- factory.setVirtualHost(Constants.VIRTUAL_HOST);
- return factory.newConnection();
- }
- public static void main(String[] args) throws Exception {
- // 单独确认
- publishingMessagesIndividually();
- // 批量确认
- publishingMessagesInBatches();
- // 异步确认
- handlingPublisherConfirmsAsynchronously();
- }
- /**
- * 异步确认
- */
- private static void handlingPublisherConfirmsAsynchronously() throws Exception {
- try (Connection connection = createConnection()) {
- // 1.开启信道
- Channel channel = connection.createChannel();
- // 2.设置为confirm模式
- channel.confirmSelect();
- // 3.使用内置交换机,声明队列
- channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
- // 4.监听confirm
- long startTime = System.currentTimeMillis();
- // 创建有序集合,元素按照自然排序,用来存储未被确认的confirm消息序号
- SortedSet<Long> confirmSetNo = Collections.synchronizedSortedSet(new TreeSet<>());
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- if (multiple) {
- // 如果开启了批量确认,则批量确认之后要清空有序集合已经确认好的元素
- confirmSetNo.headSet(deliveryTag+1).clear();
- } else {
- // 如果没有开,那么只需要清除当前的这一个
- confirmSetNo.remove(deliveryTag);
- }
- }
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- if (multiple) {
- // 如果开启了批量确认,则批量确认之后要清空有序集合已经确认好的元素
- confirmSetNo.headSet(deliveryTag+1).clear();
- } else {
- // 如果没有开,那么只需要清除当前的这一个
- confirmSetNo.remove(deliveryTag);
- }
- // 根据业务进行改进(重新发送消息)
- }
- });
- // 5.发送消息
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String msg = "hello publisher confirms " + i;
- // 拿到当前要发送的序号
- long seqNo = channel.getNextPublishSeqNo();
- channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
- // 还未确认,放入集合
- confirmSetNo.add(seqNo);
- }
- while (!confirmSetNo.isEmpty()) {
- Thread.sleep(20);
- }
- long endTime = System.currentTimeMillis();
- System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, endTime - startTime);
- }
- }
- /**
- * 批量确认<br/>
- * 一次性确认一批
- */
- private static void publishingMessagesInBatches() throws Exception {
- try (Connection connection = createConnection()) {
- // 1.开启信道
- Channel channel = connection.createChannel();
- // 2.设置confirm模式
- channel.confirmSelect();
- // 3.使用内部交换机,声明队列
- channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
- // 4.发送消息并返回确认
- int batchSize = 100;
- int outstandingMessages = 0;
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String msg = "hello publisher confirms " + i;
- channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
- outstandingMessages++;
- // 批量确认
- if (outstandingMessages == batchSize) {
- channel.waitForConfirms(5000);
- outstandingMessages = 0;
- }
- }
- // 消息发送完毕之后如果有遗漏,进行确认
- if (outstandingMessages != 0) {
- channel.waitForConfirms(5000);
- }
- long endTime = System.currentTimeMillis();
- System.out.printf("批量确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, endTime - startTime);
- }
- }
- /**
- * 单独确认<br/>
- * 每发送一个就要进行确认
- */
- private static void publishingMessagesIndividually() throws Exception {
- try (Connection connection = createConnection()) {
- // 1.开启信道
- Channel channel = connection.createChannel();
- // 2.设置confirm模式
- channel.confirmSelect();
- // 3.使用内部交换机,声明队列
- channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
- // 4.发送消息并返回确认
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String msg = "hello publish confirms " + i;
- channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
- // 等待确认
- channel.waitForConfirms(5000);
- }
- long endTime = System.cur rentTimeMillis();
- System.out.printf("单独确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, endTime - startTime);
- }
- }
- }
复制代码 三种模式时间对比

SpringBoot集成RabbitMQ
引入依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
复制代码 引入yml设置信息
- spring:
- application:
- name: rabbitmq-springboot
- # 配置RabbitMQ基本信息
- rabbitmq:
- host: 47.122.121.191
- port: 5672
- username: xxxx
- password: xxxx
- virtual-host: new virtual
- # 日志日期格式
- logging:
- pattern:
- dateformat: yyyy-MM-dd HH:mm:ss
复制代码
- 工作队列
生产者
1.编写RabbitMQConfig
- // 创建队列交给spring管理
- @Configuration
- public class RabbitMQConfig {
- @Bean("workQueue")
- public Queue workQueue() {
- return QueueBuilder.durable(Constants.WORK_QUEUE).build();
- }
- }
复制代码 2.编写ProducerController
- @RestController
- @RequestMapping("/producer")
- public class ProducerController {
- @Autowired
- private RabbitTemplate rabbittemplate;
- @RequestMapping("/work")
- public String work() {
- for (int i = 0; i < 10; i++) {
- String msg = "hello spring amqp: work...";
- // 使用内置交换机,发送信息
- rabbittemplate.convertAndSend("", Constants.WORK_QUEUE,msg);
- }
- return "发送成功~";
- }
- }
复制代码 斲丧者
1.创建WorkListener
- @Component
- public class WorkListener {
- // 消费者1
- @RabbitListener(queues = Constants.WORK_QUEUE)
- public void queueListener1(Message message) {
- // 参数讲解
- // 1.String 只包含消息内容
- // 2.Message 包含所有的信息,包括消息ID、内容、队列信息等
- // 3.Channel RabbitMQ信道的所有信息,高级用法会用到,如手动确认
- System.out.println("[ listener1 "+Constants.WORK_QUEUE+"] 接收到消息:" + message);
- }
- // 消费者2
- @RabbitListener(queues = Constants.WORK_QUEUE)
- public void queueListener2(Message message) {
- System.out.println("[ listener2 "+Constants.WORK_QUEUE+"] 接收到消息:" + message);
- }
- }
复制代码 运行结果
- 发布/订阅模式
生产者
1.编写RabbitMQConfig
- // RabbitMQConfig 是用来创建队列和交换机 以及关联队列和交换机的一个工具类
- @Bean("fanoutQueue1")
- public Queue fanoutQueue1() {
- return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
- }
- @Bean("fanoutQueue2")
- public Queue fanoutQueue2() {
- return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
- }
- @Bean("fanoutExchange")
- public FanoutExchange fanoutExchange() {
- return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
- }
- // 绑定交换机和队列
- @Bean
- public Binding fanoutBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {
- return BindingBuilder.bind(queue).to(fanoutExchange);
- }
- @Bean
- public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2") Queue queue) {
- return BindingBuilder.bind(queue).to(fanoutExchange);
- }
复制代码 2.编写ProducerController
- @RequestMapping("/fanout")
- public String fanout() {
- rabbittemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout...");
- return "发送成功~";
- }
复制代码 运行之后客户端的绑定关系图和所创建的队列如下
斲丧者
1.编写FanoutListener
- @Component
- public class FanoutListener {
- // 消费者1
- @RabbitListener(queues = Constants.FANOUT_QUEUE1)
- public void queueListener1(String message) {
- System.out.println("listener1 [" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message);
- }
- // 消费者2
- @RabbitListener(queues = Constants.FANOUT_QUEUE2)
- public void queueListener2(String message) {
- System.out.println("listener2 [" + Constants.FANOUT_QUEUE2 +"] 接收到消息:" + message);
- }
- }
复制代码 运行结果
- 路由模式
生产者
1.编写RabbitMQConfig
- @Bean("directQueue1")
- public Queue directQueue1() {
- return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
- }
- @Bean("directQueue2")
- public Queue directQueue2() {
- return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
- }
- @Bean("directExchange")
- public DirectExchange directExchange() {
- return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
- }
- @Bean
- public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {
- return BindingBuilder.bind(queue).to(directExchange).with("orange");
- }
- @Bean
- public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
- return BindingBuilder.bind(queue).to(directExchange).with("black");
- }
- @Bean
- public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
- return BindingBuilder.bind(queue).to(directExchange).with("orange");
- }
复制代码 2.编写ProducerController
- @RequestMapping("/direct")
- public String direct(String routingKey) {
- rabbittemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello spring amqp:direct ," +
- "my routingKey is " + routingKey);
- return "发送成功~";
- }
复制代码 斲丧者
1.编写FanoutListener
- @Component
- public class DirectListener {
- @RabbitListener(queues = Constants.DIRECT_QUEUE1)
- public void QueueListener1(String message) {
- System.out.println("listener1 [" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);
- }
- @RabbitListener(queues = Constants.DIRECT_QUEUE2)
- public void QueueListener2(String message) {
- System.out.println("listener2 [" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);
- }
- }
复制代码 当在浏览器url分别传参routingKey为orange和balck后,运行结果
- 通配符模式
生产者
1.编写RabbitMQConfig
- // 通配符模式
- @Bean("topicQueue1")
- public Queue topicQueue1() {
- return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
- }
- @Bean("topicQueue2")
- public Queue topicQueue2() {
- return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
- }
- @Bean("topicExchange")
- public TopicExchange topicExchange() {
- return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
- }
- @Bean("topicQueueBinding1")
- public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {
- return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
- }
- @Bean("topicQueueBinding2")
- public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
- return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
- }
- @Bean("topicQueueBinding3")
- public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
- return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
- }
复制代码 2.编写ProducerController
- @RequestMapping("/topic")
- public String topic(String routingKey) {
- rabbittemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello spring amqp:topic ," +
- "my routingKey is " + routingKey);
- return "发送成功~";
- }
复制代码 斲丧者
1.编写TopicListener
- @Component
- public class TopicListener {
- @RabbitListener(queues = Constants.TOPIC_QUEUE1)
- public void topicQueue1(String message) {
- System.out.println("listener1 [" + Constants.TOPIC_QUEUE1 + "] 接收到消息:" + message);
- }
- @RabbitListener(queues = Constants.TOPIC_QUEUE2)
- public void topicQueue2(String message) {
- System.out.println("listener2 [" + Constants.TOPIC_QUEUE2 + "] 接收到消息:" + message);
- }
- }
复制代码 当在浏览器url分别传参routingKey后,运行结果
发送json对象
1.编写RabbitMQConfig
- @Configuration
- public class RabbitMQConfig {
- @Bean("orderQueue")
- public Queue orderQueue() {
- return QueueBuilder.durable("order.create").build();
- }
- // 自己创建rabbitTemplate 在rabbitTemplate中设置messageConverter(核心)
- @Bean
- public Jackson2JsonMessageConverter jsonMessageConverter () {
- return new Jackson2JsonMessageConverter();
- }
- @Bean
- // 这里的参数ConnectionFactory是自带的,Jackson2JsonMessageConverter是上面自己写的通过Bean交给的Spring
- public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(jsonMessageConverter);
- return rabbitTemplate;
- }
- }
复制代码- // 提前编写好要传递的对象(orderInfo)
- @RequestMapping("/create2")
- public String create2() {
- OrderInfo orderInfo = new OrderInfo();
- String orderId = UUID.randomUUID().toString();
- orderInfo.setName("商品" + new Random().nextInt(100));
- orderInfo.setOrderId(orderId);
- rabbitTemplate.convertAndSend("","order.create",orderInfo);
- return "下单成功~";
- }
复制代码 查看客户端
接收json对象
1.编写ConsumerListener
- @Component
- @RabbitListener(queues = "order.create") // 将RabbitListener写在类上 要搭配@RabbitHandler使用
- public class OrderListener {
- @RabbitHandler // RabbitHandler中的参数,可以根据接收的数据类型来映射函数,此函数接收String类型
- public void handMessage(String message) {
- System.out.println("接收到订单消息String:" + message);
- }
- // 接收OrderInfo类型
- @RabbitHandler
- // 此函数接收OrderInfo类型,但是OrderInfo属于json类型,也要和设置rabbitTemplate中的setmessageConverter(核心)
- public void handMessage2(OrderInfo message) {
- System.out.println("接收到订单消息OrderInfo:" + message);
- }
- }
复制代码 2.编写RabbitMQConfig
- @Configuration
- public class RabbitMQConfig {
- @Bean
- public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(jsonMessageConverter);
- return rabbitTemplate;
- }
- }
复制代码 分别发送String类型和OrderInfo类型实验的是差别的方法
RabbitMQ高级特性
消息确认(手动确认)
- 肯定确认
Channel.basicAck(long deliveryTag), boolean multiple
RabbitMQ已经知道该消息,而且成功的处理消息,可以将其丢掉了
deliveryTag: 消息唯一标识,是一个自增64位长整型
multiple:是否批量确认
- 单次否定确认
Channel.basicReject(long deliveryTag, boolean requeue)
斲丧者客户端可以拒绝此消息
deliveryTag: 消息唯一标识,是一个自增64位长整型
requeue:假如位true,表现重新发送,假如为false,表现从队列中移除
- 批量否定确认
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
斲丧者客户端批量拒绝消息
deliveryTag: 消息唯一标识,是一个自增64位长整型
requeue:假如位true,表现重新发送,假如为false,表现从队列中移除
multiple:为true表现拒绝deliveryTag之前所有的消息
SpringBoot确认机制
- 1.NONE : 自动确认
- 不管消费者是否收到消息,只要发送给消费者就直接删除
- 2.AUTO : 默认确认
- 如果消费者处理成功时,会自动确认删除消息,如果出现异常就不会确认消息不会进行删除
- 3.MANUAL : 手动确认
- 需要消费者手动调用basicAck方法来确认消息,如果未被确认RabbitMQ就会认为消息未被成功处理,且会重新投递该消息
复制代码- 主要流程:
- 1.配置确认机制(自动确认/手动确认)
- 2.生产者发送消息
- 3.消费端逻辑
- 4.测试
复制代码- # 配置确认机制
- spring:
- rabbitmq:
- host: 47.122.121.191
- port: 5672
- username: xxxx
- password: xxxx
- virtual-host: extension
- # 配置接收确认
- listener:
- simple:
- # acknowledge-mode: none #自动确认
- # acknowledge-mode: auto #成功自动确认,异常不会确认(默认)
- acknowledge-mode: manual #手动确认
复制代码 持久性
RabbitMQ可靠性包管机制之一
RabbitMQ的持久化是分为三个部门,分别是互换机持久化、队列持久化、消息持久化,也就是说要完整的包管RabbitMQ的持久化特性,就要包管这三个持久化的开启,但是不能百分百包管消息不丢失,这三个持久化只能包管RabbitMQ的服务器不进行消息的丢失,不包管生产者和斲丧者传输到服务区过程中的丢失情况
- 互换机持久化
互换机持久化是将durable设置为true,纵然是不去主动设置,他的底层代码也默认是持久化的
- 队列持久化
队列持久化也是和互换机持久化一样,通过durable来设置的,假如要设置为持久化就是用durable假如要设置非持久化,就是用nonDurable
- 消息持久化
在解说消息持久化之前,先明白一点,消息持久化是跟着队列持久化而来的,假如消息持久化开启了,但是队列持久化并未开启,服务器重启后因为队列的丢失消息也会跟着丢失,假如队列持久化开启了消息持久化没有开启,服务器重启后队列还在但是消息就会丢失,以是单独开启消息持久化是没故意义的,要再队列持久化基础上再开启消息持久化,而消息持久化默认也是开启的
发送方确认
发送方确认是生产者到RabbitMQ服务器之间的消息确认模式,确保了生产者到服务器的可靠性
发送方确认分为两种:
1.confirm模式 : 生产者到exchange之间的作用模式
2.return模式 : exchange到queue之间的作用模式
这两种模式不是互斥的,可以单独使用也可以联合使用
- confirm模式
confirm模式主要是通过设置rabbitTemplate中的setConfirmCallback方法,在设置中传入参数new RabbitTemplate.ConfirmCallback()重写此中的confirm方法,confirm方法中根据ack的参数来确保是否接收大概丢失,分别做处理
步调1:设置发送确认yaml文件
- spring:
- rabbitmq:
- # 配置发送确认
- publisher-confirm-type: correlated
复制代码 步调2:自界说rabbitTemplate重写confirm方法
为什么要自界说?
再写入消息的时候,我们会用到rabbitTemplate,这个是Spring为我们提供的,我们是直接注入进去的,以是只要用到了rabbit Template那就是同一个Bean对象,为了不影响其他地方对rabbit Template的使用,我们自己重新写一个之后设置他的setConfirmCallback即可。
- @Configuration
- public class RabbitTemplateConfig {
- // 单独写一个没做任何处理的rabbitTemplate方法,不需要使用发送方确认的就是用这个rabbitTemplate,因为我们自定义了
- // rabbitTemplate方法之后就会自动覆盖spring为我们提供的方法。
- @Bean("rabbitTemplate")
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- return new RabbitTemplate(connectionFactory);
- }
- // 自定义rabbitTemplate命名为confirmRabbitTemplate通过Bean交给spring管理
- @Bean("confirmRabbitTemplate")
- public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("执行confirm方法");
- if (ack) {
- System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
- } else {
- System.out.printf("未收到消息, 消息ID: %s , cause : %s \n",correlationData == null ? null : correlationData.getId(),cause);
- }
- }
- });
- return rabbitTemplate;
- }
- }
复制代码 步调3:编写ProducerController
- @RestController
- @RequestMapping("/producer")
- public class ProducerController {
- // 注入的是不同的rabbitTemplate,根据名字来区分
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Resource
- private RabbitTemplate confirmRabbitTemplate;
-
- // 不需要发送确认的
- @RequestMapping("/ack")
- public String ack() {
- // 创建一个Message对象设置为持久化或非持久化
- Message message = new Message("consumer ack mode test...".getBytes(),new MessageProperties());
- //message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);// 非持久化
- rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);
- return "消息发送成功~";
- }
- // 需要发送确认的
- @RequestMapping("/confirm")
- public String confirm() {
- CorrelationData correlationData = new CorrelationData("1");
- confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","hello confirm",correlationData);
- return "消息发送成功~";
- }
- }
复制代码 注意!!:发送确认是生产者到互换机之间的发送确认,他只能包管到了互换机互换性能正常辨认到来了ack就是true,但是假如RoutingKey和BindingKey不匹配,他是不会将ack为false的,这种情况就要看后续讲的return退回模式!
- return 退回模式
互换机在接收到消息之后,未能找到与RoutingKey相同的BiindingKey,就会实验return退回模式的回调函数
步调1:设置发送确认yaml文件
- spring:
- rabbitmq:
- # 配置发送确认
- publisher-confirm-type: correlated
复制代码 步调2:自界说rabbitTemplate重写returnedMessage方法
- @Configuration
- public class RabbitTemplateConfig {
- // 单独写一个没做任何处理的rabbitTemplate方法,不需要使用发送方确认的就是用这个rabbitTemplate,因为我们自定义了
- // rabbitTemplate方法之后就会自动覆盖spring为我们提供的方法。
- @Bean("rabbitTemplate")
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- return new RabbitTemplate(connectionFactory);
- }
- // 自定义rabbitTemplate命名为confirmRabbitTemplate通过Bean交给spring管理
- @Bean("confirmRabbitTemplate")
- public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("执行confirm方法");
- if (ack) {
- System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
- } else {
- System.out.printf("未收到消息, 消息ID: %s , cause : %s \n",correlationData == null ? null : correlationData.getId(),cause);
- }
- }
- });
- // 消息被退回时的回调方法(此方法这里和重写的confirm方法写在了一个定义rabbitTemplate中,并不是一定要这样写,他俩是可以单独开来的)
- rabbitTemplate.setMandatory(true);// 不要忘setMandatory(true)
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- System.out.println("消息退回: " + returned);
- }
- });
- return rabbitTemplate;
- }
- }
复制代码 持久化总结
如何确保可靠性传输?
1.Producer -> Broker 发送方确认
1)producer -> exchange : confirm 模式
2)exchange -> queue: returns 模式
3)队列溢出:死信等
2.Broker 持久化
1)互换机持久化
2)队列持久化
3)消息持久化
3.Broker -> Consumer 消息确认
1)主动确认
2)手动确认
重试机制
在消息通报过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败。为了办理这些问题, RabbitMQ 提供了重试机制, 答应消息在处理失败后重新发送。但假如是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数
1.设置信息
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: auto #消息接收确认
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 5000ms # 初始失败等待时⻓为5秒
- max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
复制代码 注意:
- ⾃动确认模式下: 程序逻辑非常, 多次重试还是失败, 消息就会被⾃动确认, 那么消息就丢失了
- 手动确认模式下: 程序逻辑非常, 多次重试消息依然处理失败, 无法被确认。
TTL
TTL就是过期时间,也就是消息的寿命存活的时间,假如消息到达过期时间,还没有被斲丧,就会直接被扫除掉,消息的TTL牵扯到两个TTL
1.消息的TTL :队列还存在,消息的TTL时间到了,消息会消失
- @RequestMapping("/ttl")
- public String ttl() {
- MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setExpiration("10000"); // 10s
- return message;
- }
- };
- rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","hello ttl",messagePostProcessor);
- return "消息发送成功~";
- }
复制代码 2.队列的TTL : 队列TTL时间到了里面存放的消息也会随之消失
- // 设置ttl队列
- @Bean("ttlQueue2")
- public Queue ttlQueue2() {
- return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20_000).build(); // 20s
- }
复制代码 总结:假如同时设置了各自的TTL,那么取短时间作为过期时间缘故原由就是,队列时间短带着消息一起消失,消息时间短消息到时间一样消失
死信队列
死信:简单来说就是种种缘故原由无法被斲丧者斲丧的信息。当消息变成死信之后就会被重新发送到另一个互换机中,这个互换机就是DLX绑定DLX的队列就是死信队列。
变成死信的几种情况:
1.消息被拒绝:Basic.Reject/Basic.Nack,而且设置requeue为false
- @Component
- public class DlListener {
- @RabbitListener(queues = Constants.NORMAL_QUEUE)
- public void handMessage(Message message, Channel channel) throws Exception {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- // 消费者逻辑
- System.out.printf("接收到消息:%s, deliveryTag %d \n",
- new String(message.getBody(), "utf-8"), deliveryTag);
- // 进行业务逻辑处理
- System.out.println("业务逻辑处理...");
- int num = 3 / 0;
- System.out.println("业务处理完成");
- // 肯定确认
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- // 否定确认
- channel.basicNack(deliveryTag, false, false); // 设置不让入队,变成死信
- }
- }
- @RabbitListener(queues = Constants.DL_QUEUE)
- public void dlHandMessage(Message message) {
- // 死信消费者逻辑
- System.out.printf("[" + Constants.DL_QUEUE + "] 接收到消息:%s, deliveryTag: %d \n",
- new String(message.getBody()), message.getMessageProperties().getDeliveryTag());
- }
- }
复制代码 2.消息过期
- // 死信
- // 正常交换机和队列
- @Bean("normalQueue")
- public Queue normalQueue() {
- return QueueBuilder.durable(Constants.NORMAL_QUEUE)
- .deadLetterExchange(Constants.DL_EXCHANGE)// 配置和死信交换机的关联
- .deadLetterRoutingKey("dl") // 配置和死信交换机的RoutingKey
- .ttl(10000) // 配置ttl消息过期后直接回关联到死信队列中
- .maxLength(10) // 队列长度
- .build();
- }
- @Bean("normalExchange")
- public DirectExchange normalExchange() {
- return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
- }
- @Bean("normalBinding")
- public Binding normalBinding(@Qualifier("normalExchange") DirectExchange directExchange,@Qualifier("normalQueue") Queue queue) {
- return BindingBuilder.bind(queue).to(directExchange).with("normal");
- }
- // 死信交换机和队列
- @Bean("dlQueue")
- public Queue dlQueue() {
- return QueueBuilder.durable(Constants.DL_QUEUE).build();
- }
- @Bean("dlExchange")
- public DirectExchange dlExchange() {
- return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
- }
- @Bean("dlBinding")
- public Binding dlBinding(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {
- return BindingBuilder.bind(queue).to(directExchange).with("dl");
- }
复制代码 3.队列到达最大长度 : 代码如上消息过期
延迟队列
延迟队列指的是,消息发送之后并不想让斲丧者立即拿到消息,而是等待特定时间后才让斲丧者进行斲丧,可惜的是RabbitMQ并没有直接支持延迟队列,但是我们可以通过TTL+死信的模式来拟定一个延迟队列,当TTL时间到了之后,用户直接斲丧死信队列即可,起到一个延时的结果
延迟队列通过TTL+死信方式实现
1.设置队列的TTL
2.设置消息的TTL
当后来的消息过期时间早于,先到的消息时,会有延迟,第二个时间短的会和第一个时间长的一起出队列,而设置队列的TTL不存在这个情况
延迟队列插件
插件毗连:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- // 延迟队列创建Bean队列和交换机代码
- @Configuration
- public class DelayConfig {
- @Bean("delayQueue")
- public Queue delayQueue() {
- return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
- }
- @Bean("delayExchange")
- public DirectExchange delayExchange() {
- return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build(); // 延迟交换机
- }
- @Bean("delayBinding")
- public Binding delayBinding(@Qualifier("delayExchange") DirectExchange delayExchange,@Qualifier("delayQueue") Queue queue) {
- return BindingBuilder.bind(queue).to(delayExchange).with("delay");
- }
- }
复制代码- @RequestMapping("/delay")
- public String delay() {
- // 发送消息1(30s)
- MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setDelayLong(30_000L);
- return message;
- }
- };
- rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","hello delay",messagePostProcessor);
- // 发送消息2(10s)
- MessagePostProcessor messagePostProcessor1 = new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setDelayLong(10_000L);
- return message;
- }
- };
- rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","hello delay",messagePostProcessor1);
- System.out.printf("%tc 消息发送成功 \n",new Date());
- return "消息发送成功~";
- }
复制代码 二者对比:
a. 优点: 1) 灵活不需要额外的插件⽀持
b. 缺点: 1) 存在消息次序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本
事务
- 步骤:
- 1.创建新的RabbitTemplate设置setChannelTransacted为true
- 2.创建RabbitTransactionManager对象交给spring管理
- 3.注入transRabbitTemplate到Producer中并使用
- 4.再Producer方法上加上@Transactional注解
复制代码- @Configuration
- public class RabbitTemplateConfig {
- @Bean("rabbitTemplate")
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- return new RabbitTemplate(connectionFactory);
- }
- // 事务
- @Bean("transRabbitTemplate")
- public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setChannelTransacted(true); // 开启事务
- return rabbitTemplate;
- }
- // 创建事务管理器
- @Bean("transactionManager")
- public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
- return new RabbitTransactionManager(connectionFactory);
- }
- }
复制代码- @RestController
- @RequestMapping("/producer")
- public class ProducerController {
- @Resource(name = "rabbitTemplate")
- private RabbitTemplate rabbitTemplate;
- // 依赖注入
- @Resource(name = "transRabbitTemplate")
- private RabbitTemplate transRabbitTemplate;
- // 加上@Transactional注解启用事务
- @Transactional
- @RequestMapping("/transaction")
- public String transaction() {
- System.out.println("transaction test...");
- transRabbitTemplate.convertAndSend("",Constants.TRANSACTION_QUEUE,"transaction1...");
- int num = 3/0;
- transRabbitTemplate.convertAndSend("",Constants.TRANSACTION_QUEUE,"transaction2...");
- return "消息发送成功~";
- }
- }
复制代码 消息分发
接纳消息分发可以做到以下功能:
1.限流
2.负载均衡
- 限流
设置prefetch参数,设置为手动确认
- #ack 确认⽅式:开启ack
- listener:
- simple:
- acknowledge-mode: manual #⼿动确认
- prefetch: 5
复制代码 设置互换机队列
- @Configuration
- public class QosConfig {
- @Bean("qosQueue")
- public Queue qosQueue() {
- return QueueBuilder.durable(Constants.QOS_QUEUE).build();
- }
- @Bean("qosExchange")
- public DirectExchange qosExchange() {
- return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
- }
- @Bean("qosBinding")
- public Binding qosBinding(@Qualifier("qosExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue) {
- return BindingBuilder.bind(queue).to(directExchange).with("qos");
- }
- }
复制代码 发送消息
- @RequestMapping("/qos")
- public String qos() {
- System.out.println("qos test...");
- for (int i = 0; i < 20; i++) {
- rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","qos test...");
- }
- return "消息发送成功~";
- }
复制代码 斲丧者监听
- @Component
- public class QosListener {
- @RabbitListener(queues = Constants.QOS_QUEUE)
- public void handMessage(Message message, Channel channel) throws Exception {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- // 消费者逻辑
- System.out.printf("接收到消息:%s, deliveryTag %d \n",
- new String(message.getBody(), "utf-8"), deliveryTag);
- // 进行业务逻辑处理
- // System.out.println("业务逻辑处理...");
- // int num = 3 / 0;
- // System.out.println("业务处理完成");
- // // 肯定确认
- // channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- // 否定确认
- channel.basicNack(deliveryTag, false, true);
- }
- }
- }
复制代码
- 负载均衡
负载均衡和限流一样,只是负载均衡并非是一个斲丧者监听,而是多个
幂等性保障
幂等性指的是在应用程序中,对一个系统重复调用,不论请求多少次,这些请求对系统的影响都是相等的结果,例如数据库中的select操作。
MQ的幂等性
对于MQ⽽⾔, 幂等性是指同⼀条消息, 多次斲丧, 对系统的影响是相同的。
⼀般消息中心件的消息传输保障分为三个层级.
- At most once:最多⼀次. 消息可能会丢失,但绝不会重复传输.
- At least once:最少⼀次. 消息绝不会丢失,但可能会重复传输.
- Exactly once:恰好⼀次. 每条消息肯定会被传输⼀次且仅传输⼀次.
RabbitMQ⽀持"最多⼀次"和"最少⼀次"。
幂等性办理方案
全局唯一ID
1.设置ID:给每条消息分配一个唯一的ID(UUID等)
2.判定:斲丧者收到消息后,判定ID是否已经被斲丧过,假如斲丧过则放弃处理
3.保存ID:假如为斲丧过,斲丧者开始斲丧,但是要把此全局ID保存起来(数据库或Redis等)
可以使⽤Redis 的原⼦性操作setnx来包管幂等性, 将唯⼀ID作为key放到redis中 (SETNX messageID 1) . 返回1, 说明之前没有斲丧过, 正常斲丧. 返回0, 说明这条消息之前已斲丧过, 抛弃.
业务逻辑判定
在业务逻辑层⾯实现消息处理的幂等性.
例如: 通过检查数据库中是否已存在相关数据纪录, 大概使⽤乐观锁机制来避免更新已被其他事务更改的数据, 再大概在处理消息之前, 先检查相关业务的状态, 确保消息对应的操作尚未执⾏, 然后才进行处理, 详细根据业务场景来处理
次序性保障
生产者生产消息的次序和斲丧者斲丧消息的次序是同等的
次序性保障方案
1.单个队列的斲丧者
最简单的⽅法是使⽤单个队列, 并由单个斲丧者进⾏处理. 同⼀个队列中的消息是先辈先出的, 这是RabbitMQ来资助我们包管的。
2.分区斲丧
单个斲丧者的吞吐太低了, 当需要多个斲丧者以提⾼处理速率时, 可以使⽤分区斲丧. 把⼀个队列分割成多个分区, 每个分区由⼀个斲丧者处理, 以此来保持每个分区内消息的次序性。
3.消息确认机制
使⽤手动消息确认机制, 斲丧者在处理完⼀条消息后, 显式地发送确认, 如许RabbitMQ才会移除并继承发送下⼀条消息
4.业务逻辑控制
在某些情况下, 纵然消息乱序到达, 也可以在业务逻辑层⾯实现次序控制. ⽐如通过在消息中嵌⼊序列号, 并在斲丧时根据这些信息来处理
RabbitMQ本⾝并不包管全局的严格次序性, 特别是在分布式系统中. 在实际应⽤开辟中, 根据详细的业务需求, 可能需要联合多种策略来实现所需要的次序包管
消息积压
消息积压是指在消息队列(如RabbitMQ)中, 待处理的消息数目超过了斲丧者处理能⼒, 导致消息在队列中不断堆积的现象。
通常有以下情况:
1.消息生产过快
2.斲丧者处理能力不足
3.网络问题
4.RabbitMQ服务器设置偏低
办理方案:
1.进步斲丧者效率:
a. 增加斲丧者实例数目, ⽐如新增机器
b. 优化业务逻辑, ⽐如使⽤多线程来处理业务
c. 设置prefetchCount, 当⼀个斲丧者阻塞时, 消息转发到其他未阻塞的斲丧者.
d. 消息发生非常时, 设置符合的重试策略, 大概转入到死信队列
2.限制生产者效率(流量控制,限流算法等)
a. 流量控制: 在消息生产者中实现流量控制逻辑, 根据斲丧者处理能力动态调解发送速率
b. 限流: 使用限流工具, 为消息发送速率设置⼀个上限
c. 设置过期时间. 假如消息过期未斲丧, 可以设置死信队列, 以避免消息丢失, 并淘汰对主队列的压力
3.资源设置优化
比如升级RabbitMQ服务器的硬件, 调解RabbitMQ的设置参数等
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |