罪恶克星 发表于 2024-7-4 20:00:15

Rabbitmq的使用

rabbitmq的使用

1. 使用场景及它的特点介绍

https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630144650827-652429488.png
2. mq的5种常用消息模型

2.1 队列模型—-1 对 1

https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630161201591-1565246137.png
https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630161210707-715047401.png
2.2 队列模型 — 1(生产者)对多(消费者)

特点:
        1.当有多个消费者时,无论消费者处理的性能是否相同,生产者的消费会平均分配给每一个消费者
        2.每个消费者处理的消息是否存在重复? 不会重复
        解释:为什么开启多个消费者时,会出现有的消费者虽然处理的慢,但是也会收到相同的消息的个数?
                rabbitmq有消息默认的分配机制:平均分配(有多少个消费者,都将平均分配要处理的消息数)
优化: 能者多劳
        在消费处理消息时,可以设置由队列每次分配给消费者的消息数量,不要一次性全分完2.3 队列模式的代码实现

2.3.1 生产的核心代码

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
        private static final String QUEUE_NAME = "queue_workqueue";
        public static void main(String[] args) throws Exception {
                //1.创建连接
                Connection connection = ConnectionUtil.getConnection();
                //2.生产者与服务端之间建立通道
                Channel channel = connection.createChannel();
                for (int i = 0; i < 20; i++) {
                        /**
                       * 发送消息到队列
                       * @param exchange 交换机名称
                       * @param routingKey 发送到哪个队列(这个参数很容易搞错:没有交换机时,这个参数必须填队列名称;有交换机的时候,就填路由)
                       * @param props 消息的其他属性
                       * @param body 消息体
                       */
//在实际开发中,我们也会将发送的内容,以字符串进行传输。但是涉及到对象类型,会将其先转为json字符串。
                        String message = "queue_workqueue: 这是一个消息!" + i;
                        System.out.println(message);
                        //3. 调用API进行消息的发送
                        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
                }
                //5.关闭连接
                connection.close();
        }
}2.3.2 消费者的代码实现

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer01 {
        //队列的名称,必须要与接收的消息生产者,设置的队列名相同
        private static final String QUEUE_NAME = "queue_workqueue";
        public static void main(String[] args) throws Exception {
                //1.创建连接
                Connection connection = ConnectionUtil.getConnection();
                //2.生产者与服务端之间建立通道
                Channel channel = connection.createChannel();
                //3.声明队列:因为生产者那边已经声明过队列了,所以这边就不需要声明队列
                /**
               * 3.声明队列
               * @param queue 队列名称
               * @param durable 是否持久化
               * @param exclusive 是否为专用队列
               * @param autoDelete 是否自动删除
               * @param arguments 其他参数
               */
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                //设置消费者每次预提取1个消息【这是一个提高消息处理效率的参数。表示每次接收几个消息】
                channel.basicQos(1);
                //4. 采用匿名内部类 写一个DefaultConsumer的子类,子类中重写handleDelivery方法
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                try {
                                        //接收消息
                                        String message = new String(body, "utf-8");
                                        Thread.sleep(500);
                                        System.out.println("消费者收到消息:" + message);
                                        long deliveryTag = envelope.getDeliveryTag();
                                        /**
                                        【如果采用默认的 自动确认ACK机制 ,则可省略】
                                       * 正常情况下的手动回执
                                       * @param deliveryTag 处理消息的标识
                                       * @param multiple 是否自动批量处理(自动处理队列中当前消息,及之前的所有消息) false表示只处理当前消息
                                       */
                  //注意:当ACK采用手动确认机制时,确认消息的成功发送的代码,一定要放在当前方法体的最后一行
                                        channel.basicAck(deliveryTag, false);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
                };
                /**
               * 5.监听队列
               *一旦被监听的队列中有新的消息,就自动调用consumer对象的handleDelivery方法来接收消息
               * @param queue 队列名称
               * @param autoAck 是否自动回执 true表示自动回执,false表示手动回执
               * @param callback 接收消息的回调方法Consumer
               */
                channel.basicConsume(QUEUE_NAME, false, consumer);
        }
}2.4 订阅模型的代码实现

2.4.1 订阅模型分3种

