Rabbitmq的使用

打印 上一主题 下一主题

主题 867|帖子 867|积分 2601

rabbitmq的使用

1. 使用场景及它的特点介绍


2. mq的5种常用消息模型

2.1 队列模型—-1 对 1



2.2 队列模型 — 1(生产者)对多(消费者)
  1. 特点:
  2.         1.当有多个消费者时,无论消费者处理的性能是否相同,生产者的消费会平均分配给每一个消费者
  3.         2.每个消费者处理的消息是否存在重复? 不会重复
  4.         解释:为什么开启多个消费者时,会出现有的消费者虽然处理的慢,但是也会收到相同的消息的个数?
  5.                 rabbitmq有消息默认的分配机制:平均分配(有多少个消费者,都将平均分配要处理的消息数)
  6. 优化: 能者多劳
  7.         在消费处理消息时,可以设置由队列每次分配给消费者的消息数量,不要一次性全分完
复制代码
2.3 队列模式的代码实现

2.3.1 生产的核心代码
  1. import cn.itsource.mq.utils.ConnectionUtil;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. public class Producer {
  5.         private static final String QUEUE_NAME = "queue_workqueue";
  6.         public static void main(String[] args) throws Exception {
  7.                 //1.创建连接
  8.                 Connection connection = ConnectionUtil.getConnection();
  9.                 //2.生产者与服务端之间建立通道
  10.                 Channel channel = connection.createChannel();
  11.                 for (int i = 0; i < 20; i++) {
  12.                         /**
  13.                          * 发送消息到队列
  14.                          * @param exchange 交换机名称
  15.                          * @param routingKey 发送到哪个队列(这个参数很容易搞错:没有交换机时,这个参数必须填队列名称;有交换机的时候,就填路由)
  16.                          * @param props 消息的其他属性
  17.                          * @param body 消息体
  18.                          */
  19. //在实际开发中,我们也会将发送的内容,以字符串进行传输。但是涉及到对象类型,会将其先转为json字符串。
  20.                         String message = "queue_workqueue: 这是一个消息!" + i;
  21.                         System.out.println(message);
  22.                         //3. 调用API进行消息的发送
  23.                         channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
  24.                 }
  25.                 //5.关闭连接
  26.                 connection.close();
  27.         }
  28. }
复制代码
2.3.2 消费者的代码实现
  1. import cn.itsource.mq.utils.ConnectionUtil;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class Consumer01 {
  5.         //队列的名称,必须要与接收的消息生产者,设置的队列名相同
  6.         private static final String QUEUE_NAME = "queue_workqueue";
  7.         public static void main(String[] args) throws Exception {
  8.                 //1.创建连接
  9.                 Connection connection = ConnectionUtil.getConnection();
  10.                 //2.生产者与服务端之间建立通道
  11.                 Channel channel = connection.createChannel();
  12.                 //3.声明队列:因为生产者那边已经声明过队列了,所以这边就不需要声明队列
  13.                 /**
  14.                  * 3.声明队列
  15.                  * @param queue 队列名称
  16.                  * @param durable 是否持久化
  17.                  * @param exclusive 是否为专用队列
  18.                  * @param autoDelete 是否自动删除
  19.                  * @param arguments 其他参数
  20.                  */
  21.                 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  22.                 //设置消费者每次预提取1个消息【这是一个提高消息处理效率的参数。表示每次接收几个消息】
  23.                 channel.basicQos(1);
  24.                 //4. 采用匿名内部类 写一个DefaultConsumer的子类,子类中重写handleDelivery方法
  25.                 DefaultConsumer consumer = new DefaultConsumer(channel) {
  26.                         @Override
  27.                         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28.                                 try {
  29.                                         //接收消息
  30.                                         String message = new String(body, "utf-8");
  31.                                         Thread.sleep(500);
  32.                                         System.out.println("消费者收到消息:" + message);
  33.                                         long deliveryTag = envelope.getDeliveryTag();
  34.                                         /**
  35.                                         【如果采用默认的 自动确认ACK机制 ,则可省略】
  36.                                          * 正常情况下的手动回执
  37.                                          * @param deliveryTag 处理消息的标识
  38.                                          * @param multiple 是否自动批量处理(自动处理队列中当前消息,及之前的所有消息) false表示只处理当前消息
  39.                                          */
  40.                   //注意:当ACK采用手动确认机制时,确认消息的成功发送的代码,一定要放在当前方法体的最后一行
  41.                                         channel.basicAck(deliveryTag, false);
  42.                                 } catch (Exception e) {
  43.                                         e.printStackTrace();
  44.                                 }
  45.                         }
  46.                 };
  47.                 /**
  48.                  * 5.监听队列
  49.                  *  一旦被监听的队列中有新的消息,就自动调用consumer对象的handleDelivery方法来接收消息
  50.                  * @param queue 队列名称
  51.                  * @param autoAck 是否自动回执 true表示自动回执,false表示手动回执
  52.                  * @param callback 接收消息的回调方法Consumer
  53.                  */
  54.                 channel.basicConsume(QUEUE_NAME, false, consumer);
  55.         }
  56. }
