RabbitMQ 学习笔记

打印 上一主题 下一主题

主题 902|帖子 902|积分 2706

AMQP协议模型



  • Server:又称为Broker,接受客户端的链接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • channel:网络信道,几乎所有的操作都在channel中进行,是消息读写的通道,可建立多个channel,每个channel代表一个会话任务
  • Message:消息本体,由Properties和Body组成,Proerties对消息进行装饰(优先级、延迟等特征),body是消息体内容
  • Virtual host:虚拟地址,进行逻辑各级,一个VirtualHost里面可以有若干个Exchange和Queue,同一个VirtualHost不能有相同的Exchange或Queue
  • Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟链接 binding中可以报刊rounting key
  • Rounting key :消息发送的路由规则
  • Queue: 消息队列,保存并转发给消费者
    AMQP 消息流转


命令行与管理台
  1. rabbitmqctl stop_app           关闭应用
  2. rabbitmqctl start_app          启动应用
  3. rabbitmqctl status                节点状态
  4. rabbitmqctl add_user username password        添加用户
  5. rabbitmqctl list_users                                        打印用户列表
  6. rabbitmqctl delete_user username 删除用户
  7. rabbitmqctl clear_permissions -p vhostpath username  用户权限清除
  8. rabbitmqctl add_vhost vhostpath 创建虚拟主机
  9. rabbitmqctl list_vhosts 打印虚拟主机列表
  10. rabbitmqctl list_queues 打印队列信息
  11. rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息
复制代码
Exchange 交换机

Exchange:接受消息,并根据路由键转发消息所绑定的队列

Exchange 交换机属性

  • Name:交换机名称
  • Type:类型

    • direct 直连,发送到direct Exchange的消息被转发到RouteKey指定的Queue  exchange=doutingKey
    • Topic  发布\订阅,发送到Topic Exchange的消息会被转发到所有关心RouteKey中制定的Topic的Queue上
    • fanout、不处理路由键,简单的将队列板绑定到交换机上,发送到交换机上的消息都会被转发到该交换机绑定的队列上、fount是最快的
    • headers

  • Durability:是否需要持久化
  • Auto Delete:当最后一个绑定在Exchange上的队列删除后,自动删除Exchange
  • Interal:当前exchange是否用于Rabbitmq内部使用,默认为FALSE
  • Arguments:扩展参数,用户扩展AMQP协议参数
Binding-绑定

  • Exchange和Exchange、queue之间绑定
消息可靠性投递 100%

生产端-可靠性投递

解决方案

  • 消息落库、对消息状态达标
  • 消息延迟投递、做二次确认,回掉检查
消费端-幂等性保证

解决方案:

  • 数据库ID+指纹码
  • 利用Redis原子性实现
Confirm确认消息

消费者收到消息会给生产者一个应答
  1. channel.addConfirmListener
复制代码
Return 消息机制

RabbitMQ代码整合

