RabbitMq原生接口详解

打印 上一主题 下一主题

主题 899|帖子 899|积分 2697

helloword

生产者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6.   
  7. import java.nio.charset.StandardCharsets;  
  8.   
  9. public class SingleProducer {  
  10.     private final static String QUEUE_NAME = "hello";  
  11.     public static void main(String[] args) throws Exception {  
  12.         // 创建连接工厂  
  13.         ConnectionFactory factory = new ConnectionFactory();  
  14.         factory.setHost("localhost");  
  15.         factory.setUsername("admin");  
  16.         factory.setPassword("admin");  
  17.   
  18.         try(Connection connection=factory.newConnection(); Channel channel=connection.createChannel()){  
  19.             channel.queueDeclare(QUEUE_NAME,false,false,false,null);  
  20.             String message = "Hello World!";  
  21.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));  
  22.             System.out.println(" [x] Sent '" + message + "'");  
  23.         }  
  24.     }  
  25. }
复制代码
首先我们必要使用ConnectionFactory设置登录账户和暗码以及端标语等。
Connection这里可以明白为数据库ConnectionFactory类负责登录mq的管理窗口想要对队列举行利用则必要建立连接然后再获取channel对象举行利用
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
关于queueDeclare方法的各个参数剖析

queueDeclare通常用于声明一个队列以下是各个参数的寄义
参数寄义queueName表示队列的名称是在利用队列时的唯一利用符可以让服务器找到想要利用的队列durable表示是否乐意让这个队列持久化即当重启服务器后该队列是否丢失假如设置为true后则阐明在后续假如碰到服务器故障当服务器重启完成后该队列以及队列中的数据可以自动恢复exclusive当其设置为true后阐明队列具有排他性,因此当前队列只有当前连接的人可见并且当连接断开后队列也会举行烧毁和接纳当其为false后则其可以被多个消费者使用autoDelete这里是指该队列是否必要自动烧毁接纳arguments这里是一个map指的是是否必要传递进一些其他的属性 消费者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.DeliverCallback;  
  7.   
  8. import java.nio.charset.StandardCharsets;  
  9.   
  10. public class SingleConsumer {  
  11.     private final static String QUEUE_NAME = "hello";  
  12.   
  13.     public static void main(String[] args) throws Exception{  
  14.         // 创建一个 ConnectionFactory 对象,用于创建与消息代理的连接  
  15.         ConnectionFactory factory = new ConnectionFactory();  
  16.         // 设置消息代理的主机地址为 localhost        factory.setHost("localhost");  
  17.         factory.setUsername("admin");  
  18.         factory.setPassword("admin");  
  19.         // 通过 ConnectionFactory 创建一个新的连接  
  20.         Connection connection = factory.newConnection();  
  21.         // 通过连接创建一个通道,通道是执行大多数 API 的地方,例如声明队列、发布消息、消费消息等  
  22.         Channel channel = connection.createChannel();  
  23.         // 声明一个队列,使用 queueDeclare 方法  
  24.         // 参数说明:  
  25.         // QUEUE_NAME:队列的名称  
  26.         // false:表示该队列是否持久化,false 表示不持久化,即如果 RabbitMQ 服务重启,该队列将被删除  
  27.         // false:表示该队列是否排他性,false 表示不排他,多个消费者可以连接到该队列  
  28.         // false:表示该队列是否自动删除,false 表示不会自动删除,即使没有消费者,该队列也会保留  
  29.         // null:表示该队列的其他属性,这里设置为 null 表示没有额外的属性  
  30.         channel.queueDeclare(QUEUE_NAME,false,false,false,null);  
  31.         // 输出等待消息的信息,提示用户按 CTRL+C 退出  
  32.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  33.         // 定义一个 DeliverCallback 对象,用于处理接收到的消息  
  34.         DeliverCallback deliverCallback=(consumerTag, delivery)->{  
  35.             // 将接收到的消息体(byte[] 类型)转换为 UTF-8 编码的字符串  
  36.             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);  
  37.             // 打印接收到的消息  
  38.             System.out.println(" [x] Received '" + message + "'");  
  39.         };  
  40.         // 开始消费消息,会持续阻塞等待消息  
  41.         // 参数说明:  
  42.         // QUEUE_NAME:要消费的队列名称  
  43.         // true:表示是否自动确认消息,true 表示一旦消息被发送给消费者,RabbitMQ 就会将其标记为已确认并从队列中删除  
  44.         // deliverCallback:处理接收到消息的回调函数  
  45.         // consumerTag -> { }:取消消费的回调函数,这里是一个空的实现  
  46.         channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });  
  47.     }  
  48. }
