一、概念先容:
RabbitMQ中几个重要的概念先容:
- Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接本地虚拟连接,AMQP 下令都是通过信道发出去的,不管是发布消息、订阅队列还是吸收消息,这些动作都是通过信道完成。因为对于操作体系来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Exchanges:交换器,用来吸收生产者发送的消息并将这些消息路由给服务器中的队列。
- 交换机范例重要有以下几种:
- Direct Exchange(直连交换机):这种范例的交换机根据消息的Routing Key(路由键)进行准确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息通报场景。
- Fanout Exchange(扇形交换机):这种范例的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息必要被多个消耗者处理的场景。
- Topic Exchange(主题交换机):这种范例的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
- Headers Exchange(头交换机):这种范例的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于必要在消息头中携带额外信息的场景。
- Queues:消息队列,用来保存消息直到发送给消耗者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列内里,等待消耗者连接到这个队列将其取走。
二、引入依赖:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 三、添加配置信息
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
- listener:
- simple:
- acknowledge-mode: manual # 手动提交
复制代码 四、Direct Exchange(直连交换机)模式
1、新建配置文件 RabbitDirectConfig类
- package com.example.direct;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description: 直连交换机--这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,
- * 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景
- */
- @Configuration
- public class RabbitDirectConfig {
- /**
- * 队列名称
- */
- public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";
- public static final String QUEUE_USER ="QUEUE_USER";
- /**
- * 交换机
- */
- public static final String EXCHANGE="EXCHANGE_01";
- /**
- * 路由
- */
- public static final String ROUTING_KEY="ROUTING_KEY_01";
- @Bean
- public Queue queue01() {
- return new Queue(QUEUE_MESSAGE, //队列名称
- true, //是否持久化
- false, //是否排他
- false //是否自动删除
- );
- }
- @Bean
- public Queue queue02() {
- return new Queue(QUEUE_USER, //队列名称
- true, //是否持久化
- false, //是否排他
- false //是否自动删除
- );
- }
- @Bean
- public DirectExchange exchange01() {
- return new DirectExchange(EXCHANGE,
- true, //是否持久化
- false //是否排他
- );
- }
- @Bean
- public Binding demoBinding() {
- return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);
- }
- @Bean
- public Binding demoBinding2() {
- return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);
- }
- }
复制代码 2、添加消息生产者 Producer类
- package com.example.direct;
- import com.example.entity.User;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description:
- */
- @Component
- public class Producer {
- @Resource
- RabbitTemplate rabbitTemplate;
- public void sendMessageByExchangeANdRoute(String message){
- rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);
- }
- /**
- * 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。
- * @param message
- */
- public void sendMessageByQueue(String message){
- rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);
- }
- public void sendMessage(User user){
- rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);
- }
- }
复制代码 3、添加消息消耗者
- package com.example.direct;
- import com.example.entity.User;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description:
- */
- @Component
- public class Consumer {
- @RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)
- public void onMessage(User user){
- System.out.println("收到的实体bean消息:"+user);
- }
- @RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)
- public void onMessage2(String message){
- System.out.println("收到的字符串消息:"+message);
- }
- }
复制代码 4、 测试
- package com.example;
- import com.example.entity.User;
- import com.example.direct.Producer;
- import com.example.fanout.FanoutProducer;
- import com.example.topic.TopicProducer;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
- import javax.annotation.Resource;
- @SpringBootTest
- class SpringbootRabbitMqApplicationTests {
- @Resource
- Producer producer;
-
- @Test
- public void sendMessage() throws InterruptedException {
- producer.sendMessageByQueue("哈哈");
- producer.sendMessage(new User().setAge(10).setName("wasin"));
- }
- }
复制代码 五、Topic Exchange(主题交换机)模式
1、新建RabbitTopicConfig类
- package com.example.topic;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键,
- * 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
- */
- @Configuration
- public class RabbitTopicConfig {
- /**
- * 交换机
- */
- public static final String EXCHANGE = "EXCHANGE_TOPIC1";
- /**
- * 队列名称
- */
- public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";
- /**
- * 路由
- * "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)
- * 可以匹配 aa.wasin.aa.bb wasin.aa.bb wasin.aa ....
- * aa.bb.wasin.cc 无法匹配
- */
- public static final String ROUTING_KEY1 = "*.wasin.#";
- @Bean
- public Queue queue() {
- return new Queue(QUEUE_TOPIC1, //队列名称
- true, //是否持久化
- false, //是否排他
- false //是否自动删除
- );
- }
- @Bean
- public TopicExchange exchange() {
- return new TopicExchange(EXCHANGE,
- true, //是否持久化
- false //是否排他
- );
- }
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);
- }
- }
复制代码 2、新建 消息生产者和发送者
- package com.example.topic;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description:
- */
- @Component
- public class TopicProducer {
- @Resource
- RabbitTemplate rabbitTemplate;
- /**
- * @param routeKey 路由
- * @param message 消息
- */
- public void sendMessageByQueue(String routeKey, String message){
- rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);
- }
- }
复制代码
- package com.example.topic;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description:
- */
- @Slf4j
- @Component
- public class TopicConsumer {
- @RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)
- public void onMessage2(String message){
- log.info("topic收到的字符串消息:{}",message);
- }
- }
复制代码 六、Fanout Exchange(扇形交换机)模式
1、 新建 RabbitFanoutConfig类
- package com.example.fanout;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,
- * 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
- */
- @Configuration
- public class RabbitFanoutConfig {
- /**
- * 交换机
- */
- public static final String EXCHANGE = "EXCHANGE_FANOUT";
- /**
- * 队列名称
- */
- public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";
- /**
- * 队列名称
- */
- public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";
- @Bean
- public Queue queueFanout1() {
- return new Queue(QUEUE_FANOUT1, //队列名称
- true, //是否持久化
- false, //是否排他
- false //是否自动删除
- );
- }
- @Bean
- public Queue queueFanout2() {
- return new Queue(QUEUE_FANOUT2, //队列名称
- true, //是否持久化
- false, //是否排他
- false //是否自动删除
- );
- }
- @Bean
- public FanoutExchange exchangeFanout() {
- return new FanoutExchange(EXCHANGE,
- true, //是否持久化
- false //是否排他
- );
- }
- @Bean
- public Binding bindingFanout() {
- return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
- }
- @Bean
- public Binding bindingFanout2() {
- return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
- }
- }
复制代码 2、新建 消息生产者和发送者
- package com.example.fanout;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description:
- */
- @Component
- public class FanoutProducer {
- @Resource
- RabbitTemplate rabbitTemplate;
-
- /**
- * @param message 消息
- */
- public void sendMessageByQueue(String message) {
- rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);
- }
- }
复制代码
- package com.example.fanout;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- /**
- * @author wasin
- * @version 1.0
- * @date 2024/6/4
- * @description:
- */
- @Slf4j
- @Component
- public class FanoutConsumer {
- /**
- * 手动提交
- * @param message
- * @param channel
- * @param tag
- * @throws IOException
- */
- @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)
- public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
- log.info("fanout1收到的字符串消息:{}",message);
- channel.basicAck(tag,false);
- }
- @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)
- public void onMessage2(String message){
- log.info("fanout2到的字符串消息:{}",message);
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |