【RabbitMQ】入门到实用再到底层看完这篇就够了

火影  论坛元老 | 2025-2-22 10:50:10 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1015|帖子 1015|积分 3045

MQ的作用

MQ主要是接收并转发消息,在差别的应用场景下可以展现差别的作用
  1. 1.异步解耦
  2.         一些操作比较耗时,但是不影响整体程序的执行,不需要及时返回的结果,就可以使用MQ把这些异步化。
  3. 2.流量削峰
  4.         在某些特殊情况下,流量突然增大,为了防止服务器的崩溃,就会使用MQ进行流量削峰进行流量控制,例如限时秒杀活动等
  5. 3.消息分发
  6.         当多个系统需要同一个数据时,可以将数据放入到MQ中,其他哪个服务器需要则直接进行订阅即可,无需多次访问数据库
  7. 4.延迟通知
  8.         在需要特定时间后发送通知的场景中,可以使用MQ延迟发送,例如用户下单长时间没有支付,到时间点之后就自动取消订单
复制代码
RabbitMQ焦点概念



  • 端口号信息

    1. 5672 : 客户端和服务器建立连接时的端口号
    2. 15672 : 管理界面端口号
    3. 25672 : 集群使用的端口号
    复制代码
  • rabbitMQ工作流程

AMQP

AMQP是一个高级消息队列协议,AMQP界说了一套确定的消息互换功能,包罗互换器(Exchange)、队列(Queue)等,这些组件共同工作使得生产者能将消息发送到互换机(Exchange),然后由互换机准确的分发到队列(Queue)中。
RabbiteMQ是遵循了AMQP协议的,换句话说,RabbiteMQ就是AMQP协议的Erlang的实现,以是他们的结构是一样的。

RabbitMQ快速上手

引入依赖

  1. 中央仓库搜索amqp
  2. <dependency>
  3.     <groupId>com.rabbitmq</groupId>
  4.     <artifactId>amqp-client</artifactId>
  5.     <version>5.21.0</version>
  6. </dependency>
复制代码
生产者(Producer)

  1. 1.建立连接
  2. 2.创建Channel
  3. 3.声明队列Queue
  4. 4.发送消息
  5. 5.释放资源
复制代码
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class ProducerDemo {
  7.     public static void main(String[] args) throws IOException, TimeoutException {
  8.         // 1.建立连接
  9.         ConnectionFactory factory = new ConnectionFactory();
  10.         factory.setHost("47.122.121.191");
  11.         factory.setPort(5672); // 需要提前开放端口号
  12.         factory.setUsername("admin");
  13.         factory.setPassword("admin");
  14.         factory.setVirtualHost("new virtual");
  15.         Connection connection = factory.newConnection();
  16.         // 2.开启信道
  17.         Channel channel = connection.createChannel();
  18.         // 3.声明交换机  使用内置的交换机
  19.         // 4.声明队列
  20.         /**
  21.          * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
  22.          *                                  Map<String, Object> arguments)
  23.          * 参数说明 :
  24.          *          queue : 队列名称
  25.          *          durable : 是否持久化
  26.          *          exclusive : 是否独占
  27.          *          autoDelete : 是否自动删除
  28.          *          arguments : 参数
  29.          */
  30.         channel.queueDeclare("hello2",true,false,false,null);
  31.         // 5.发送消息
  32.         /**
  33.          * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  34.          * 参数说明 :
  35.          *          exchange : 交换机的名称
  36.          *          routingKey : 内置交换机,routingKey和队列名称保持一致,表示发送给哪个队列
  37.          *          props : 属性配置
  38.          *          body : 消息
  39.          */
  40.         String mes = "hello rabbitmq";
  41.         channel.basicPublish("","hello2",null,mes.getBytes());
  42.         // 6.资源释放
  43.         channel.close();
  44.         connection.close();
  45.     }
  46. }
复制代码
斲丧者(Consumer)

  1. 1. 建立连接
  2. 2. 创建信道(Channel)
  3. 3. 声明队列(Queue)
  4. 4. 接收消息
  5. 5. 释放资源
复制代码
  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class ConsumerDemo {
  5.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  6.         // 1.创建连接
  7.         ConnectionFactory factory = new ConnectionFactory();
  8.         factory.setHost("47.122.121.191");
  9.         factory.setPort(5672);
  10.         factory.setUsername("admin");
  11.         factory.setPassword("admin");
  12.         factory.setVirtualHost("new virtual");
  13.         Connection connection = factory.newConnection();
  14.         // 2.创建Channel
  15.         Channel channel = connection.createChannel();
  16.         // 3.声明队列(如果队列已经存在可省略,如果不存在省略此声明的话会报错,建议不要省略)
  17.         channel.queueDeclare("hello2",true,false,false,null);
  18.         // 4.消费消息
  19.         /**
  20.          * basicConsume(String queue, boolean autoAck, Consumer callback)
  21.          * 参数说明:
  22.          * queue:队列名称
  23.          * autoAck:是否自动确认
  24.          * callback:接收到消息之后,执行的逻辑
  25.          */
  26.         Consumer consumer = new DefaultConsumer(channel){
  27.             // 从队列中收到消息,就会执行的方法
  28.             @Override
  29.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties                                 properties, byte[] body) throws IOException {
  30.                 // TODO
  31.                 System.out.println("接收到消息:" + new String(body));
  32.             }
  33.         };
  34.         channel.basicConsume("hello2",true,consumer);
  35.         // 等待程序执行完成
  36.         Thread.sleep(2000);
  37.         // 5.释放资源
  38.         channel.close();
  39.         connection.close();
  40.     }
  41. }
复制代码
RabbitMQ模式先容



  • 简单模式(Simple)
    1. 一个生产者和一个消费者,也称作点对点的模式,即消息只能被单个消费者处理
    复制代码
  • 工作队列(Work Queue)
    1. 一个生产者,多个消费者,工作队列会将消息分派给不同的消费者,每个消费者都会收到不同的消费信息
    2. 特点 : 消息不会重复,分配给不同消费者,常用于集群做异步处理
    复制代码
  • 发布/订阅模式(Publish/Subscribe)
    1. 发布/订阅模式多了一个交换机`Exchange`,特点是一个生产者多个消费者,并且每个消费者都会拿到所有的消费信息,而不是评分这些信息,常用于广播消息的通知
    复制代码
  • 路由模式(Routing)
    1. 路由模式也是多了一个交换机,特点是在发布/订阅模式基础上增加路由key(Routing key),只有和消息队列的key(Binding key)相互匹配的key才进行消息的转发,适用于需要根据特定规则分发消息的场景
    复制代码
  • 通配符模式(Topics)
    1. 通配符模式是路由模式的一个升级,和路由模式一样可以进行key的筛选,但是它支持通配符,只要是满足响应通配符的条件都可进行队列消息的转发,他比路由模式更加灵活
    复制代码
  • RPC通讯
    1. RPC通信没有生产者消费者,只有client和server,通过两个消息队列实现了一个可以回调的过程
    复制代码
  • 发布确认模式(Publisher Confirms)
    1. 发布确认模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制。通过此模式,生产者每发布一条消息都会获得一个唯一的ID,生产者可以将这些序号与消息关联起来,方便跟踪。当RabbitMQ服务器收到消息之后,服务器会异步发送一个确认(ACK)给生产者,其中就包含这个ID,表明此消息已送达服务器。
    2. 注意:发布确认模式只能确保生产者和服务器的可靠性,而不是生产者和消费者的可靠性
    复制代码