复制代码
2.4 订阅模型的代码实现

2.4.1 订阅模型分3种
  1. 1. fanout类型 : 1.不需要设置routekey,生产者的消息,会统一分别发给每一个消费者
  2. 2. direct : 1. 设置routekey,且生产者在发送消息时,也要指定routekey,且消息在过滤时,需要完全匹配生产指定的routekey
  3. 3. topic  : 1. 在设置toutekey时,可以引用【通配符】 ;2.通配符分2种:*:单个匹配;#:多个匹配
复制代码

  • fanout模型的效果图

  • direct效果图

  • topic效果图

2.4.2 生产者的代码实现
  1. import cn.itsource.mq.utils.ConnectionUtil;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Producer {
  6.         private static final String EXCHANGE_NAME = "direct_exchange";
  7.         public static void main(String[] args) throws Exception {
  8.                 //1.创建连接
  9.                 Connection connection = ConnectionUtil.getConnection();
  10.                 //2.生产者与服务端之间建立通道
  11.                 Channel channel = connection.createChannel();
  12.                 /**
  13.                  * 3.声明交换机
  14.                  * @param exchange 交换机名称
  15.                  * @param type 交换机类型
  16.                  * @param durable 是否持久化
  17.                  */
  18.                 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
  19.                 /**
  20.                  * 4.发送消息到队列
  21.                  * @param exchange 交换机名称
  22.                  * @param routingKey 发送到哪个队列(这个参数很容易搞错:没有交换机时,这个参数必须填队列名称;有交换机的时候,就填路由)
  23.                  * @param props 消息的其他属性
  24.                  * @param body 消息体
  25.                  */
  26.                 String message = "这是一个消息!" + System.currentTimeMillis();
  27.                 System.out.println(message);
  28.                 //要指定 路由key  : routekey。设置后,对应的消费者,只要在监听指定的路由key的消息,才会收取到
  29.                 channel.basicPublish(EXCHANGE_NAME,"email",null,message.getBytes("utf-8"));
  30.                 //5.关闭连接
  31.                 connection.close();
  32.         }
  33. }