Spring-AMQP


  • RabbitAdmin -- Exchange\Queue\Binding 之间关系声明
  • SpringAMQP 声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter
    RabbitAdmin
    1. package com.study.rabbitmq.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.Queue;
    5. import org.springframework.amqp.core.TopicExchange;
    6. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    7. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    8. import org.springframework.amqp.rabbit.core.RabbitAdmin;
    9. import org.springframework.context.annotation.Bean;
    10. import org.springframework.stereotype.Component;
    11. @Component
    12. public class RabbitMQConfig {
    13.     @Bean
    14.     public ConnectionFactory connectionFactory(){
    15.         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    16.         connectionFactory.setAddresses("192.168.223.128:5672");
    17.         connectionFactory.setUsername("guest");
    18.         connectionFactory.setPassword("guest");
    19.         connectionFactory.setVirtualHost("/");
    20.         return connectionFactory;
    21.     }
    22.     @Bean
    23.     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    24.         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    25.         // setAutoStartup  必须设置为true 否则Spring不会加载RabbitAdmin
    26.         rabbitAdmin.setAutoStartup(true);
    27.         return rabbitAdmin;
    28.     }
    29.     /**
    30.      * 针对消费者配置
    31.      * 1. 设置交换机类型
    32.      * 2. 将队列绑定到交换机
    33.      FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
    34.      HeadersExchange :通过添加属性key-value匹配
    35.      DirectExchange:按照routingkey分发到指定队列
    36.      TopicExchange:多关键字匹配
    37.      */
    38.     @Bean
    39.     public TopicExchange exchange001() {
    40.         return new TopicExchange("topic001", true, false);
    41.     }
    42.     @Bean
    43.     public Queue queue001() {
    44.         return new Queue("queue001", true); //队列持久
    45.     }
    46.     @Bean
    47.     public Binding binding001() {
    48.         return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    49.     }
    50. }
    复制代码
    RabbitTemplate
  1.    @Bean
  2.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  3.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  4.         return rabbitTemplate;
  5.     }
  6.    
  7.         @Test
  8.     public void testSendMessage() throws Exception {
  9.         //1 创建消息
  10.         MessageProperties messageProperties = new MessageProperties();
  11.         messageProperties.getHeaders().put("desc", "信息描述..");
  12.         messageProperties.getHeaders().put("type", "自定义消息类型..");
  13.         Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
  14.         rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
  15.             @Override
  16.             public Message postProcessMessage(Message message) throws AmqpException {
  17.                 System.err.println("------添加额外的设置---------");
  18.                 message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
  19.                 message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
  20.                 return message;
  21.             }
  22.         });
  23.     }
复制代码
SimpleMessageListenerContainer


  • SimpleMessageListenerContainer可以进行动态监听消息,设置事务、事务管理、事务属性、设置消费者数量,进行批量消费,设置消息确认模式、自动确认,是否重回队列、异常捕获header函数,设置消费者标签生成策略,是否独占模式,消费者属性,设置具体的监听器、消息转换器
  1. @Bean
  2.     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  3.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  4.         // 容器添加监听队列
  5.         container.setQueues(queue001());
  6.         // 设置监听数据量
  7.         container.setConcurrentConsumers(1);
  8.         container.setMaxConcurrentConsumers(5);
  9.         // 是否重回队列 - FALSE
  10.         container.setDefaultRequeueRejected(false);
  11.         // 签收 - auto 自动签收
  12.         container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  13.         //
  14.         container.setExposeListenerChannel(true);
  15.         // 设置客户端tag
  16.         container.setConsumerTagStrategy(new ConsumerTagStrategy() {
  17.             @Override
  18.             public String createConsumerTag(String queue) {
  19.                 return queue + "_" + UUID.randomUUID().toString();
  20.             }
  21.         });
  22.         // 消息监听处理
  23.         container.setMessageListener(new ChannelAwareMessageListener() {
  24.             @Override
  25.             public void onMessage(Message message, Channel channel) throws Exception {
  26.                 System.out.println("---- 处理消费者"+new String(message.getBody()));
  27.             }
  28.         });
  29.         return container;
  30.     }
复制代码
消息监听MessageListenerAdapter 以及 消息封装处理 MessageConverter
  1. // 消息坚定
  2. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  3. adapter.setDefaultListenerMethod("consumeMessage"); // 设置监听处理方法名
  4. adapter.setMessageConverter(new TextMessageConverter()); // 设置消息封装类
  5. container.setMessageListener(adapter);
  6. // TextMessageConverter
  7. public class TextMessageConverter implements MessageConverter {
  8.     @Override
  9.     public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  10.         return new Message(object.toString().getBytes(), messageProperties);
  11.     }
  12.     @Override
  13.     public Object fromMessage(Message message) throws MessageConversionException {
  14.         String contentType = message.getMessageProperties().getContentType();
  15.         if(null != contentType && contentType.contains("text")) {
  16.             return new String(message.getBody());
  17.         }
  18.         return message.getBody();
  19.     }
  20. }
  21. // MessageDelegate
  22. public class MessageDelegate {
  23.     public void handleMessage(byte[] messageBody) {
  24.         System.err.println("默认方法, 消息内容:" + new String(messageBody));
  25.     }
  26.     public void consumeMessage(byte[] messageBody) {
  27.         System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
  28.     }
  29.     public void consumeMessage(String messageBody) {
  30.         System.err.println("字符串方法, 消息内容:" + messageBody);
  31.     }
  32.     public void method1(String messageBody) {
  33.         System.err.println("method1 收到消息内容:" + new String(messageBody));
  34.     }
  35.     public void method2(String messageBody) {
  36.         System.err.println("method2 收到消息内容:" + new String(messageBody));
  37.     }
  38.     public void consumeMessage(Map messageBody) {
  39.         System.err.println("map方法, 消息内容:" + messageBody);
  40.     }
  41.     public void consumeMessage(Order order) {
  42.         System.err.println("order对象, 消息内容, id: " + order.getId() +
  43.                 ", name: " + order.getName() +
  44.                 ", content: "+ order.getContent());
  45.     }
  46.     public void consumeMessage(File file) {
  47.         System.err.println("文件对象 方法, 消息内容:" + file.getName());
  48.     }
  49. }