代码案例



  • 工作队列
    生产者
    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import rabbitmq.constants.Constants;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class Producer {
    8.     public static void main(String[] args) throws IOException, TimeoutException {
    9.         // 1.建立连接
    10.         ConnectionFactory factory = new ConnectionFactory();
    11.         factory.setHost(Constants.HOST);
    12.         factory.setPort(Constants.PORT);
    13.         factory.setUsername(Constants.USERNAME);
    14.         factory.setPassword(Constants.PASSWORD);
    15.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    16.         Connection connection = factory.newConnection();
    17.         // 2.开启信道
    18.         Channel channel = connection.createChannel();
    19.         // 3.声明交换机  使用内置的交换机
    20.         // 4.声明队列
    21.         channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
    22.         // 5.发送消息
    23.         String msg = null;
    24.         for (int i = 0; i < 20; i++) {
    25.             msg = "hello work queue " + i;
    26.             channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());
    27.         }
    28.         // 6.资源释放
    29.         channel.close();
    30.         connection.close();
    31.         System.out.println("发送完毕~");
    32.     }
    33. }
    复制代码
    斲丧者1
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer1 {
    6.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    7.         // 1.创建连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.创建Channel
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列(如果队列已经存在可省略,如果不存在省略此声明的话会报错,建议不要省略)
    18.         channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
    19.         // 4.消费消息
    20.         Consumer consumer = new DefaultConsumer(channel){
    21.             // 从队列中收到消息,就会执行的方法
    22.             @Override
    23.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    24.                 System.out.println("Consumer1接收到消息:" + new String(body));
    25.             }
    26.         };
    27.         channel.basicConsume(Constants.WORK_QUEUE,true,consumer);
    28. //        // 5.释放资源(先启动消费者不进行资源释放,否则消费者无法长时间运行)
    29. //        channel.close();
    30. //        connection.close();
    31.     }
    32. }
    复制代码
    斲丧者2
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer2 {
    6.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    7.         // 1.创建连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.创建Channel
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列(如果队列已经存在可省略,如果不存在省略此声明的话会报错,建议不要省略)
    18.         channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
    19.         // 4.消费消息
    20.         Consumer consumer = new DefaultConsumer(channel){
    21.             // 从队列中收到消息,就会执行的方法
    22.             @Override
    23.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    24.                 System.out.println("Consumer2接收到消息:" + new String(body));
    25.             }
    26.         };
    27.         channel.basicConsume(Constants.WORK_QUEUE,true,consumer);
    28. //        // 5.释放资源(先启动消费者不进行资源释放,否则消费者无法长时间运行)
    29. //        channel.close();
    30. //        connection.close();
    31.     }
    32. }
    复制代码
  • 发布/订阅
    生产者
    1. import com.rabbitmq.client.BuiltinExchangeType;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import rabbitmq.constants.Constants;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. public class Producer {
    9.     public static void main(String[] args) throws IOException, TimeoutException {
    10.         // 1.建立连接
    11.         ConnectionFactory factory = new ConnectionFactory();
    12.         factory.setHost(Constants.HOST);
    13.         factory.setPort(Constants.PORT);
    14.         factory.setUsername(Constants.USERNAME);
    15.         factory.setPassword(Constants.PASSWORD);
    16.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    17.         Connection connection = factory.newConnection();
    18.         // 2.开启信道
    19.         Channel channel = connection.createChannel();
    20.         // 3.声明交换机
    21.         /**
    22.          * exchangeDeclare(String exchange,    交换机名称
    23.          *         BuiltinExchangeType type,   交换机类型
    24.          *         boolean durable,            是否可持久化
    25.          *         boolean autoDelete,         是否自动删除
    26.          *         boolean internal,           是否内部使用,内部使用不能用客户端发送
    27.          *         Map<String, Object> arguments)  参数
    28.          */
    29.         channel.exchangeDeclare(Constants.FANOUT_EXCHANGE,                  BuiltinExchangeType.FANOUT,true,false,false,null);
    30.         // 4.声明队列
    31.         channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
    32.         channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
    33.         // 5.绑定交换机(routingKey为空表示,接收所有的消息)
    34.         channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
    35.         channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
    36.         // 6.发布消息
    37.         String msg = "hello fanout ";
    38.         for (int i = 0; i < 20; i++) {
    39.             msg = msg + i;
    40.             channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());
    41.         }
    42.         System.out.println("消息发送成功~");
    43.         // 7.释放资源
    44.         channel.close();
    45.         connection.close();
    46.     }
    47. }
    复制代码
    消息队列客户端表现,每一个绑定的消息队列都有20条消息

    斲丧者1
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer1 {
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         // 1.建立连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.开启信道
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列
    18.         channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
    19.         // 4.消费信息
    20.         Consumer consumer = new DefaultConsumer(channel) {
    21.             @Override
    22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23.                 System.out.println("消费者1消费: " + new String(body));
    24.             }
    25.         };
    26.         channel.basicConsume(Constants.FANOUT_QUEUE1,true,consumer);
    27.         // 消费者不用写资源释放
    28.     }
    29. }
    复制代码
    斲丧者2
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer2 {
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         // 1.建立连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.开启信道
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列
    18.         channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
    19.         // 4.消费信息
    20.         Consumer consumer = new DefaultConsumer(channel) {
    21.             @Override
    22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23.                 System.out.println("消费者2消费: " + new String(body));
    24.             }
    25.         };
    26.         channel.basicConsume(Constants.FANOUT_QUEUE2,true,consumer);
    27.         // 消费者不用写资源释放
    28.     }
    29. }
    复制代码
  • 路由模式
    生产者
    1. import com.rabbitmq.client.BuiltinExchangeType;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import rabbitmq.constants.Constants;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. public class Producer {
    9.     public static void main(String[] args) throws IOException, TimeoutException {
    10.         // 1.建立连接
    11.         ConnectionFactory factory = new ConnectionFactory();
    12.         factory.setHost(Constants.HOST);
    13.         factory.setPort(Constants.PORT);
    14.         factory.setUsername(Constants.USERNAME);
    15.         factory.setPassword(Constants.PASSWORD);
    16.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    17.         Connection connection = factory.newConnection();
    18.         // 2.创建信道
    19.         Channel channel = connection.createChannel();
    20.         // 3.声明交换机
    21.         channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);
    22.         // 4.声明队列
    23.         channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
    24.         channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
    25.         // 5.关联交换机
    26.         channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a",null);
    27.         channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a",null);
    28.         channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b",null);
    29.         channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c",null);
    30.         // 6.发送消息
    31.         String msg = "hello direct, my routingKey is a...";
    32.         channel.basicPublish(Constants.DIRECT_EXCHANGE,"a",null,msg.getBytes());
    33.         msg = "hello direct, my routingKey is b...";
    34.         channel.basicPublish(Constants.DIRECT_EXCHANGE,"b",null,msg.getBytes());
    35.         msg = "hello direct, my routingKey is c...";
    36.         channel.basicPublish(Constants.DIRECT_EXCHANGE,"c",null,msg.getBytes());
    37.         // 7.释放资源
    38.         channel.close();
    39.         connection.close();
    40.         System.out.println("消息发送完毕~");
    41.     }
    42. }
    复制代码
    运行上述代码,并查看客户端可以看到互换机和队列的绑定关系,右侧是两个队列收到的消息

    斲丧者1
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer1 {
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         // 1.建立连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.开启信道
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列
    18.         channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
    19.         // 4.消费信息
    20.         Consumer consumer = new DefaultConsumer(channel) {
    21.             @Override
    22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23.                 // 打印接收消息
    24.                 System.out.println("接收到消息: " + new String(body));
    25.             }
    26.         };
    27.         channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);
    28.         // 消费者不需要释放资源,持续消费
    29.     }
    30. }
    复制代码
    斲丧者2
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer2 {
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         // 1.建立连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.开启信道
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列
    18.         channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
    19.         // 4.消费信息
    20.         Consumer consumer = new DefaultConsumer(channel) {
    21.             @Override
    22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23.                 // 打印接收消息
    24.                 System.out.println("接收到消息: " + new String(body));
    25.             }
    26.         };
    27.         channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);
    28.         // 消费者不需要释放资源,持续消费
    29.     }
    30. }
    复制代码
    运行结果

  • 通配符模式
    生产者
    1. import com.rabbitmq.client.BuiltinExchangeType;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import rabbitmq.constants.Constants;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. public class Producer {
    9.     public static void main(String[] args) throws IOException, TimeoutException {
    10.         // 1.建立连接
    11.         ConnectionFactory factory = new ConnectionFactory();
    12.         factory.setHost(Constants.HOST);
    13.         factory.setPort(Constants.PORT);
    14.         factory.setUsername(Constants.USERNAME);
    15.         factory.setPassword(Constants.PASSWORD);
    16.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    17.         Connection connection = factory.newConnection();
    18.         // 2.创建信道
    19.         Channel channel = connection.createChannel();
    20.         // 3.声明交换机
    21.         channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);
    22.         // 4.声明队列
    23.         channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
    24.         channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
    25.         // 5.关联交换机
    26.         channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*",null);
    27.         channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b",null);
    28.         channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#",null);
    29.         // 6.发送消息
    30.         String msg = "hello topic, my routingKey is '*.a.*'...";
    31.         channel.basicPublish(Constants.TOPIC_EXCHANGE,"ee.a.c",null,msg.getBytes()); // 满足Q1
    32.         msg = "hello topic, my routingKey is '*.*.b'...";
    33.         channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b",null,msg.getBytes()); // 满足Q1/Q2
    34.         msg = "hello topic, my routingKey 'is c.#'...";
    35.         channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.dd",null,msg.getBytes()); // 满足Q2
    36.         // 7.释放资源
    37.         channel.close();
    38.         connection.close();
    39.         System.out.println("消息发送完毕~");
    40.     }
    41. }
    复制代码
    运行生产者后,客户端向队列发送消息情况,互换机和队列关系情况如下

    斲丧者1
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer1 {
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         // 1.建立连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.开启信道
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列
    18.         channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
    19.         // 4.消费信息
    20.         Consumer consumer = new DefaultConsumer(channel) {
    21.             @Override
    22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23.                 // 打印接收消息
    24.                 System.out.println("接收到消息: " + new String(body));
    25.             }
    26.         };
    27.         channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);
    28.         // 消费者不需要释放资源,持续消费
    29.     }
    30. }
    复制代码
    斲丧者2
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer2 {
    6.     public static void main(String[] args) throws IOException, TimeoutException {
    7.         // 1.建立连接
    8.         ConnectionFactory factory = new ConnectionFactory();
    9.         factory.setHost(Constants.HOST);
    10.         factory.setPort(Constants.PORT);
    11.         factory.setUsername(Constants.USERNAME);
    12.         factory.setPassword(Constants.PASSWORD);
    13.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    14.         Connection connection = factory.newConnection();
    15.         // 2.开启信道
    16.         Channel channel = connection.createChannel();
    17.         // 3.声明队列
    18.         channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
    19.         // 4.消费信息
    20.         Consumer consumer = new DefaultConsumer(channel) {
    21.             @Override
    22.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23.                 // 打印接收消息
    24.                 System.out.println("接收到消息: " + new String(body));
    25.             }
    26.         };
    27.         channel.basicConsume(Constants.TOPIC_QUEUE2,true,consumer);
    28.         // 消费者不需要释放资源,持续消费
    29.     }
    30. }
    复制代码
    运行情况如下

  • RPC 模式
    客户端(Client)
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.UUID;
    5. import java.util.concurrent.ArrayBlockingQueue;
    6. import java.util.concurrent.BlockingQueue;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * RPC client
    10. * 1. 发送请求
    11. * 2. 接收响应
    12. */
    13. public class RpcClient {
    14.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    15.         // 1.建立连接
    16.         ConnectionFactory factory = new ConnectionFactory();
    17.         factory.setHost(Constants.HOST);
    18.         factory.setPort(Constants.PORT);
    19.         factory.setUsername(Constants.USERNAME);
    20.         factory.setPassword(Constants.PASSWORD);
    21.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    22.         Connection connection = factory.newConnection();
    23.         // 2.开启信道
    24.         Channel channel = connection.createChannel();
    25.         // 3.使用内置交换机
    26.         // 4.声明队列
    27.         channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
    28.         channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
    29.         // 5.发送请求
    30.         String msg = "hello rpc...";
    31.         // 设置请求的唯一标识
    32.         String correlationID = UUID.randomUUID().toString();
    33.         // 设置请求的相关属性
    34.         AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
    35.                 .replyTo(Constants.RPC_RESPONSE_QUEUE)
    36.                 .correlationId(correlationID)
    37.                 .build();
    38.         channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());
    39.         // 5.接收响应
    40.         // 使用阻塞队列来存储响应信息
    41.         final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
    42.         Consumer consumer = new DefaultConsumer(channel) {
    43.             @Override
    44.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    45.                 String resMeg = new String(body);
    46.                 System.out.println("接收到回调消息:" + resMeg);
    47.                 if (correlationID.equals(properties.getCorrelationId())) {
    48.                     // 如果两个correlationID相等,就表示是该请求返回的响应
    49.                     response.offer(resMeg);
    50.                 }
    51.             }
    52.         };
    53.         channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
    54.         String result = response.take();
    55.         System.out.println("[RPC Client 响应结果]" + result);
    56.     }
    57. }
    复制代码
    服务端(Server)
    1. import com.rabbitmq.client.*;
    2. import rabbitmq.constants.Constants;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * RPC server
    7. * 1. 接收请求
    8. * 2. 发送发送响应
    9. */
    10. public class RpcServer {
    11.     public static void main(String[] args) throws IOException, TimeoutException {
    12.         // 1.建立连接
    13.         ConnectionFactory factory = new ConnectionFactory();
    14.         factory.setHost(Constants.HOST);
    15.         factory.setPort(Constants.PORT);
    16.         factory.setUsername(Constants.USERNAME);
    17.         factory.setPassword(Constants.PASSWORD);
    18.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    19.         Connection connection = factory.newConnection();
    20.         // 2.开启信道
    21.         Channel channel = connection.createChannel();
    22.         // 3. 接收请求
    23.         channel.basicQos(1);
    24.         Consumer consumer = new DefaultConsumer(channel) {
    25.             @Override
    26.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    27.                 String request = new String(body);
    28.                 System.out.println("接收到请求:" + request);
    29.                 // 针对服务器构造响应内容进行响应
    30.                 String response = "针对请求" + request +", 响应成功.";
    31.                 AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
    32.                         .correlationId(properties.getCorrelationId())
    33.                         .build();
    34.                 channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,props,response.getBytes());
    35.                 // 手动确认
    36.                 channel.basicAck(envelope.getDeliveryTag(),false);
    37.             }
    38.         };
    39.         channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);
    40.     }
    41. }
    复制代码
    运行结果

  • 发布确认模式
    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.ConfirmListener;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import rabbitmq.constants.Constants;
    6. import java.io.IOException;
    7. import java.util.Collections;
    8. import java.util.SortedSet;
    9. import java.util.TreeSet;
    10. public class PublisherConfirms {
    11.     private static final int MESSAGE_COUNT = 2000;
    12.     public static Connection createConnection() throws Exception {
    13.         ConnectionFactory factory = new ConnectionFactory();
    14.         factory.setHost(Constants.HOST);
    15.         factory.setPort(Constants.PORT);
    16.         factory.setUsername(Constants.USERNAME);
    17.         factory.setPassword(Constants.PASSWORD);
    18.         factory.setVirtualHost(Constants.VIRTUAL_HOST);
    19.         return factory.newConnection();
    20.     }
    21.     public static void main(String[] args) throws Exception {
    22.         // 单独确认
    23.         publishingMessagesIndividually();
    24.         // 批量确认
    25.         publishingMessagesInBatches();
    26.         // 异步确认
    27.         handlingPublisherConfirmsAsynchronously();
    28.     }
    29.     /**
    30.      * 异步确认
    31.      */
    32.     private static void handlingPublisherConfirmsAsynchronously() throws Exception {
    33.         try (Connection connection = createConnection()) {
    34.             // 1.开启信道
    35.             Channel channel = connection.createChannel();
    36.             // 2.设置为confirm模式
    37.             channel.confirmSelect();
    38.             // 3.使用内置交换机,声明队列
    39.             channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
    40.             // 4.监听confirm
    41.             long startTime = System.currentTimeMillis();
    42.             // 创建有序集合,元素按照自然排序,用来存储未被确认的confirm消息序号
    43.             SortedSet<Long> confirmSetNo = Collections.synchronizedSortedSet(new TreeSet<>());
    44.             channel.addConfirmListener(new ConfirmListener() {
    45.                 @Override
    46.                 public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    47.                     if (multiple) {
    48.                         // 如果开启了批量确认,则批量确认之后要清空有序集合已经确认好的元素
    49.                         confirmSetNo.headSet(deliveryTag+1).clear();
    50.                     } else {
    51.                         // 如果没有开,那么只需要清除当前的这一个
    52.                         confirmSetNo.remove(deliveryTag);
    53.                     }
    54.                 }
    55.                 @Override
    56.                 public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    57.                     if (multiple) {
    58.                         // 如果开启了批量确认,则批量确认之后要清空有序集合已经确认好的元素
    59.                         confirmSetNo.headSet(deliveryTag+1).clear();
    60.                     } else {
    61.                         // 如果没有开,那么只需要清除当前的这一个
    62.                         confirmSetNo.remove(deliveryTag);
    63.                     }
    64.                     // 根据业务进行改进(重新发送消息)
    65.                 }
    66.             });
    67.             // 5.发送消息
    68.             for (int i = 0; i < MESSAGE_COUNT; i++) {
    69.                 String msg = "hello publisher confirms " + i;
    70.                 // 拿到当前要发送的序号
    71.                 long seqNo = channel.getNextPublishSeqNo();
    72.                 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
    73.                 // 还未确认,放入集合
    74.                 confirmSetNo.add(seqNo);
    75.             }
    76.             while (!confirmSetNo.isEmpty()) {
    77.                 Thread.sleep(20);
    78.             }
    79.             long endTime = System.currentTimeMillis();
    80.             System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, endTime - startTime);
    81.         }
    82.     }
    83.     /**
    84.      * 批量确认<br/>
    85.      * 一次性确认一批
    86.      */
    87.     private static void publishingMessagesInBatches() throws Exception {
    88.         try (Connection connection = createConnection()) {
    89.             // 1.开启信道
    90.             Channel channel = connection.createChannel();
    91.             // 2.设置confirm模式
    92.             channel.confirmSelect();
    93.             // 3.使用内部交换机,声明队列
    94.             channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
    95.             // 4.发送消息并返回确认
    96.             int batchSize = 100;
    97.             int outstandingMessages = 0;
    98.             long startTime = System.currentTimeMillis();
    99.             for (int i = 0; i < MESSAGE_COUNT; i++) {
    100.                 String msg = "hello publisher confirms " + i;
    101.                 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
    102.                 outstandingMessages++;
    103.                 // 批量确认
    104.                 if (outstandingMessages == batchSize) {
    105.                     channel.waitForConfirms(5000);
    106.                     outstandingMessages = 0;
    107.                 }
    108.             }
    109.             // 消息发送完毕之后如果有遗漏,进行确认
    110.             if (outstandingMessages != 0) {
    111.                 channel.waitForConfirms(5000);
    112.             }
    113.             long endTime = System.currentTimeMillis();
    114.             System.out.printf("批量确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, endTime - startTime);
    115.         }
    116.     }
    117.     /**
    118.      * 单独确认<br/>
    119.      * 每发送一个就要进行确认
    120.      */
    121.     private static void publishingMessagesIndividually() throws Exception {
    122.         try (Connection connection = createConnection()) {
    123.             // 1.开启信道
    124.             Channel channel = connection.createChannel();
    125.             // 2.设置confirm模式
    126.             channel.confirmSelect();
    127.             // 3.使用内部交换机,声明队列
    128.             channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
    129.             // 4.发送消息并返回确认
    130.             long startTime = System.currentTimeMillis();
    131.             for (int i = 0; i < MESSAGE_COUNT; i++) {
    132.                 String msg = "hello publish confirms " + i;
    133.                 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
    134.                 // 等待确认
    135.                 channel.waitForConfirms(5000);
    136.             }
    137.             long endTime = System.cur rentTimeMillis();
    138.             System.out.printf("单独确认策略,消息条数:%d,耗时:%d ms \n", MESSAGE_COUNT, endTime - startTime);
    139.         }
    140.     }
    141. }
    复制代码
    三种模式时间对比