复制代码
2.4.3 消费者的代码实现
  1. import cn.itsource.mq.utils.ConnectionUtil;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. public class ConsumerEMAIL {
  5.         private static final String QUEUE_NAME_EMAIL = "queue_direct_email";
  6.         private static final String EXCHANGE_NAME = "direct_exchange";
  7.         public static void main(String[] args) throws Exception {
  8.                 //1.创建连接
  9.                 Connection connection = ConnectionUtil.getConnection();
  10.                 //2.生产者与服务端之间建立通道
  11.                 Channel channel = connection.createChannel();
  12.                 /**
  13.                  * 3.声明队列
  14.                  * @param queue 队列名称
  15.                  * @param durable 是否持久化
  16.                  * @param exclusive 是否为专用队列
  17.                  * @param autoDelete 是否自动删除
  18.                  * @param arguments 其他参数
  19.                  */
  20.                 channel.queueDeclare(QUEUE_NAME_EMAIL,true,false,false,null);
  21.                 /**
  22.                         在绑定到 指定的交换机时,要同时指定接收什么类型的 routekey消息
  23.                  * 4.将队列绑定到交换机
  24.                  * @param queue 队列名称
  25.                  * @param exchange 交换机名称
  26.                  * @param routingKey 路由设置
  27.                  * @param arguments 其他参数
  28.                  */
  29.                 channel.queueBind(QUEUE_NAME_EMAIL,EXCHANGE_NAME,"email",null);
  30.                 //设置消费者每次预提取1个消息
  31.                 channel.basicQos(1);
  32.                 //采用匿名内部类 写一个DefaultConsumer的子类,子类中重写handleDelivery方法
  33.                 DefaultConsumer consumer = new DefaultConsumer(channel){
  34.                         @Override
  35.                         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  36.                                 try {
  37.                                         //接收消息
  38.                                         String message = new String(body,"utf-8");
  39.                                         Thread.sleep(2000);
  40.                                         System.out.println("消费者收到消息:"+message);
  41.                                         /**
  42.                                          * 正常情况下的手动回执
  43.                                          * @param deliveryTag 处理消息的标识
  44.                                          * @param multiple 是否自动批量处理(自动处理队列中当前消息,及之前的所有消息) false表示只处理当前消息
  45.                                          */
  46.                                         channel.basicAck(envelope.getDeliveryTag(),false);
  47.                                 } catch (Exception e) {
  48.                                         e.printStackTrace();
  49.                                 }
  50.                         }
  51.                 };
  52.                 /**
  53.                  * 4.监听队列
  54.                  *  一旦被监听的队列中有新的消息,就自动调用consumer对象的handleDelivery方法来接收消息
  55.                  * @param queue 队列名称
  56.                  * @param autoAck 是否自动回执 true表示自动回执,false表示手动回执
  57.                  * @param callback 接收消息的回调方法Consumer
  58.                  */
  59.                 channel.basicConsume(QUEUE_NAME_EMAIL, false, consumer);
  60.         }
  61. }
复制代码
3. springboot整合mq


  • springboot整合mq时,在企业开辟中,都会将生产者和消费者分开集成到 2个工程中
3.1 整合生产者

3.1.1 导入pom依赖
  1. <dependency>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
3.1.2 设置yml
  1. spring:
  2.   rabbitmq:
  3.           host: 127.0.0.1
  4.           port: 5672
  5.           username: guest
  6.           password: guest
  7.           virtualHost: /
  8.           listener:
  9.                 simple:
  10.                   acknowledge-mode: manual #手动签收
  11.                   prefetch: 1     #预提取1条消息
  12.           publisher-confirms: true #消息发送到交换机失败回调
  13.           publisher-returns: true  #消息发送到队列失败回调
  14.           template:
  15.                 mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
复制代码
3.1.3 设置启动类的注解


  • 不必要在启动类添加开启的注解,但是必要添加几个@Bean的设置
  • 设置bean
    1.   public static final String EXCHANGE_NAME = "springboot-rabbitmq-exchange";
    2.   public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
    3.   public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
    4.   /**
    5.    * 声明交换机
    6.    * @return
    7.    */
    8.   @Bean(EXCHANGE_NAME)
    9.   public Exchange EXCHANGE_NAME(){
    10.           return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    11.   }
    12.   /**
    13.    * 声明队列:sms
    14.    * @return
    15.    */
    16.   @Bean(QUEUE_NAME_SMS)
    17.   public Queue QUEUE_NAME_SMS(){
    18.           return QueueBuilder.durable(QUEUE_NAME_SMS).build();
    19.   }
    20.   /**
    21.    * 声明队列:email
    22.    * @return
    23.    */
    24.   @Bean(QUEUE_NAME_EMAIL)
    25.   public Queue QUEUE_NAME_EMAIL(){
    26.           return QueueBuilder.durable(QUEUE_NAME_EMAIL).build();
    27.   }
    28.   /**
    29.    * sms队列绑定到交换机
    30.    *  需要参数有两个办法:
    31.    *      1)直接在方法内部调用其他方法获取对象
    32.    *      2)直接方法参数中写变量,Spring会自动从Spring容器取出对象进行依赖注入
    33.    * @param queue
    34.    * @param exchange
    35.    * @return
    36.    */
    37.   @Bean
    38.   public Binding BINDING_QUEUE_NAME_SMS(@Qualifier(QUEUE_NAME_SMS)Queue queue, Exchange exchange){
    39.           return BindingBuilder.bind(queue).to(exchange).with("user.#.sms").noargs();
    40.   }
    41.   /**
    42.    * email队列绑定到交换机
    43.    * @param queue
    44.    * @param exchange
    45.    * @return
    46.    */
    47.   @Bean
    48.   public Binding BINDING_QUEUE_NAME_EMAIL(@Qualifier(QUEUE_NAME_EMAIL)Queue queue, Exchange exchange){
    49.           return BindingBuilder.bind(queue).to(exchange).with("user.#.email").noargs();
    50.   }
    复制代码