复制代码
这里前半部分和生产者是一样的利用而basicConsume方法我们举行详细剖析
basicConsume

这个方法是一个开始消费信息即从队列中开始获取信息的一个方法。其参数寄义如下
参数寄义queuename要读取信息的队列名称ack这里是一个boolean类型的参数表示的是是否开启自动确认应答,即假如设置为true消费者吸收到消息后会自动确认无需手动调用channel.basicAck方法。DeliverCallback这里是创建的一个回调函数假如有消息传递给消费者后这个consumerTag -> { }指的是取消订阅这个消息队列后调用的方法 workqueue

生产者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.MessageProperties;  
  7.   
  8. import java.util.Scanner;  
  9.   
  10. public class MultiProducer {  
  11.     private static final String TASK_QUEUE_NAME = "multi_queue";  
  12.   
  13.     public static void main(String[] args) throws Exception{  
  14.         ConnectionFactory connectionFactory=new ConnectionFactory();  
  15.         connectionFactory.setHost("localhost");  
  16.         connectionFactory.setUsername("admin");  
  17.         connectionFactory.setPassword("admin");  
  18.         try (Connection connection = connectionFactory.newConnection();  
  19.              Channel channel = connection.createChannel()){  
  20.              channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);  
  21.              Scanner scanner=new Scanner(System.in);  
  22.              while(scanner.hasNext()){  
  23.                  String message=scanner.nextLine();  
  24.                  channel.basicPublish("",TASK_QUEUE_NAME,  
  25.                          MessageProperties.PERSISTENT_TEXT_PLAIN,  
  26.                          message.getBytes("UTF-8"));  
  27.                  System.out.println(" [x] Sent '" + message + "'");  
  28.              }  
  29.         }  
  30.     }  
  31. }
复制代码
以上代码和第一个helloword代码的主要区别就是basicPublish方法的运用这里我们表明一下各个参数的寄义
basicPublish参数剖析

参数寄义exchange这里表示交换机的名称我们后面会讲到这里设置为空表示使用默认的交换机TASK_QUEUE_NAME(routingKey内部源码有一个处置惩罚我们下面来说这里简单明白为队列名称)表示队列名称MessageProperties.PERSISTENT_TEXT_PLAIN(props)这个参数表示的消息的属性设置此处的参数寄义是该消息为持久化消息,即当服务器重启后该消息不会丢失message.getBytes("UTF-8")(body)这个是body参数是消息的主体内容一般我们会将其设置为utf8该方法的主要目的是为了将消息传递给消息队列其内部的参数则是设置了该消息传递应当如何传递,是否持久是否使用创建的交换机等等 消费者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.DeliverCallback;  
  7.   
  8. public class MultiConsumer {  
  9.     private static final String TASK_QUEUE_NAME = "multi_queue";  
  10.   
  11.     public static void main(String[] args) throws Exception{  
  12.         ConnectionFactory connectionFactory=new ConnectionFactory();  
  13.         connectionFactory.setHost("localhost");  
  14.         connectionFactory.setUsername("admin");  
  15.         connectionFactory.setPassword("admin");  
  16.         final Connection connection = connectionFactory.newConnection();  
  17.         for(int i=0;i<2;i++){  
  18.             final Channel channel = connection.createChannel();  
  19.             channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);  
  20.             System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  21.             channel.basicQos(1);  
  22.             int finalI = i;  
  23.             DeliverCallback deliverCallback=(consumerTag, delivery)->{  
  24.                 String message=new String(delivery.getBody(), "UTF-8");  
  25.                 try {  
  26.                     System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'");  
  27.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  28.                     //停20秒  
  29.                     Thread.sleep(20000);  
  30.                 } catch (InterruptedException e) {  
  31.                     throw new RuntimeException(e);  
  32.                 }finally {  
  33.                     System.out.println(" [x] Done");  
  34.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  35.                 }  
  36.             };  
  37.             // 开启消费监听  
  38.             channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {  
  39.             });  
  40.         }  
  41.     }  
  42. }