SpringBoot集成RabbitMQ

引入依赖
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6.     <groupId>org.springframework.boot</groupId>
  7.     <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
复制代码
引入yml设置信息
  1. spring:
  2.   application:
  3.     name: rabbitmq-springboot
  4.   # 配置RabbitMQ基本信息
  5.   rabbitmq:
  6.     host: 47.122.121.191
  7.     port: 5672
  8.     username: xxxx
  9.     password: xxxx
  10.     virtual-host: new virtual
  11. # 日志日期格式
  12. logging:
  13.   pattern:
  14.     dateformat: yyyy-MM-dd HH:mm:ss
复制代码


  • 工作队列
    生产者
    1.编写RabbitMQConfig
    1. // 创建队列交给spring管理
    2. @Configuration
    3. public class RabbitMQConfig {
    4.     @Bean("workQueue")
    5.     public Queue workQueue() {
    6.         return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    7.     }
    8. }
    复制代码
    2.编写ProducerController
    1. @RestController
    2. @RequestMapping("/producer")
    3. public class ProducerController {
    4.     @Autowired
    5.     private RabbitTemplate rabbittemplate;
    6.     @RequestMapping("/work")
    7.     public String work() {
    8.         for (int i = 0; i < 10; i++) {
    9.             String msg = "hello spring amqp: work...";
    10.             // 使用内置交换机,发送信息
    11.             rabbittemplate.convertAndSend("", Constants.WORK_QUEUE,msg);
    12.         }
    13.         return "发送成功~";
    14.     }
    15. }
    复制代码
    斲丧者
    1.创建WorkListener
    1. @Component
    2. public class WorkListener {
    3.         // 消费者1
    4.     @RabbitListener(queues = Constants.WORK_QUEUE)
    5.     public void queueListener1(Message message) {
    6.         // 参数讲解
    7.         // 1.String 只包含消息内容
    8.         // 2.Message 包含所有的信息,包括消息ID、内容、队列信息等
    9.         // 3.Channel RabbitMQ信道的所有信息,高级用法会用到,如手动确认
    10.         System.out.println("[ listener1 "+Constants.WORK_QUEUE+"] 接收到消息:"  + message);
    11.     }
    12.         // 消费者2
    13.     @RabbitListener(queues = Constants.WORK_QUEUE)
    14.     public void queueListener2(Message message) {
    15.         System.out.println("[ listener2 "+Constants.WORK_QUEUE+"] 接收到消息:"  + message);
    16.     }
    17. }
    复制代码
    运行结果

  • 发布/订阅模式
    生产者
    1.编写RabbitMQConfig
    1. // RabbitMQConfig 是用来创建队列和交换机 以及关联队列和交换机的一个工具类
    2. @Bean("fanoutQueue1")
    3. public Queue fanoutQueue1() {
    4.     return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    5. }
    6. @Bean("fanoutQueue2")
    7. public Queue fanoutQueue2() {
    8.     return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    9. }
    10. @Bean("fanoutExchange")
    11. public FanoutExchange fanoutExchange() {
    12.     return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
    13. }
    14. // 绑定交换机和队列
    15. @Bean
    16. public Binding fanoutBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {
    17.     return BindingBuilder.bind(queue).to(fanoutExchange);
    18. }
    19. @Bean
    20. public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2") Queue queue) {
    21.     return BindingBuilder.bind(queue).to(fanoutExchange);
    22. }
    复制代码
    2.编写ProducerController
    1. @RequestMapping("/fanout")
    2. public String fanout() {
    3.     rabbittemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout...");
    4.     return "发送成功~";
    5. }
    复制代码
    运行之后客户端的绑定关系图和所创建的队列如下

    斲丧者
    1.编写FanoutListener
    1. @Component
    2. public class FanoutListener {
    3.     // 消费者1
    4.     @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    5.     public void queueListener1(String message) {
    6.         System.out.println("listener1 [" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message);
    7.     }
    8.     // 消费者2
    9.     @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    10.     public void queueListener2(String message) {
    11.         System.out.println("listener2 [" + Constants.FANOUT_QUEUE2 +"] 接收到消息:" + message);
    12.     }
    13. }
    复制代码
    运行结果

  • 路由模式
    生产者
    1.编写RabbitMQConfig
    1. @Bean("directQueue1")
    2. public Queue directQueue1() {
    3.     return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    4. }
    5. @Bean("directQueue2")
    6. public Queue directQueue2() {
    7.     return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    8. }
    9. @Bean("directExchange")
    10. public DirectExchange directExchange() {
    11.     return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
    12. }
    13. @Bean
    14. public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {
    15.     return BindingBuilder.bind(queue).to(directExchange).with("orange");
    16. }
    17. @Bean
    18. public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
    19.     return BindingBuilder.bind(queue).to(directExchange).with("black");
    20. }
    21. @Bean
    22. public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
    23.     return BindingBuilder.bind(queue).to(directExchange).with("orange");
    24. }
    复制代码
    2.编写ProducerController
    1. @RequestMapping("/direct")
    2. public String direct(String routingKey) {
    3.     rabbittemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello spring amqp:direct ," +
    4.                                   "my routingKey is " + routingKey);
    5.     return "发送成功~";
    6. }
    复制代码
    斲丧者
    1.编写FanoutListener
    1. @Component
    2. public class DirectListener {
    3.     @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    4.     public void QueueListener1(String message) {
    5.         System.out.println("listener1 [" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);
    6.     }
    7.     @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    8.     public void QueueListener2(String message) {
    9.         System.out.println("listener2 [" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);
    10.     }
    11. }
    复制代码
    当在浏览器url分别传参routingKey为orange和balck后,运行结果

  • 通配符模式
    生产者
    1.编写RabbitMQConfig
    1. // 通配符模式
    2. @Bean("topicQueue1")
    3. public Queue topicQueue1() {
    4.     return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    5. }
    6. @Bean("topicQueue2")
    7. public Queue topicQueue2() {
    8.     return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    9. }
    10. @Bean("topicExchange")
    11. public TopicExchange topicExchange() {
    12.     return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
    13. }
    14. @Bean("topicQueueBinding1")
    15. public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {
    16.     return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
    17. }
    18. @Bean("topicQueueBinding2")
    19. public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
    20.     return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
    21. }
    22. @Bean("topicQueueBinding3")
    23. public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
    24.     return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
    25. }
    复制代码
    2.编写ProducerController
    1. @RequestMapping("/topic")
    2. public String topic(String routingKey) {
    3.     rabbittemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello spring amqp:topic ," +
    4.                                   "my routingKey is " + routingKey);
    5.     return "发送成功~";
    6. }
    复制代码
    斲丧者
    1.编写TopicListener
    1. @Component
    2. public class TopicListener {
    3.     @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    4.     public void topicQueue1(String message) {
    5.         System.out.println("listener1 [" + Constants.TOPIC_QUEUE1 + "] 接收到消息:" + message);
    6.     }
    7.     @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    8.     public void topicQueue2(String message) {
    9.         System.out.println("listener2 [" + Constants.TOPIC_QUEUE2 + "] 接收到消息:" + message);
    10.     }
    11. }
    复制代码
    当在浏览器url分别传参routingKey后,运行结果

发送json对象
1.编写RabbitMQConfig
  1. @Configuration
  2. public class RabbitMQConfig {
  3.     @Bean("orderQueue")
  4.     public Queue orderQueue() {
  5.         return QueueBuilder.durable("order.create").build();
  6.     }
  7.         // 自己创建rabbitTemplate  在rabbitTemplate中设置messageConverter(核心)
  8.     @Bean
  9.     public Jackson2JsonMessageConverter jsonMessageConverter () {
  10.         return new Jackson2JsonMessageConverter();
  11.     }
  12.     @Bean
  13.     // 这里的参数ConnectionFactory是自带的,Jackson2JsonMessageConverter是上面自己写的通过Bean交给的Spring
  14.     public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) {
  15.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  16.         rabbitTemplate.setMessageConverter(jsonMessageConverter);
  17.         return rabbitTemplate;
  18.     }
  19. }
复制代码

  • 编写ProducerController
  1. // 提前编写好要传递的对象(orderInfo)
  2. @RequestMapping("/create2")
  3. public String create2() {
  4.     OrderInfo orderInfo = new OrderInfo();
  5.     String orderId = UUID.randomUUID().toString();
  6.     orderInfo.setName("商品" + new Random().nextInt(100));
  7.     orderInfo.setOrderId(orderId);
  8.     rabbitTemplate.convertAndSend("","order.create",orderInfo);
  9.     return "下单成功~";
  10. }