3.1.4 测试


  • 先定义一个controller,调用RabbitmqTemplate方法。
在欣赏器中,调用一次下面的消息发送的方法,就到 rabbitmq服务器中,检查是否生成了对应的exchange和queue的内容
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. @RestController
  6. public class ProducerController {
  7.         @Autowired
  8.         private RabbitTemplate rabbitTemplate;
  9.         @GetMapping("/sendMsg")
  10.         public void sendMsg(String msg) {
  11.                 //发送一个消息给mq服务器
  12.                 rabbitTemplate.convertAndSend(Contants.EXCHANGE_NAME, "user.email", msg);
  13.         }
  14. }
复制代码

  • 检查rabbitmq服务器,是否会生成对应的exchange和queue的数据


3.2 整合消费者

3.2.1 导入pom依赖
  1. <dependency>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
3.2.2 设置yml
  1. spring:
  2.   rabbitmq:
  3.           host: 127.0.0.1
  4.           port: 5672
  5.           username: guest
  6.           password: guest
  7.           virtualHost: /
  8.           listener:
  9.                 simple:
  10.                   acknowledge-mode: manual #手动签收
  11.                   prefetch: 1     #预提取1条消息
  12.           publisher-confirms: true #消息发送到交换机失败回调
  13.           publisher-returns: true  #消息发送到队列失败回调
  14.           template:
  15.                 mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
复制代码
3.2.3 设置启动注解或bean


3.2.4 测试


  • 消费者的核心代码
    1.   import com.rabbitmq.client.Channel;
    2.   import org.springframework.amqp.core.Message;
    3.   import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4.   import org.springframework.stereotype.Component;
    5.   import java.io.IOException;
    6.   @Component
    7.   public class ConsumerListener {
    8.           public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
    9.           public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
    10.           /**
    11.            * 监听器:监听一个或者多个队列
    12.            *  被监听的队列中一旦有了新的消息,就自动执行此方法来处理消息
    13.            * @param msg
    14.            * @param message
    15.            * @param channel
    16.            */
    17.           @RabbitListener(queues = {QUEUE_NAME_SMS})
    18.           public void accept_sms(String msg, Message message, Channel channel){
    19.                   try {
    20.                           System.out.println("SMS消费者收到消息:" + msg);
    21.                           //成功接收消息
    22.                           channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    23.                   } catch (IOException e) {
    24.                           e.printStackTrace();
    25.                   }
    26.           }
    27.           /**
    28.            * 监听器:监听一个或者多个队列
    29.            *  被监听的队列中一旦有了新的消息,就自动执行此方法来处理消息
    30.            * @param msg
    31.            * @param message
    32.            * @param channel
    33.            */
    34.           @RabbitListener(queues = {QUEUE_NAME_EMAIL})
    35.           public void accept_email(String msg, Message message, Channel channel){
    36.                   try {
    37.                           System.out.println("EMAIL消费者收到消息:" + msg);
    38.                           //成功接收消息
    39.                           channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    40.                   } catch (IOException e) {
    41.                           e.printStackTrace();
    42.                   }
    43.           }
    44.   }
    复制代码
  • 启动消费者的工程后,不必要做任何事,只要生产者发送成功一条消息,消费者就应该能吸收到 消息内容,如果吸收不到 ,说明 环境 设置失败

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

罪恶克星

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表