复制代码
代码剖析

channel.basicQos(1);

它允许消费者控制预取消息的数量,防止消费者吸收过多消息而无法实时处置惩罚,导致消息积压或体系过载。这里涉及到了rabbitmq存取消息的一个过程我们可以举行详细的讲解一下
rabbitMq存取消息流程

当我们启动rabbitMq服务器之后利用体系会给rabbitMq分别一块内存作为mq存储队列,消息,连接器等
basicPublish 当我们调用该方法后我们的消息会被发送给rabbitmq服务器,服务器再根据我们的消息的属性例如交换器等将该消息存储到某个队列中
basicConsume 当我们调用该方法后那么消费者就会与rabbitmq服务器建立起来连接并订阅相应的消息队列当该队列有消息的时间RabbitMQ 服务器会将消息传递给消费者
交换器(Publish/Subscribe)

这里官方文档给的一个归栏是发布者和订阅者,其实指的是交换器和队列之间的关系这里我们将从以下交换器举行介绍
fanout交换器

fanout交换器是交换器的一种类型我们来看下官网的剖析
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an _exchange_. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the _exchange type_.
我们对以上举行翻译
RabbitMQ 消息传递模型的焦点头脑是,生产者从不直接将任何消息发送到队列。实际上,生产者经常甚至不知道消息是否会被投递到哪个队列。
相反,生产者只能将消息发送到一个交换器。交换器是一种非常简单的组件。它一端吸收来自生产者的消息,另一端将消息推送到队列。交换器必须确切知道如何处置惩罚吸收到的消息。是将消息附加到特定的某个队列?照旧附加到多个队列?亦或是将其抛弃。这些规则由交换器类型来定义。
这里的意思就是在最开始我们的消息传递时生产者直接将消息发送给队列现在变成了生产者将消息发给交换器由交换器将其发送给不同的队列(以fanout为例)

上图其实时fanout交换器在吸收到消息后如何发送我们可以看到fanout交换器是将消息统一发给与其绑定的全部队列那么代码层我们来看一下
生产者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6.   
  7. import java.util.Scanner;  
  8.   
  9. public class FanoutProducer {  
  10.     private static final String EXCHANGE_NAME = "fanout-exchange";  
  11.   
  12.     public static void main(String[] args) throws Exception {  
  13.         ConnectionFactory factory=new ConnectionFactory();  
  14.         factory.setPassword("admin");  
  15.         factory.setUsername("admin");  
  16.         factory.setHost("localhost");  
  17.         try(Connection connection=factory.newConnection();  
  18.             Channel channel=connection.createChannel()){  
  19.             channel.exchangeDeclare(EXCHANGE_NAME,"fanout");  
  20.             Scanner scanner=new Scanner(System.in);  
  21.             while(scanner.hasNext()){  
  22.                 String message=scanner.nextLine();  
  23.                 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));  
  24.                 System.out.println(" [x] Sent '" + message + "'");  
  25.             }  
  26.         }  
  27.   
  28.     }  
  29. }
复制代码
代码剖析

channel.exchangeDeclare(EXCHANGE_NAME,“fanout”);

这行代码的作用其实就是声明一个交换器,我们可以类比上面的生产者举行对比
参数寄义EXCHANGE_NAME交换器名称type(fanout)交换器的类型、 消费者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.DeliverCallback;  
  7.   
  8. public class FanoutConsumer {  
  9.     private static final String EXCHANGE_NAME = "fanout-exchange";  
  10.   
  11.     public static void main(String[] args) throws Exception {  
  12.         ConnectionFactory factory=new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         factory.setUsername("admin");  
  15.         factory.setPassword("admin");  
  16.         Connection connection=factory.newConnection();  
  17.         Channel channel1=connection.createChannel();  
  18.         Channel channel2=connection.createChannel();  
  19.         channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  20.         String queueName = "xiaowang_queue";  
  21.         channel1.queueDeclare(queueName,true,false,false,null);  
  22.         channel1.queueBind(queueName, EXCHANGE_NAME, "");  
  23.         String queueName2 = "xiaoli_queue";  
  24.         channel1.queueDeclare(queueName2,true,false,false,null);  
  25.         channel1.queueBind(queueName2, EXCHANGE_NAME, "");  
  26.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  27.         DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {  
  28.             String message = new String(delivery.getBody(), "UTF-8");  
  29.             System.out.println(" [小王] Received '" + message + "'");  
  30.         };  
  31.   
  32.         DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {  
  33.             String message = new String(delivery.getBody(), "UTF-8");  
  34.             System.out.println(" [小李] Received '" + message + "'");  
  35.         };  
  36.         channel1.basicConsume(queueName,true,deliverCallback1,consumerTag->{});  
  37.         channel2.basicConsume(queueName2,true,deliverCallback2,consumerTag->{});  
  38.     }  
  39. }