复制代码
查看客户端

接收json对象
1.编写ConsumerListener
  1. @Component
  2. @RabbitListener(queues = "order.create") // 将RabbitListener写在类上  要搭配@RabbitHandler使用
  3. public class OrderListener {
  4.     @RabbitHandler // RabbitHandler中的参数,可以根据接收的数据类型来映射函数,此函数接收String类型
  5.     public void handMessage(String message) {
  6.         System.out.println("接收到订单消息String:" + message);
  7.     }
  8.     // 接收OrderInfo类型
  9.     @RabbitHandler
  10.     // 此函数接收OrderInfo类型,但是OrderInfo属于json类型,也要和设置rabbitTemplate中的setmessageConverter(核心)
  11.     public void handMessage2(OrderInfo message) {
  12.         System.out.println("接收到订单消息OrderInfo:" + message);
  13.     }
  14. }
复制代码
2.编写RabbitMQConfig
  1. @Configuration
  2. public class RabbitMQConfig {
  3.     @Bean
  4.     public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
  5.         return new Jackson2JsonMessageConverter();
  6.     }
  7.     @Bean
  8.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) {
  9.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  10.         rabbitTemplate.setMessageConverter(jsonMessageConverter);
  11.         return rabbitTemplate;
  12.     }
  13. }
复制代码
分别发送String类型和OrderInfo类型实验的是差别的方法

RabbitMQ高级特性

消息确认(手动确认)



  • 肯定确认
    Channel.basicAck(long deliveryTag), boolean multiple
    RabbitMQ已经知道该消息,而且成功的处理消息,可以将其丢掉了
    deliveryTag: 消息唯一标识,是一个自增64位长整型
    multiple:是否批量确认
  • 单次否定确认
    Channel.basicReject(long deliveryTag, boolean requeue)
    斲丧者客户端可以拒绝此消息
    deliveryTag: 消息唯一标识,是一个自增64位长整型
    requeue:假如位true,表现重新发送,假如为false,表现从队列中移除
  • 批量否定确认
    Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
    斲丧者客户端批量拒绝消息
    deliveryTag: 消息唯一标识,是一个自增64位长整型
    requeue:假如位true,表现重新发送,假如为false,表现从队列中移除
    multiple:为true表现拒绝deliveryTag之前所有的消息
SpringBoot确认机制

  1. 1.NONE : 自动确认
  2. 不管消费者是否收到消息,只要发送给消费者就直接删除
  3. 2.AUTO : 默认确认
  4. 如果消费者处理成功时,会自动确认删除消息,如果出现异常就不会确认消息不会进行删除
  5. 3.MANUAL : 手动确认
  6. 需要消费者手动调用basicAck方法来确认消息,如果未被确认RabbitMQ就会认为消息未被成功处理,且会重新投递该消息