1. fanout类型 : 1.不需要设置routekey,生产者的消息,会统一分别发给每一个消费者
2. direct : 1. 设置routekey,且生产者在发送消息时,也要指定routekey,且消息在过滤时,需要完全匹配生产指定的routekey
3. topic: 1. 在设置toutekey时,可以引用【通配符】 ;2.通配符分2种:*:单个匹配;#:多个匹配

[*]fanout模型的效果图
https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630170755571-1494298862.png
[*]direct效果图
https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630170810323-1841382119.png
[*]topic效果图
https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630170834300-1428690118.png
2.4.2 生产者的代码实现

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
        private static final String EXCHANGE_NAME = "direct_exchange";
        public static void main(String[] args) throws Exception {
                //1.创建连接
                Connection connection = ConnectionUtil.getConnection();
                //2.生产者与服务端之间建立通道
                Channel channel = connection.createChannel();
                /**
               * 3.声明交换机
               * @param exchange 交换机名称
               * @param type 交换机类型
               * @param durable 是否持久化
               */
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
                /**
               * 4.发送消息到队列
               * @param exchange 交换机名称
               * @param routingKey 发送到哪个队列(这个参数很容易搞错:没有交换机时,这个参数必须填队列名称;有交换机的时候,就填路由)
               * @param props 消息的其他属性
               * @param body 消息体
               */
                String message = "这是一个消息!" + System.currentTimeMillis();
                System.out.println(message);
                //要指定 路由key: routekey。设置后,对应的消费者,只要在监听指定的路由key的消息,才会收取到
                channel.basicPublish(EXCHANGE_NAME,"email",null,message.getBytes("utf-8"));
                //5.关闭连接
                connection.close();
        }
}2.4.3 消费者的代码实现

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerEMAIL {
        private static final String QUEUE_NAME_EMAIL = "queue_direct_email";
        private static final String EXCHANGE_NAME = "direct_exchange";
        public static void main(String[] args) throws Exception {
                //1.创建连接
                Connection connection = ConnectionUtil.getConnection();
                //2.生产者与服务端之间建立通道
                Channel channel = connection.createChannel();
                /**
               * 3.声明队列
               * @param queue 队列名称
               * @param durable 是否持久化
               * @param exclusive 是否为专用队列
               * @param autoDelete 是否自动删除
               * @param arguments 其他参数
               */
                channel.queueDeclare(QUEUE_NAME_EMAIL,true,false,false,null);
                /**
                        在绑定到 指定的交换机时,要同时指定接收什么类型的 routekey消息
               * 4.将队列绑定到交换机
               * @param queue 队列名称
               * @param exchange 交换机名称
               * @param routingKey 路由设置
               * @param arguments 其他参数
               */
                channel.queueBind(QUEUE_NAME_EMAIL,EXCHANGE_NAME,"email",null);
                //设置消费者每次预提取1个消息
                channel.basicQos(1);
                //采用匿名内部类 写一个DefaultConsumer的子类,子类中重写handleDelivery方法
                DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                try {
                                        //接收消息
                                        String message = new String(body,"utf-8");
                                        Thread.sleep(2000);
                                        System.out.println("消费者收到消息:"+message);
                                        /**
                                       * 正常情况下的手动回执
                                       * @param deliveryTag 处理消息的标识
                                       * @param multiple 是否自动批量处理(自动处理队列中当前消息,及之前的所有消息) false表示只处理当前消息
                                       */
                                        channel.basicAck(envelope.getDeliveryTag(),false);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
                };
                /**
               * 4.监听队列
               *一旦被监听的队列中有新的消息,就自动调用consumer对象的handleDelivery方法来接收消息
               * @param queue 队列名称
               * @param autoAck 是否自动回执 true表示自动回执,false表示手动回执
               * @param callback 接收消息的回调方法Consumer
               */
                channel.basicConsume(QUEUE_NAME_EMAIL, false, consumer);
        }
}3. springboot整合mq


[*]springboot整合mq时,在企业开辟中,都会将生产者和消费者分开集成到 2个工程中
3.1 整合生产者

3.1.1 导入pom依赖

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.1.2 设置yml

spring:
rabbitmq:
          host: 127.0.0.1
          port: 5672
          username: guest
          password: guest
          virtualHost: /
          listener:
                simple:
                  acknowledge-mode: manual #手动签收
                  prefetch: 1   #预提取1条消息
          publisher-confirms: true #消息发送到交换机失败回调
          publisher-returns: true#消息发送到队列失败回调
          template:
                mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃3.1.3 设置启动类的注解