复制代码
SpringBoot整合RabbitMQ
  1. // 生产者
  2. @Component
  3. public class RabbitSender {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     //回调函数: confirm确认
  7.     final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
  8.         @Override
  9.         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  10.             System.err.println("correlationData: " + correlationData);
  11.             System.err.println("ack: " + ack);
  12.             if(!ack){
  13.                 System.err.println("异常处理....");
  14.             }
  15.         }
  16.     };
  17.     //回调函数: return返回
  18.     final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
  19.         @Override
  20.         public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
  21.                                     String exchange, String routingKey) {
  22.             System.err.println("return exchange: " + exchange + ", routingKey: "
  23.                     + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
  24.         }
  25.     };
  26.     //发送消息方法调用: 构建Message消息
  27.     public void send(Object message, Map<String, Object> properties) throws Exception {
  28.         MessageHeaders mhs = new MessageHeaders(properties);
  29.         Message msg = MessageBuilder.createMessage(message, mhs);
  30.         rabbitTemplate.setConfirmCallback(confirmCallback);
  31.         rabbitTemplate.setReturnCallback(returnCallback);
  32.         //id + 时间戳 全局唯一
  33.         CorrelationData correlationData = new CorrelationData("1234567890");
  34.         rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
  35.     }
  36.     //发送消息方法调用: 构建自定义对象消息
  37.     public void sendOrder(Order order) throws Exception {
  38.         rabbitTemplate.setConfirmCallback(confirmCallback);
  39.         rabbitTemplate.setReturnCallback(returnCallback);
  40.         //id + 时间戳 全局唯一
  41.         CorrelationData correlationData = new CorrelationData("0987654321");
  42.         rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
  43.     }
  44. }
复制代码
  1. // 消费者
  2. @Component
  3. public class RabbitReceiver {
  4.     @RabbitListener(bindings = @QueueBinding(
  5.             value = @Queue(value = "queue-1",
  6.                     durable="true"),
  7.             exchange = @Exchange(value = "exchange-1",
  8.                     durable="true",
  9.                     type= "topic",
  10.                     ignoreDeclarationExceptions = "true"),
  11.             key = "springboot.*"
  12.     )
  13.     )
  14.     @RabbitHandler
  15.     public void onMessage(Message message, Channel channel) throws Exception {
  16.         System.err.println("--------------------------------------");
  17.         System.err.println("消费端Payload: " + message.getPayload());
  18.         Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  19.         //手工ACK
  20.         channel.basicAck(deliveryTag, false);
  21.     }
  22.     /**
  23.      *
  24.      * @param order
  25.      * @param channel
  26.      * @param headers
  27.      * @throws Exception
  28.      */
  29.     @RabbitListener(bindings = @QueueBinding(
  30.             value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
  31.                     durable="${spring.rabbitmq.listener.order.queue.durable}"),
  32.             exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
  33.                     durable="${spring.rabbitmq.listener.order.exchange.durable}",
  34.                     type= "${spring.rabbitmq.listener.order.exchange.type}",
  35.                     ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
  36.             key = "${spring.rabbitmq.listener.order.key}"
  37.     )
  38.     )
  39.     @RabbitHandler
  40.     public void onOrderMessage(@Payload Order order,
  41.                                Channel channel,
  42.                                @Headers Map<String, Object> headers) throws Exception {
  43.         System.err.println("--------------------------------------");
  44.         System.err.println("消费端order: " + order.getId());
  45.         Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
  46.         //手工ACK
  47.         channel.basicAck(deliveryTag, false);
  48.     }
  49. }