复制代码
  1. 主要流程:
  2. 1.配置确认机制(自动确认/手动确认)
  3. 2.生产者发送消息
  4. 3.消费端逻辑
  5. 4.测试
复制代码
  1. # 配置确认机制
  2. spring:
  3.   rabbitmq:
  4.     host: 47.122.121.191
  5.     port: 5672
  6.     username: xxxx
  7.     password: xxxx
  8.     virtual-host: extension
  9.     # 配置接收确认
  10.     listener:
  11.       simple:
  12. #        acknowledge-mode: none   #自动确认
  13. #        acknowledge-mode: auto   #成功自动确认,异常不会确认(默认)
  14.         acknowledge-mode: manual  #手动确认
复制代码
持久性

RabbitMQ可靠性包管机制之一
RabbitMQ的持久化是分为三个部门,分别是互换机持久化、队列持久化、消息持久化,也就是说要完整的包管RabbitMQ的持久化特性,就要包管这三个持久化的开启,但是不能百分百包管消息不丢失,这三个持久化只能包管RabbitMQ的服务器不进行消息的丢失,不包管生产者和斲丧者传输到服务区过程中的丢失情况


  • 互换机持久化
    互换机持久化是将durable设置为true,纵然是不去主动设置,他的底层代码也默认是持久化的


  • 队列持久化
    队列持久化也是和互换机持久化一样,通过durable来设置的,假如要设置为持久化就是用durable假如要设置非持久化,就是用nonDurable


  • 消息持久化
    在解说消息持久化之前,先明白一点,消息持久化是跟着队列持久化而来的,假如消息持久化开启了,但是队列持久化并未开启,服务器重启后因为队列的丢失消息也会跟着丢失,假如队列持久化开启了消息持久化没有开启,服务器重启后队列还在但是消息就会丢失,以是单独开启消息持久化是没故意义的,要再队列持久化基础上再开启消息持久化,而消息持久化默认也是开启的


发送方确认

发送方确认是生产者到RabbitMQ服务器之间的消息确认模式,确保了生产者到服务器的可靠性
发送方确认分为两种:
1.confirm模式 : 生产者到exchange之间的作用模式
2.return模式 : exchange到queue之间的作用模式
这两种模式不是互斥的,可以单独使用也可以联合使用


  • confirm模式
    confirm模式主要是通过设置rabbitTemplate中的setConfirmCallback方法,在设置中传入参数new RabbitTemplate.ConfirmCallback()重写此中的confirm方法,confirm方法中根据ack的参数来确保是否接收大概丢失,分别做处理
    步调1:设置发送确认yaml文件
    1. spring:
    2.   rabbitmq:
    3.     # 配置发送确认
    4.     publisher-confirm-type: correlated
    复制代码
    步调2:自界说rabbitTemplate重写confirm方法
    为什么要自界说?
    再写入消息的时候,我们会用到rabbitTemplate,这个是Spring为我们提供的,我们是直接注入进去的,以是只要用到了rabbit Template那就是同一个Bean对象,为了不影响其他地方对rabbit Template的使用,我们自己重新写一个之后设置他的setConfirmCallback即可。
    1. @Configuration
    2. public class RabbitTemplateConfig {
    3.     // 单独写一个没做任何处理的rabbitTemplate方法,不需要使用发送方确认的就是用这个rabbitTemplate,因为我们自定义了
    4.     // rabbitTemplate方法之后就会自动覆盖spring为我们提供的方法。
    5.     @Bean("rabbitTemplate")
    6.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    7.         return new RabbitTemplate(connectionFactory);
    8.     }
    9.     // 自定义rabbitTemplate命名为confirmRabbitTemplate通过Bean交给spring管理
    10.     @Bean("confirmRabbitTemplate")
    11.     public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
    12.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    13.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    14.             @Override
    15.             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    16.                 System.out.println("执行confirm方法");
    17.                 if (ack) {
    18.                     System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
    19.                 } else {
    20.                     System.out.printf("未收到消息, 消息ID: %s , cause : %s \n",correlationData == null ? null : correlationData.getId(),cause);
    21.                 }
    22.             }
    23.         });
    24.         return rabbitTemplate;
    25.     }
    26. }
    复制代码
    步调3:编写ProducerController
    1. @RestController
    2. @RequestMapping("/producer")
    3. public class ProducerController {
    4.     // 注入的是不同的rabbitTemplate,根据名字来区分
    5.     @Resource
    6.     private RabbitTemplate rabbitTemplate;
    7.     @Resource
    8.     private RabbitTemplate confirmRabbitTemplate;
    9.    
    10.     // 不需要发送确认的
    11.     @RequestMapping("/ack")
    12.     public String ack() {
    13.         // 创建一个Message对象设置为持久化或非持久化
    14.         Message message = new Message("consumer ack mode test...".getBytes(),new MessageProperties());
    15.         //message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化
    16.         message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);// 非持久化
    17.         rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);
    18.         return "消息发送成功~";
    19.     }
    20.     // 需要发送确认的
    21.     @RequestMapping("/confirm")
    22.     public String confirm() {
    23.         CorrelationData correlationData = new CorrelationData("1");
    24.         confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","hello confirm",correlationData);
    25.         return "消息发送成功~";
    26.     }
    27. }
    复制代码
    注意!!:发送确认是生产者到互换机之间的发送确认,他只能包管到了互换机互换性能正常辨认到来了ack就是true,但是假如RoutingKey和BindingKey不匹配,他是不会将ack为false的,这种情况就要看后续讲的return退回模式!
  • return 退回模式
    互换机在接收到消息之后,未能找到与RoutingKey相同的BiindingKey,就会实验return退回模式的回调函数
    步调1:设置发送确认yaml文件
    1. spring:
    2.   rabbitmq:
    3.     # 配置发送确认
    4.     publisher-confirm-type: correlated
    复制代码
    步调2:自界说rabbitTemplate重写returnedMessage方法
    1. @Configuration
    2. public class RabbitTemplateConfig {
    3.         // 单独写一个没做任何处理的rabbitTemplate方法,不需要使用发送方确认的就是用这个rabbitTemplate,因为我们自定义了
    4.     // rabbitTemplate方法之后就会自动覆盖spring为我们提供的方法。
    5.     @Bean("rabbitTemplate")
    6.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    7.         return new RabbitTemplate(connectionFactory);
    8.     }
    9.     // 自定义rabbitTemplate命名为confirmRabbitTemplate通过Bean交给spring管理
    10.     @Bean("confirmRabbitTemplate")
    11.     public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
    12.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    13.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    14.             @Override
    15.             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    16.                 System.out.println("执行confirm方法");
    17.                 if (ack) {
    18.                     System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());
    19.                 } else {
    20.                     System.out.printf("未收到消息, 消息ID: %s , cause : %s \n",correlationData == null ? null : correlationData.getId(),cause);
    21.                 }
    22.             }
    23.         });
    24.         // 消息被退回时的回调方法(此方法这里和重写的confirm方法写在了一个定义rabbitTemplate中,并不是一定要这样写,他俩是可以单独开来的)
    25.         rabbitTemplate.setMandatory(true);// 不要忘setMandatory(true)
    26.         rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    27.             @Override
    28.             public void returnedMessage(ReturnedMessage returned) {
    29.                 System.out.println("消息退回: " + returned);
    30.             }
    31.         });
    32.         return rabbitTemplate;
    33.     }
    34. }
    复制代码
持久化总结

如何确保可靠性传输?

1.Producer -> Broker 发送方确认
​ 1)producer -> exchange : confirm 模式
​ 2)exchange -> queue: returns 模式
​ 3)队列溢出:死信等
2.Broker 持久化
​ 1)互换机持久化
​ 2)队列持久化
​ 3)消息持久化
3.Broker -> Consumer 消息确认
​ 1)主动确认
​ 2)手动确认
重试机制