[*]不必要在启动类添加开启的注解,但是必要添加几个@Bean的设置
[*]设置bean
public static final String EXCHANGE_NAME = "springboot-rabbitmq-exchange";
public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
/**
   * 声明交换机
   * @return
   */
@Bean(EXCHANGE_NAME)
public Exchange EXCHANGE_NAME(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
   * 声明队列:sms
   * @return
   */
@Bean(QUEUE_NAME_SMS)
public Queue QUEUE_NAME_SMS(){
        return QueueBuilder.durable(QUEUE_NAME_SMS).build();
}
/**
   * 声明队列:email
   * @return
   */
@Bean(QUEUE_NAME_EMAIL)
public Queue QUEUE_NAME_EMAIL(){
        return QueueBuilder.durable(QUEUE_NAME_EMAIL).build();
}
/**
   * sms队列绑定到交换机
   *需要参数有两个办法:
   *      1)直接在方法内部调用其他方法获取对象
   *      2)直接方法参数中写变量,Spring会自动从Spring容器取出对象进行依赖注入
   * @param queue
   * @param exchange
   * @return
   */
@Bean
public Binding BINDING_QUEUE_NAME_SMS(@Qualifier(QUEUE_NAME_SMS)Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("user.#.sms").noargs();
}
/**
   * email队列绑定到交换机
   * @param queue
   * @param exchange
   * @return
   */
@Bean
public Binding BINDING_QUEUE_NAME_EMAIL(@Qualifier(QUEUE_NAME_EMAIL)Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("user.#.email").noargs();
}
3.1.4 测试


[*]先定义一个controller,调用RabbitmqTemplate方法。
在欣赏器中,调用一次下面的消息发送的方法,就到 rabbitmq服务器中,检查是否生成了对应的exchange和queue的内容
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @GetMapping("/sendMsg")
        public void sendMsg(String msg) {
                //发送一个消息给mq服务器
                rabbitTemplate.convertAndSend(Contants.EXCHANGE_NAME, "user.email", msg);
        }
}

[*]检查rabbitmq服务器,是否会生成对应的exchange和queue的数据
https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630171619495-313036471.png
https://img2024.cnblogs.com/blog/3237288/202406/3237288-20240630171629490-1171651721.png
3.2 整合消费者

3.2.1 导入pom依赖

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.2.2 设置yml

spring:
rabbitmq:
          host: 127.0.0.1
          port: 5672
          username: guest
          password: guest
          virtualHost: /
          listener:
                simple:
                  acknowledge-mode: manual #手动签收
                  prefetch: 1   #预提取1条消息
          publisher-confirms: true #消息发送到交换机失败回调
          publisher-returns: true#消息发送到队列失败回调
          template:
                mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃3.2.3 设置启动注解或bean


3.2.4 测试


[*]消费者的核心代码
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ConsumerListener {
        public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
        public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
        /**
       * 监听器:监听一个或者多个队列
       *被监听的队列中一旦有了新的消息,就自动执行此方法来处理消息
       * @param msg
       * @param message
       * @param channel
       */
        @RabbitListener(queues = {QUEUE_NAME_SMS})
        public void accept_sms(String msg, Message message, Channel channel){
                try {
                        System.out.println("SMS消费者收到消息:" + msg);
                        //成功接收消息
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                } catch (IOException e) {
                        e.printStackTrace();
                }
        }
        /**
       * 监听器:监听一个或者多个队列
       *被监听的队列中一旦有了新的消息,就自动执行此方法来处理消息
       * @param msg
       * @param message
       * @param channel
       */
        @RabbitListener(queues = {QUEUE_NAME_EMAIL})
        public void accept_email(String msg, Message message, Channel channel){
                try {
                        System.out.println("EMAIL消费者收到消息:" + msg);
                        //成功接收消息
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                } catch (IOException e) {
                        e.printStackTrace();
                }
        }
}
[*]启动消费者的工程后,不必要做任何事,只要生产者发送成功一条消息,消费者就应该能吸收到 消息内容,如果吸收不到 ,说明 环境 设置失败

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Rabbitmq的使用