复制代码
代码剖析

channel1.queueBind(queueName, EXCHANGE_NAME, “”);

参数寄义queueName绑定交换器的队列名exchangeName绑定的哪个交换器名称routingKey(“”)路由键。它是一个字符串,当交换器将消息路由到队列时会使用这个路由键。不同类型的交换器对路由键的使用方式不同接下来使用direct将会使用 该方法的作用就是可以让该队列绑定该交换机从而使得交换机可以向绑定自己的队列发送消息
direct交换机

作用

在上述fanout的表明中我们可以知道随着业务复杂度的提拔单纯的将消息传递给全部的消息队列已经无法满意我们的业务需求了因此我们必要一个交换器可以帮助我们将特定的消息发送给特定的队列即针对消息举行一个分类管理而我们的direct便有这个功能他的传递消息图如下.

可以实现的功能

通过如许的特性我们可以对消息实现一个分类比如说,我们在注册一个QQ账号的时间假如是fanout路由那么我们全部的消息都是发送给全部的队列,包罗注册失败,注册成功等等消息这很明显不符合我们的项目要求,因此我们可以对消息分类,成功的放入一个队列失败的放入一个队列从而实现了降低耦合进步内聚
生产者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6.   
  7. import java.util.Scanner;  
  8.   
  9. public class DirectProducer {  
  10.     private static final String EXCHANGE_NAME = "direct-exchange";  
  11.   
  12.     public static void main(String[] args) throws Exception{  
  13.         ConnectionFactory factory=new ConnectionFactory();  
  14.         factory.setHost("localhost");  
  15.         factory.setUsername("admin");  
  16.         factory.setPassword("admin");  
  17.         try (Connection connection = factory.newConnection();  
  18.              Channel channel = connection.createChannel()) {  
  19.             channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  20.             Scanner scanner = new Scanner(System.in);  
  21.             while (scanner.hasNext()) {  
  22.                 String userInput = scanner.nextLine();//输入一个字符串前半部分是message后半部分是routingKey中间是空格  
  23.                 String[] strings = userInput.split(" ");//以空格分割  
  24.                 if (strings.length < 1) {  
  25.                     continue;  
  26.                 }  
  27.                 String message = strings[0];  
  28.                 String routingKey = strings[1];  
  29.                 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));  
  30.                 System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");  
  31.             }  
  32.   
  33.         }  
  34.   
  35.     }  
  36. }
复制代码
代码剖析

basicPublish填坑

![[Pasted image 20250118134640.png]]
在对上面剖析的时间我提到了这个方法的内部参数名称官方起的时routingKey而上面我说先简单明白为传递队列名称,由于其内部源码有如许的一个处置惩罚,那就是当我们exchange为空的时间那就默认为使用rabbitMq默认的一个路由交换器,此时routingKey就会被认为是队列名称因此这里看似我们在对exchange传递的是空但是其实本质我们也是调用了rabbitMq的默认路由
那么basicPublish的实际使用其实rottingKey参数传递的是一个路由键,可以明白为就像一个哈希表一样,在消费者中调用queuebind的时间我们有一个参数也就是routingKey请参考上面的fanout消费者代码剖析 上面我们提到了在绑定交换机的时间我们可以设置一个routingKey其实可以明白为一个哈希表,routingKey是键然后我们通过这个路由键实现了可以将消息转给特定的消息队列
消费者