在消息通报过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败。为了办理这些问题, RabbitMQ 提供了重试机制, 答应消息在处理失败后重新发送。但假如是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数
1.设置信息
  1. spring:
  2. rabbitmq:
  3.   listener:
  4.     simple:
  5.      acknowledge-mode: auto #消息接收确认
  6.     retry:
  7.      enabled: true # 开启消费者失败重试
  8.      initial-interval: 5000ms # 初始失败等待时⻓为5秒
  9.      max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
复制代码
注意:

  • ⾃动确认模式下: 程序逻辑非常, 多次重试还是失败, 消息就会被⾃动确认, 那么消息就丢失了
  • 手动确认模式下: 程序逻辑非常, 多次重试消息依然处理失败, 无法被确认。
TTL

TTL就是过期时间,也就是消息的寿命存活的时间,假如消息到达过期时间,还没有被斲丧,就会直接被扫除掉,消息的TTL牵扯到两个TTL
1.消息的TTL :队列还存在,消息的TTL时间到了,消息会消失
  1. @RequestMapping("/ttl")
  2. public String ttl() {
  3.     MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  4.         @Override
  5.         public Message postProcessMessage(Message message) throws AmqpException {
  6.             message.getMessageProperties().setExpiration("10000"); // 10s
  7.             return message;
  8.         }
  9.     };
  10.     rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","hello ttl",messagePostProcessor);
  11.     return "消息发送成功~";
  12. }
复制代码
2.队列的TTL : 队列TTL时间到了里面存放的消息也会随之消失
  1. // 设置ttl队列
  2. @Bean("ttlQueue2")
  3. public Queue ttlQueue2() {
  4.     return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20_000).build(); // 20s
  5. }
复制代码
总结:假如同时设置了各自的TTL,那么取短时间作为过期时间缘故原由就是,队列时间短带着消息一起消失,消息时间短消息到时间一样消失
死信队列

死信:简单来说就是种种缘故原由无法被斲丧者斲丧的信息。当消息变成死信之后就会被重新发送到另一个互换机中,这个互换机就是DLX绑定DLX的队列就是死信队列。

变成死信的几种情况:
1.消息被拒绝:Basic.Reject/Basic.Nack,而且设置requeue为false
  1. @Component
  2. public class DlListener {
  3.     @RabbitListener(queues = Constants.NORMAL_QUEUE)
  4.     public void handMessage(Message message, Channel channel) throws Exception {
  5.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.         try {
  7.             // 消费者逻辑
  8.             System.out.printf("接收到消息:%s, deliveryTag %d \n",
  9.                               new String(message.getBody(), "utf-8"), deliveryTag);
  10.             // 进行业务逻辑处理
  11.             System.out.println("业务逻辑处理...");
  12.             int num = 3 / 0;
  13.             System.out.println("业务处理完成");
  14.             // 肯定确认
  15.             channel.basicAck(deliveryTag, false);
  16.         } catch (Exception e) {
  17.             // 否定确认
  18.             channel.basicNack(deliveryTag, false, false); // 设置不让入队,变成死信
  19.         }
  20.     }
  21.     @RabbitListener(queues = Constants.DL_QUEUE)
  22.     public void dlHandMessage(Message message) {
  23.         // 死信消费者逻辑
  24.         System.out.printf("[" + Constants.DL_QUEUE + "] 接收到消息:%s, deliveryTag: %d \n",
  25.                           new String(message.getBody()), message.getMessageProperties().getDeliveryTag());
  26.     }
  27. }
复制代码
2.消息过期
  1. // 死信
  2. // 正常交换机和队列
  3. @Bean("normalQueue")
  4. public Queue normalQueue() {
  5.     return QueueBuilder.durable(Constants.NORMAL_QUEUE)
  6.         .deadLetterExchange(Constants.DL_EXCHANGE)// 配置和死信交换机的关联
  7.         .deadLetterRoutingKey("dl")          // 配置和死信交换机的RoutingKey
  8.         .ttl(10000)                                                   // 配置ttl消息过期后直接回关联到死信队列中
  9.         .maxLength(10)                      // 队列长度
  10.         .build();
  11. }
  12. @Bean("normalExchange")
  13. public DirectExchange normalExchange() {
  14.     return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
  15. }
  16. @Bean("normalBinding")
  17. public Binding normalBinding(@Qualifier("normalExchange") DirectExchange directExchange,@Qualifier("normalQueue") Queue queue) {
  18.     return BindingBuilder.bind(queue).to(directExchange).with("normal");
  19. }
  20. // 死信交换机和队列
  21. @Bean("dlQueue")
  22. public Queue dlQueue() {
  23.     return QueueBuilder.durable(Constants.DL_QUEUE).build();
  24. }
  25. @Bean("dlExchange")
  26. public DirectExchange dlExchange() {
  27.     return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
  28. }
  29. @Bean("dlBinding")
  30. public Binding dlBinding(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {
  31.     return BindingBuilder.bind(queue).to(directExchange).with("dl");
  32. }
复制代码
3.队列到达最大长度 : 代码如上消息过期
延迟队列

延迟队列指的是,消息发送之后并不想让斲丧者立即拿到消息,而是等待特定时间后才让斲丧者进行斲丧,可惜的是RabbitMQ并没有直接支持延迟队列,但是我们可以通过TTL+死信的模式来拟定一个延迟队列,当TTL时间到了之后,用户直接斲丧死信队列即可,起到一个延时的结果
   延迟队列通过TTL+死信方式实现
  1.设置队列的TTL
  2.设置消息的TTL
  当后来的消息过期时间早于,先到的消息时,会有延迟,第二个时间短的会和第一个时间长的一起出队列,而设置队列的TTL不存在这个情况
  延迟队列插件

插件毗连:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  1. // 延迟队列创建Bean队列和交换机代码
  2. @Configuration
  3. public class DelayConfig {
  4.     @Bean("delayQueue")
  5.     public Queue delayQueue() {
  6.         return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
  7.     }
  8.     @Bean("delayExchange")
  9.     public DirectExchange delayExchange() {
  10.         return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build(); // 延迟交换机
  11.     }
  12.     @Bean("delayBinding")
  13.     public Binding delayBinding(@Qualifier("delayExchange") DirectExchange delayExchange,@Qualifier("delayQueue") Queue queue) {
  14.         return BindingBuilder.bind(queue).to(delayExchange).with("delay");
  15.     }
  16. }
复制代码
  1. @RequestMapping("/delay")
  2. public String delay() {
  3.     // 发送消息1(30s)
  4.     MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  5.         @Override
  6.         public Message postProcessMessage(Message message) throws AmqpException {
  7.             message.getMessageProperties().setDelayLong(30_000L);
  8.             return message;
  9.         }
  10.     };
  11.     rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","hello delay",messagePostProcessor);
  12.     // 发送消息2(10s)
  13.     MessagePostProcessor messagePostProcessor1 = new MessagePostProcessor() {
  14.         @Override
  15.         public Message postProcessMessage(Message message) throws AmqpException {
  16.             message.getMessageProperties().setDelayLong(10_000L);
  17.             return message;
  18.         }
  19.     };
  20.     rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","hello delay",messagePostProcessor1);
  21.     System.out.printf("%tc 消息发送成功 \n",new Date());
  22.     return "消息发送成功~";
  23. }
复制代码
二者对比:

  • 基于死信实现的延迟队列
​ a. 优点: 1) 灵活不需要额外的插件⽀持
​ b. 缺点: 1) 存在消息次序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性

  • 基于插件实现的延迟队列
​ a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
​ b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本
事务

  1. 步骤:
  2. 1.创建新的RabbitTemplate设置setChannelTransacted为true
  3. 2.创建RabbitTransactionManager对象交给spring管理
  4. 3.注入transRabbitTemplate到Producer中并使用
  5. 4.再Producer方法上加上@Transactional注解