复制代码
  1. # 配置文件
  2. spring:
  3.   rabbitmq:
  4.     addresses: 192.168.11.76:5672
  5.     username: guest
  6.     password: guest
  7.     virtual-host: /
  8.     connection-timeout: 15000
  9.     publisher-confirms: true # 生产者exchange确认
  10.     publisher-returns: true # 生产者queue确认
  11.     template:
  12.       mandatory: true
  13.     listener:
  14.       simple:
  15.         acknowledge-mode: manual
  16.         concurrency: 5
  17.         max-concurrency: 10
  18.       order: # 自定义消费队列
  19.         queue:
  20.           name: queue-2
  21.           durable: true
  22.         exchange:
  23.           name: exchange-2
  24.           durable: true
  25.           type: topic
  26.           ignoreDeclarationExceptions: true
  27.         key: springboot.*
复制代码
<img alt="image-20240221125440948" loading="lazy">
SpringCloudStream 整合Rabbitmq
  1. /**
  2. * 生产者
  3. * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
  4. * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
  5. */
  6. public interface Barista {
  7.     String OUTPUT_CHANNEL = "output_channel";
  8.     //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
  9.     @Output(Barista.OUTPUT_CHANNEL)
  10.     MessageChannel logoutput();
  11. }
  12. @EnableBinding(Barista.class)
  13. @Service
  14. public class RabbitmqSender {
  15.     @Autowired
  16.     private Barista barista;
  17.     // 发送消息
  18.     public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
  19.         try{
  20.             MessageHeaders mhs = new MessageHeaders(properties);
  21.             Message msg = MessageBuilder.createMessage(message, mhs);
  22.             boolean sendStatus = barista.logoutput().send(msg);
  23.             System.err.println("--------------sending -------------------");
  24.             System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
  25.         }catch (Exception e){
  26.             System.err.println("-------------error-------------");
  27.             e.printStackTrace();
  28.             throw new RuntimeException(e.getMessage());
  29.         }
  30.         return null;
  31.     }
  32. }
复制代码
  1. /**
  2. * 消费者
  3. * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
  4. * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
  5. */
  6. public interface ConsumerBarista {
  7.     String INPUT_CHANNEL = "input_channel";
  8.     //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
  9.     @Input(ConsumerBarista.INPUT_CHANNEL)
  10.     SubscribableChannel loginput();
  11. }
  12. @EnableBinding(ConsumerBarista.class)
  13. @Service
  14. public class RabbitmqReceiver {
  15.     @StreamListener(ConsumerBarista.INPUT_CHANNEL)
  16.     public void receiver(Message message) throws Exception {
  17.         Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
  18.         Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  19.         System.out.println("Input Stream 1 接受数据:" + message);
  20.         System.out.println("消费完毕------------");
  21.         channel.basicAck(deliveryTag, false);
  22.     }
  23. }
复制代码
  1. # 服务配置
  2. server:
  3.   port: 8888
  4.   context-path: /rabbitmq
  5. # spring应用配置
  6. spring:
  7.   application:
  8.     name: rabbitmq-cloud-stream
  9.   # SpringCloud 服务配置
  10.   cloud:
  11.     stream:
  12.       binders:
  13.         output_channel:
  14.           destination: exchange-3
  15.           group: queue-3
  16.           binder: rabbit_cluster
  17.         input_channel:
  18.           destination: exchange-3
  19.           group: queue-3
  20.           binder: rabbit_cluster
  21.           consumer:
  22.             concurrency: 1
  23.       bindings:
  24.         rabbit_cluster:
  25.           type: rabbit
  26.           environment:
  27.             spring:
  28.               rabbitmq:
  29.                 addresses: 192.168.11.76:5672
  30.                 username: guest
  31.                 password: guest
  32.                 virtual-host: /
  33.         input_channel:
  34.           consumer:
  35.             requeue-rejected: false
  36.             acknowledge-mode: MANUAL
  37.             recovery-interval: 3000
  38.             durable-subscription: true
  39.             max-concurrency: 5
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表