代码

  1. package com.monai.aidati.mq;  
  2. import com.rabbitmq.client.Channel;  
  3. import com.rabbitmq.client.Connection;  
  4. import com.rabbitmq.client.ConnectionFactory;  
  5. import com.rabbitmq.client.DeliverCallback;  
  6.   
  7. public class DirectConsumer {  
  8.     private static final String EXCHANGE_NAME = "direct-exchange";  
  9.   
  10.     public static void main(String[] args) throws Exception {  
  11.         ConnectionFactory factory=new ConnectionFactory();  
  12.         factory.setPassword("admin");  
  13.         factory.setUsername("admin");  
  14.         factory.setHost("localhost");  
  15.         Connection connection=factory.newConnection();  
  16.         Channel channel=connection.createChannel();  
  17.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  18.         // 创建队列,随机分配一个队列名称  
  19.         String queueName = "xiaoyu_queue";  
  20.         channel.queueDeclare(queueName, true, false, false, null);  
  21.         channel.queueBind(queueName, EXCHANGE_NAME, "xiaoyu");  
  22.         // 创建队列,随机分配一个队列名称  
  23.         String queueName2 = "xiaopi_queue";  
  24.         channel.queueDeclare(queueName2, true, false, false, null);  
  25.         channel.queueBind(queueName2, EXCHANGE_NAME, "xiaopi");  
  26.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  27.         DeliverCallback xiaoyuDeliverCallback=(consumerTag, delivery)->{  
  28.             String message=new String(delivery.getBody(),"utf-8");  
  29.             System.out.println(" [xiaoyu] Received '" +  
  30.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
  31.         };  
  32.         DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {  
  33.             String message = new String(delivery.getBody(), "UTF-8");  
  34.             System.out.println(" [xiaopi] Received '" +  
  35.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
  36.         };  
  37.         channel.basicConsume(queueName,true,xiaoyuDeliverCallback,consumerTag->{});  
  38.         channel.basicConsume(queueName2,true,xiaopiDeliverCallback,consumerTag->{});  
  39.     }  
  40. }
复制代码
代码剖析

queueBind

![[Pasted image 20250118141905.png]]
在这段代码中我们可以看到我们的routingKey已经写入了值,也就是说该队列绑定交换机后交换机可以通过这个routingkey向特定的消息队列发送消息
Topic交换机

作用

在上面的交换机我们学到了发送给全部队列,发送给指定队列,但是我们在生活中的实际应用场景其实比上面的还要复杂一些由于我们还必要批量发送消息那么Topic就可以满意我们这个需求

可实现的功能

在这里我们可以实现消息的批量转发比如说在一个公司中老板必要给全体员工发送消息1给全体管理层发送消息2那么可以使用Topic交换机
生产者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6.   
  7. import java.util.Scanner;  
  8.   
  9. public class TopicProducer {  
  10.     private static final String EXCHANGE_NAME = "topic-exchange";  
  11.   
  12.     public static void main(String[] args) throws Exception {  
  13.         ConnectionFactory factory = new ConnectionFactory();  
  14.         factory.setHost("localhost");  
  15.         factory.setUsername("admin");  
  16.         factory.setPassword("admin");  
  17.         try (Connection connection = factory.newConnection();  
  18.              Channel channel = connection.createChannel()) {  
  19.             channel.exchangeDeclare(EXCHANGE_NAME,"topic");  
  20.             Scanner scanner = new Scanner(System.in);  
  21.             while (scanner.hasNext()) {  
  22.                 String userInput = scanner.nextLine();  
  23.                 String[] strings = userInput.split(" ");  
  24.                 if (strings.length < 1) {  
  25.                     continue;  
  26.                 }  
  27.                 String message = strings[0];  
  28.                 String routingKey = strings[1];  
  29.   
  30.                 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));  
  31.                 System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");  
  32.             }  
  33.         }  
  34.     }  
  35. }
复制代码
这里的生产者代码其实和direct的代码是共用的
消费者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.DeliverCallback;  
  7.   
  8. public class TopicConsumer {  
  9.     private static final String EXCHANGE_NAME = "topic-exchange";  
  10.   
  11.     public static void main(String[] args) throws Exception {  
  12.         ConnectionFactory factory=new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         factory.setUsername("admin");  
  15.         factory.setPassword("admin");  
  16.         Connection connection=factory.newConnection();  
  17.         Channel channel=connection.createChannel();  
  18.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  19.         String queueName = "frontend_queue";  
  20.         channel.queueDeclare(queueName,true,false,false,null);  
  21.         channel.queueBind(queueName,EXCHANGE_NAME,"#.前端.#");  
  22.         // 创建队列  
  23.         String queueName2 = "backend_queue";  
  24.         channel.queueDeclare(queueName2, true, false, false, null);  
  25.         channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");  
  26.   
  27.         // 创建队列  
  28.         String queueName3 = "product_queue";  
  29.         channel.queueDeclare(queueName3, true, false, false, null);  
  30.         channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");  
  31.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  32.         DeliverCallback xiaoaDeliverCallback = (consumerTag, delivery) -> {  
  33.             String message = new String(delivery.getBody(), "UTF-8");  
  34.             System.out.println(" [xiaoa] Received '" +  
  35.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
  36.         };  
  37.   
  38.         DeliverCallback xiaobDeliverCallback = (consumerTag, delivery) -> {  
  39.             String message = new String(delivery.getBody(), "UTF-8");  
  40.             System.out.println(" [xiaob] Received '" +  
  41.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
  42.         };  
  43.   
  44.         DeliverCallback xiaocDeliverCallback = (consumerTag, delivery) -> {  
  45.             String message = new String(delivery.getBody(), "UTF-8");  
  46.             System.out.println(" [xiaoc] Received '" +  
  47.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
  48.         };  
  49.         channel.basicConsume(queueName, true, xiaoaDeliverCallback, consumerTag -> {  
  50.         });  
  51.         channel.basicConsume(queueName2, true, xiaobDeliverCallback, consumerTag -> {  
  52.         });  
  53.         channel.basicConsume(queueName3, true, xiaocDeliverCallback, consumerTag -> {  
  54.         });  
  55.     }  
  56. }
复制代码
代码剖析

channel.queueBind(queueName2, EXCHANGE_NAME, “#.后端.#”);

这里我们可以发现我们在设置routingKey的时间用了一种类似于正则表达式的样式但是照旧有肯定区别的
* . 可以匹配一个单词比如说 * .ori 那么a.ori , b.ori , c.ori都可以匹配
#. 可以匹配0个大概多个单词比如说 a.# 那么a.a a.a.a a.b都可以匹配到
这里我尝试已往掉 . 举行实行但是实行结果显示去掉后匹配就不能成功了可能这是它默认的匹配规则吧。
RPC

除了以上的交换机类型外官方还提供了一个交换机类型就是RPC
RPC 即远程过程调用,是一种允许一个步调调用另一个地址空间(通常是远程盘算机)中的过程或函数的技能,而开发人员无需显式编码远程交互的细节。在分布式体系中,RPC 是一种常用的通讯模式,它使得开发人员可以像调用当地函数一样调用远程服务,隐藏了网络通讯的复杂性。但是的话我们假如必要使用RPC也有专门的RPC框架,就像redis也可以当作消息队列但是我们不会为了模仿而模仿,一般我们都是使用专门的框架技能
焦点特性

消息过期机制

在上面讲解basicPublish的各项参数寄义时间我们提到了持久化机制,那么我们可以明白就是我们的消息是可以设置过期时间的,其实也很好明白,由于消息队列也好,交换机也好这些占用的都是我们的内存,那么假如不设置一个合理的过期机制就很容易导致我们的消息出现挤压导致内存的浪费。
生产者

代码

  1. package com.monai.aidati.mq;  
  2.   
  3. import com.rabbitmq.client.AMQP;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7.   
  8. public class TtlProducer {  
  9.     private final static String QUEUE_NAME = "ttl_queue";  
  10.   
  11.     public static void main(String[] args) throws Exception{  
  12.         ConnectionFactory factory=new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         factory.setUsername("admin");  
  15.         factory.setPassword("admin");  
  16.         try (Connection connection=factory.newConnection();  
  17.              Channel channel=connection.createChannel()){  
  18.             channel.queueDeclare(QUEUE_NAME,true,false,false,null);  
  19.             String message = "Hello World!";  
  20.             AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder()  
  21.                     .expiration("10000")  
  22.                     .build();  
  23.             channel.basicPublish("",QUEUE_NAME,basicProperties,message.getBytes("utf8"));  
  24.         }  
  25.   
  26.     }  
  27. }
复制代码
代码剖析

在以上代码中我们用到了之前没有效到的一个参数basicProperties这里我们之前提出过是为了设置一些消息的属性那么这里我们便可以设置消息的属性之一也就是过期时间为10秒消费者的话其实代码没有做过多的变动可以直接使用helloword的代码调用即可观察征象。
消息确认机制

焦点概念

这里我们必要了解生产者发送的消息是如何发送到消费者步调中的如下图

如图所示生产者生产的消息能够被消费者获取到是由于生产者会将生产的消息发送给broker这是一个中心件,它的作用就是将生产者生产出的消息传递给消费者。而broker在获取到消息之后会根据这个消息的各种配置选择合适的路由等,并且当收到消息后会给生产者发送一个ack表示成功吸收,并把该消息发送给消费者,那么在这个过程中有哪些地方容易出现异常呢?
异常情况1 生产者生产的消息未能传递给broker

针对生产者和broker之间的消息传递rabbitMq设置了三个样式的消息确认机制
Publishing Messages Individually(单独确认)

  1.    static void publishMessagesIndividually() throws Exception {  
  2.         try (Connection connection = createConnection()) {  
  3.             Channel ch = connection.createChannel();  
  4. //开启信道确认模式  
  5.             ch.confirmSelect();  
  6. //声明队列  
  7.             ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME1, true, false, true,  
  8.                     null);  
  9.             long start = System.currentTimeMillis();  
  10. //循环发送消息  
  11.             for (int i = 0; i < MESSAGE_COUNT; i++) {  
  12.                 String body = "消息"+ i;  
  13. //发布消息  
  14.                 ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME1, null,  
  15.                         body.getBytes());  
  16. //等待确认消息.只要消息被确认,这个方法就会被返回  
  17. //如果超时过期, 则抛出TimeoutException。如果任何消息被nack(丢失),  
  18.                 waitForConfirmsOrDie将抛出IOException。  
  19.                 ch.waitForConfirmsOrDie(5_000);  
  20.             }  
  21.             long end = System.currentTimeMillis();  
  22.             System.out.format("Published %d messages individually in %d ms",  
  23.                     MESSAGE_COUNT, end - start);  
  24.         }  
  25.     }
复制代码
相较于普通的生产者这里的主要区别就是这里我们新增的代码就是上面这两行那么我们主要讲解waitForConfirmsOrDie
这个方法可以设置一个参数这个参数是broker未返回ack的话我们最多等候多长时间他的底层原理如下
1 生产者向broker发送消息
2 broker收到消息后会按照顺序对收到的消息设置一个id这个id通常是自增的
3 broker收到消息后会自动的为生产者发送一个ack并且rabbitMq内部会维持一张表上面记载着对应id的消息此时是否发送ack报文
4 生产者调用waitForConfirmsOrDie会陷入阻塞等候不停等候到broker发送的ack报文获取到为止
Publishing Messages in Batches(批量确认)

  1.     static void publishMessagesInBatch() throws Exception {  
  2.         try (Connection connection = createConnection()) {  
  3. //创建信道  
  4.             Channel ch = connection.createChannel();  
  5. //信道设置为confirm模式  
  6.             ch.confirmSelect();  
  7. //声明队列  
  8.             ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME2, true, false, true,  
  9.                     null);  
  10.             int batchSize = 100;  
  11.             int outstandingMessageCount = 0;  
  12.             long start = System.currentTimeMillis();  
  13.             for (int i = 0; i < MESSAGE_COUNT; i++) {  
  14.                 String body = "消息"+ i;  
  15. //发送消息  
  16.                 ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME2, null,  
  17.                         body.getBytes());  
  18.                 outstandingMessageCount++;  
  19. //批量确认消息  
  20.                 if (outstandingMessageCount == batchSize) {  
  21.                     ch.waitForConfirmsOrDie(5_000);  
  22.                     outstandingMessageCount = 0;  
  23.                 }  
  24.             }  
  25. //消息发送完, 还有未确认的消息, 进行确认  
  26.             if (outstandingMessageCount > 0) {  
  27.                 ch.waitForConfirmsOrDie(5_000);  
  28.             }  
  29.             long end = System.currentTimeMillis();  
  30.             System.out.format("Published %d messages in batch in %d ms",  
  31.                     MESSAGE_COUNT, end - start);  
  32.         }  
  33.     }