复制代码
  1. @Configuration
  2. public class RabbitTemplateConfig {
  3.     @Bean("rabbitTemplate")
  4.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  5.         return new RabbitTemplate(connectionFactory);
  6.     }
  7.     // 事务
  8.     @Bean("transRabbitTemplate")
  9.     public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
  10.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  11.         rabbitTemplate.setChannelTransacted(true); // 开启事务
  12.         return rabbitTemplate;
  13.     }
  14.     // 创建事务管理器
  15.     @Bean("transactionManager")
  16.     public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
  17.         return new RabbitTransactionManager(connectionFactory);
  18.     }
  19. }
复制代码
  1. @RestController
  2. @RequestMapping("/producer")
  3. public class ProducerController {
  4.     @Resource(name = "rabbitTemplate")
  5.     private RabbitTemplate rabbitTemplate;
  6.     // 依赖注入
  7.     @Resource(name = "transRabbitTemplate")
  8.     private RabbitTemplate transRabbitTemplate;
  9.     // 加上@Transactional注解启用事务
  10.     @Transactional
  11.     @RequestMapping("/transaction")
  12.     public String transaction() {
  13.         System.out.println("transaction test...");
  14.         transRabbitTemplate.convertAndSend("",Constants.TRANSACTION_QUEUE,"transaction1...");
  15.         int num = 3/0;
  16.         transRabbitTemplate.convertAndSend("",Constants.TRANSACTION_QUEUE,"transaction2...");
  17.         return "消息发送成功~";
  18.     }
  19. }
复制代码
消息分发

接纳消息分发可以做到以下功能:
1.限流
2.负载均衡


  • 限流
    设置prefetch参数,设置为手动确认
    1. #ack 确认⽅式:开启ack
    2. listener:
    3. simple:
    4.   acknowledge-mode: manual #⼿动确认
    5.   prefetch: 5
    复制代码
    设置互换机队列
    1. @Configuration
    2. public class QosConfig {
    3.     @Bean("qosQueue")
    4.     public Queue qosQueue() {
    5.         return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    6.     }
    7.     @Bean("qosExchange")
    8.     public DirectExchange qosExchange() {
    9.         return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
    10.     }
    11.     @Bean("qosBinding")
    12.     public Binding qosBinding(@Qualifier("qosExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue) {
    13.         return BindingBuilder.bind(queue).to(directExchange).with("qos");
    14.     }
    15. }
    复制代码
    发送消息
    1. @RequestMapping("/qos")
    2. public String qos() {
    3.     System.out.println("qos test...");
    4.     for (int i = 0; i < 20; i++) {
    5.         rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","qos test...");
    6.     }
    7.     return "消息发送成功~";
    8. }
    复制代码
    斲丧者监听
    1. @Component
    2. public class QosListener {
    3.     @RabbitListener(queues = Constants.QOS_QUEUE)
    4.     public void handMessage(Message message, Channel channel) throws Exception {
    5.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
    6.         try {
    7.             // 消费者逻辑
    8.             System.out.printf("接收到消息:%s, deliveryTag %d \n",
    9.                     new String(message.getBody(), "utf-8"), deliveryTag);
    10.             // 进行业务逻辑处理
    11. //            System.out.println("业务逻辑处理...");
    12. //            int num = 3 / 0;
    13. //            System.out.println("业务处理完成");
    14. //            // 肯定确认
    15. //            channel.basicAck(deliveryTag, false);
    16.         } catch (Exception e) {
    17.             // 否定确认
    18.             channel.basicNack(deliveryTag, false, true);
    19.         }
    20.     }
    21. }
    复制代码


  • 负载均衡
    负载均衡和限流一样,只是负载均衡并非是一个斲丧者监听,而是多个
幂等性保障

幂等性指的是在应用程序中,对一个系统重复调用,不论请求多少次,这些请求对系统的影响都是相等的结果,例如数据库中的select操作。
MQ的幂等性

对于MQ⽽⾔, 幂等性是指同⼀条消息, 多次斲丧, 对系统的影响是相同的。
⼀般消息中心件的消息传输保障分为三个层级.

  • At most once:最多⼀次. 消息可能会丢失,但绝不会重复传输.
  • At least once:最少⼀次. 消息绝不会丢失,但可能会重复传输.
  • Exactly once:恰好⼀次. 每条消息肯定会被传输⼀次且仅传输⼀次.
RabbitMQ⽀持"最多⼀次"和"最少⼀次"。
幂等性办理方案

全局唯一ID

1.设置ID:给每条消息分配一个唯一的ID(UUID等)
2.判定:斲丧者收到消息后,判定ID是否已经被斲丧过,假如斲丧过则放弃处理
3.保存ID:假如为斲丧过,斲丧者开始斲丧,但是要把此全局ID保存起来(数据库或Redis等)
   可以使⽤Redis 的原⼦性操作setnx来包管幂等性, 将唯⼀ID作为key放到redis中 (SETNX messageID 1) . 返回1, 说明之前没有斲丧过, 正常斲丧. 返回0, 说明这条消息之前已斲丧过, 抛弃.
  业务逻辑判定

在业务逻辑层⾯实现消息处理的幂等性.
例如: 通过检查数据库中是否已存在相关数据纪录, 大概使⽤乐观锁机制来避免更新已被其他事务更改的数据, 再大概在处理消息之前, 先检查相关业务的状态, 确保消息对应的操作尚未执⾏, 然后才进行处理, 详细根据业务场景来处理
次序性保障

生产者生产消息的次序和斲丧者斲丧消息的次序是同等的
次序性保障方案

1.单个队列的斲丧者
最简单的⽅法是使⽤单个队列, 并由单个斲丧者进⾏处理. 同⼀个队列中的消息是先辈先出的, 这是RabbitMQ来资助我们包管的。
2.分区斲丧
单个斲丧者的吞吐太低了, 当需要多个斲丧者以提⾼处理速率时, 可以使⽤分区斲丧. 把⼀个队列分割成多个分区, 每个分区由⼀个斲丧者处理, 以此来保持每个分区内消息的次序性。
3.消息确认机制
使⽤手动消息确认机制, 斲丧者在处理完⼀条消息后, 显式地发送确认, 如许RabbitMQ才会移除并继承发送下⼀条消息
4.业务逻辑控制
在某些情况下, 纵然消息乱序到达, 也可以在业务逻辑层⾯实现次序控制. ⽐如通过在消息中嵌⼊序列号, 并在斲丧时根据这些信息来处理
RabbitMQ本⾝并不包管全局的严格次序性, 特别是在分布式系统中. 在实际应⽤开辟中, 根据详细的业务需求, 可能需要联合多种策略来实现所需要的次序包管
消息积压

消息积压是指在消息队列(如RabbitMQ)中, 待处理的消息数目超过了斲丧者处理能⼒, 导致消息在队列中不断堆积的现象。
通常有以下情况:
1.消息生产过快
2.斲丧者处理能力不足
3.网络问题
4.RabbitMQ服务器设置偏低
办理方案:
1.进步斲丧者效率:
​ a. 增加斲丧者实例数目, ⽐如新增机器
​ b. 优化业务逻辑, ⽐如使⽤多线程来处理业务
​ c. 设置prefetchCount, 当⼀个斲丧者阻塞时, 消息转发到其他未阻塞的斲丧者.
​ d. 消息发生非常时, 设置符合的重试策略, 大概转入到死信队列
2.限制生产者效率(流量控制,限流算法等)
​ a. 流量控制: 在消息生产者中实现流量控制逻辑, 根据斲丧者处理能力动态调解发送速率
​ b. 限流: 使用限流工具, 为消息发送速率设置⼀个上限
​ c. 设置过期时间. 假如消息过期未斲丧, 可以设置死信队列, 以避免消息丢失, 并淘汰对主队列的压力
3.资源设置优化
​ 比如升级RabbitMQ服务器的硬件, 调解RabbitMQ的设置参数等

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

火影

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表