复制代码
不同于同步处置惩罚机制,批量处置惩罚明显的淘汰了网络通讯的负担由于它会设置一个消息量当已发送的消息达到这个量的时间会举行一次确认,这时间就用到了rabbitMq维护的一张broker是否举行ack确认的表了,他会根据以发送消息的id举行排场从而获取到哪些消息还没有举行ack确认。
Handling Publisher Confirms Asynchronously(异步确认)

异步确认的机制是这三者实现最为复杂但也是最为高效的一种确认方式,他的基本原理就是确认和机制和发送消息不是同时举行的这得益于rabbitMq实现了一个方法addConfirmListener这个方法
可以添加ConfirmListener 回调接口.而回调窗口包含了两个方法handleAck(long deliveryTag, boolean
multiple) 和 handleNack(long deliveryTag, boolean multiple),从方法的名称我们可以知道addConfirmListener其实就是添加了一个监听器,而回调接口内实现的两个方法则是当broker返回ack和nack的时间的处置惩罚,异步确认必要我们的生产者维护一个表格,也就是把全部的消息放入该表格中,然后当吸收到一个ack后把该消息从表格中移除,终极表格中剩余的消息也就是没有吸收到ack的消息了
  1.    static void handlePublishConfirmsAsynchronously() throws Exception {  
  2.         try (Connection connection = createConnection()) {  
  3.             Channel ch = connection.createChannel();  
  4.             ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME3, false, false, true,  
  5.                     null);  
  6.             ch.confirmSelect();  
  7. //有序集合,元素按照自然顺序进行排序,存储未confirm消息序号  
  8.             SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new  
  9.                     TreeSet<>());  
  10.             ch.addConfirmListener(new ConfirmListener() {  
  11.                 @Override  
  12.                 public void handleAck(long deliveryTag, boolean multiple) throws  
  13.                         IOException {  
  14. //System.out.println("ack, SeqNo: " + deliveryTag +  
  15.                     ",multiple:" + multiple);  
  16. //multiple 批量  
  17. //confirmSet.headSet(n)方法返回当前集合中小于n的集合  
  18.                     if (multiple) {  
  19. //批量确认:将集合中小于等于当前序号deliveryTag元素的集合清除,表示  
  20.                         这批序号的消息都已经被ack了  
  21.                         confirmSet.headSet(deliveryTag+1).clear();  
  22.                     } else {  
  23. //单条确认:将当前的deliveryTag从集合中移除  
  24.                         confirmSet.remove(deliveryTag);  
  25.                     }  
  26.                 }  
  27.                 @Override  
  28.                 public void handleNack(long deliveryTag, boolean multiple) throws  
  29.                         IOException {  
  30.                     System.err.format("deliveryTag: %d, multiple: %b%n",  
  31.                             deliveryTag, multiple);  
  32.                     if (multiple) {  
  33. //批量确认:将集合中小于等于当前序号deliveryTag元素的集合清除,表示  
  34.                         这批序号的消息都已经被ack了  
  35.                         confirmSet.headSet(deliveryTag+1).clear();  
  36.                     } else {  
  37. //单条确认:将当前的deliveryTag从集合中移除  
  38.                         confirmSet.remove(deliveryTag);  
  39.                     }  
  40. //如果处理失败, 这里需要添加处理消息重发的场景. 此处代码省略  
  41.                 }  
  42.             });  
  43. //循环发送消息  
  44.             long start = System.currentTimeMillis();  
  45.             for (int i = 0; i < MESSAGE_COUNT; i++) {  
  46.                 String message = "消息" + i;  
  47. //得到下次发送消息的序号, 从1开始  
  48.                 long nextPublishSeqNo = ch.getNextPublishSeqNo();  
  49. //System.out.println("消息序号:"+ nextPublishSeqNo);  
  50.                 ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME3, null,  
  51.                         message.getBytes());  
  52. //将序号存入集合中  
  53.                 confirmSet.add(nextPublishSeqNo);  
  54.             }  
  55. //消息确认完毕  
  56.             while (!confirmSet.isEmpty()){  
  57.                 Thread.sleep(10);  
  58.             }  
  59.             long end = System.currentTimeMillis();  
  60.             System.out.format("Published %d messages and handled confirms  
  61.                     asynchronously in %d ms%n", MESSAGE_COUNT, end - start);  
  62.         }  
  63.     }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表