ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ 学习笔记 [打印本页]

作者: 锦通    时间: 2024-5-5 17:13
标题: RabbitMQ 学习笔记
AMQP协议模型




命令行与管理台
  1. rabbitmqctl stop_app           关闭应用
  2. rabbitmqctl start_app          启动应用
  3. rabbitmqctl status                节点状态
  4. rabbitmqctl add_user username password        添加用户
  5. rabbitmqctl list_users                                        打印用户列表
  6. rabbitmqctl delete_user username 删除用户
  7. rabbitmqctl clear_permissions -p vhostpath username  用户权限清除
  8. rabbitmqctl add_vhost vhostpath 创建虚拟主机
  9. rabbitmqctl list_vhosts 打印虚拟主机列表
  10. rabbitmqctl list_queues 打印队列信息
  11. rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息
复制代码
Exchange 交换机

Exchange:接受消息,并根据路由键转发消息所绑定的队列

Exchange 交换机属性
Binding-绑定
消息可靠性投递 100%

生产端-可靠性投递

解决方案
消费端-幂等性保证

解决方案:
Confirm确认消息

消费者收到消息会给生产者一个应答
  1. channel.addConfirmListener
复制代码
Return 消息机制

RabbitMQ代码整合

Spring-AMQP

  1.    @Bean
  2.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  3.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  4.         return rabbitTemplate;
  5.     }
  6.    
  7.         @Test
  8.     public void testSendMessage() throws Exception {
  9.         //1 创建消息
  10.         MessageProperties messageProperties = new MessageProperties();
  11.         messageProperties.getHeaders().put("desc", "信息描述..");
  12.         messageProperties.getHeaders().put("type", "自定义消息类型..");
  13.         Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
  14.         rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
  15.             @Override
  16.             public Message postProcessMessage(Message message) throws AmqpException {
  17.                 System.err.println("------添加额外的设置---------");
  18.                 message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
  19.                 message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
  20.                 return message;
  21.             }
  22.         });
  23.     }
复制代码
SimpleMessageListenerContainer

  1. @Bean
  2.     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  3.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  4.         // 容器添加监听队列
  5.         container.setQueues(queue001());
  6.         // 设置监听数据量
  7.         container.setConcurrentConsumers(1);
  8.         container.setMaxConcurrentConsumers(5);
  9.         // 是否重回队列 - FALSE
  10.         container.setDefaultRequeueRejected(false);
  11.         // 签收 - auto 自动签收
  12.         container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  13.         //
  14.         container.setExposeListenerChannel(true);
  15.         // 设置客户端tag
  16.         container.setConsumerTagStrategy(new ConsumerTagStrategy() {
  17.             @Override
  18.             public String createConsumerTag(String queue) {
  19.                 return queue + "_" + UUID.randomUUID().toString();
  20.             }
  21.         });
  22.         // 消息监听处理
  23.         container.setMessageListener(new ChannelAwareMessageListener() {
  24.             @Override
  25.             public void onMessage(Message message, Channel channel) throws Exception {
  26.                 System.out.println("---- 处理消费者"+new String(message.getBody()));
  27.             }
  28.         });
  29.         return container;
  30.     }
复制代码
消息监听MessageListenerAdapter 以及 消息封装处理 MessageConverter
  1. // 消息坚定
  2. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  3. adapter.setDefaultListenerMethod("consumeMessage"); // 设置监听处理方法名
  4. adapter.setMessageConverter(new TextMessageConverter()); // 设置消息封装类
  5. container.setMessageListener(adapter);
  6. // TextMessageConverter
  7. public class TextMessageConverter implements MessageConverter {
  8.     @Override
  9.     public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  10.         return new Message(object.toString().getBytes(), messageProperties);
  11.     }
  12.     @Override
  13.     public Object fromMessage(Message message) throws MessageConversionException {
  14.         String contentType = message.getMessageProperties().getContentType();
  15.         if(null != contentType && contentType.contains("text")) {
  16.             return new String(message.getBody());
  17.         }
  18.         return message.getBody();
  19.     }
  20. }
  21. // MessageDelegate
  22. public class MessageDelegate {
  23.     public void handleMessage(byte[] messageBody) {
  24.         System.err.println("默认方法, 消息内容:" + new String(messageBody));
  25.     }
  26.     public void consumeMessage(byte[] messageBody) {
  27.         System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
  28.     }
  29.     public void consumeMessage(String messageBody) {
  30.         System.err.println("字符串方法, 消息内容:" + messageBody);
  31.     }
  32.     public void method1(String messageBody) {
  33.         System.err.println("method1 收到消息内容:" + new String(messageBody));
  34.     }
  35.     public void method2(String messageBody) {
  36.         System.err.println("method2 收到消息内容:" + new String(messageBody));
  37.     }
  38.     public void consumeMessage(Map messageBody) {
  39.         System.err.println("map方法, 消息内容:" + messageBody);
  40.     }
  41.     public void consumeMessage(Order order) {
  42.         System.err.println("order对象, 消息内容, id: " + order.getId() +
  43.                 ", name: " + order.getName() +
  44.                 ", content: "+ order.getContent());
  45.     }
  46.     public void consumeMessage(File file) {
  47.         System.err.println("文件对象 方法, 消息内容:" + file.getName());
  48.     }
  49. }
复制代码
SpringBoot整合RabbitMQ
  1. // 生产者
  2. @Component
  3. public class RabbitSender {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     //回调函数: confirm确认
  7.     final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
  8.         @Override
  9.         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  10.             System.err.println("correlationData: " + correlationData);
  11.             System.err.println("ack: " + ack);
  12.             if(!ack){
  13.                 System.err.println("异常处理....");
  14.             }
  15.         }
  16.     };
  17.     //回调函数: return返回
  18.     final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
  19.         @Override
  20.         public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
  21.                                     String exchange, String routingKey) {
  22.             System.err.println("return exchange: " + exchange + ", routingKey: "
  23.                     + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
  24.         }
  25.     };
  26.     //发送消息方法调用: 构建Message消息
  27.     public void send(Object message, Map<String, Object> properties) throws Exception {
  28.         MessageHeaders mhs = new MessageHeaders(properties);
  29.         Message msg = MessageBuilder.createMessage(message, mhs);
  30.         rabbitTemplate.setConfirmCallback(confirmCallback);
  31.         rabbitTemplate.setReturnCallback(returnCallback);
  32.         //id + 时间戳 全局唯一
  33.         CorrelationData correlationData = new CorrelationData("1234567890");
  34.         rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
  35.     }
  36.     //发送消息方法调用: 构建自定义对象消息
  37.     public void sendOrder(Order order) throws Exception {
  38.         rabbitTemplate.setConfirmCallback(confirmCallback);
  39.         rabbitTemplate.setReturnCallback(returnCallback);
  40.         //id + 时间戳 全局唯一
  41.         CorrelationData correlationData = new CorrelationData("0987654321");
  42.         rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
  43.     }
  44. }
复制代码
  1. // 消费者
  2. @Component
  3. public class RabbitReceiver {
  4.     @RabbitListener(bindings = @QueueBinding(
  5.             value = @Queue(value = "queue-1",
  6.                     durable="true"),
  7.             exchange = @Exchange(value = "exchange-1",
  8.                     durable="true",
  9.                     type= "topic",
  10.                     ignoreDeclarationExceptions = "true"),
  11.             key = "springboot.*"
  12.     )
  13.     )
  14.     @RabbitHandler
  15.     public void onMessage(Message message, Channel channel) throws Exception {
  16.         System.err.println("--------------------------------------");
  17.         System.err.println("消费端Payload: " + message.getPayload());
  18.         Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  19.         //手工ACK
  20.         channel.basicAck(deliveryTag, false);
  21.     }
  22.     /**
  23.      *
  24.      * @param order
  25.      * @param channel
  26.      * @param headers
  27.      * @throws Exception
  28.      */
  29.     @RabbitListener(bindings = @QueueBinding(
  30.             value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
  31.                     durable="${spring.rabbitmq.listener.order.queue.durable}"),
  32.             exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
  33.                     durable="${spring.rabbitmq.listener.order.exchange.durable}",
  34.                     type= "${spring.rabbitmq.listener.order.exchange.type}",
  35.                     ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
  36.             key = "${spring.rabbitmq.listener.order.key}"
  37.     )
  38.     )
  39.     @RabbitHandler
  40.     public void onOrderMessage(@Payload Order order,
  41.                                Channel channel,
  42.                                @Headers Map<String, Object> headers) throws Exception {
  43.         System.err.println("--------------------------------------");
  44.         System.err.println("消费端order: " + order.getId());
  45.         Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
  46.         //手工ACK
  47.         channel.basicAck(deliveryTag, false);
  48.     }
  49. }
复制代码
  1. # 配置文件
  2. spring:
  3.   rabbitmq:
  4.     addresses: 192.168.11.76:5672
  5.     username: guest
  6.     password: guest
  7.     virtual-host: /
  8.     connection-timeout: 15000
  9.     publisher-confirms: true # 生产者exchange确认
  10.     publisher-returns: true # 生产者queue确认
  11.     template:
  12.       mandatory: true
  13.     listener:
  14.       simple:
  15.         acknowledge-mode: manual
  16.         concurrency: 5
  17.         max-concurrency: 10
  18.       order: # 自定义消费队列
  19.         queue:
  20.           name: queue-2
  21.           durable: true
  22.         exchange:
  23.           name: exchange-2
  24.           durable: true
  25.           type: topic
  26.           ignoreDeclarationExceptions: true
  27.         key: springboot.*
复制代码
<img alt="image-20240221125440948" loading="lazy">
SpringCloudStream 整合Rabbitmq
  1. /**
  2. * 生产者
  3. * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
  4. * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
  5. */
  6. public interface Barista {
  7.     String OUTPUT_CHANNEL = "output_channel";
  8.     //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
  9.     @Output(Barista.OUTPUT_CHANNEL)
  10.     MessageChannel logoutput();
  11. }
  12. @EnableBinding(Barista.class)
  13. @Service
  14. public class RabbitmqSender {
  15.     @Autowired
  16.     private Barista barista;
  17.     // 发送消息
  18.     public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
  19.         try{
  20.             MessageHeaders mhs = new MessageHeaders(properties);
  21.             Message msg = MessageBuilder.createMessage(message, mhs);
  22.             boolean sendStatus = barista.logoutput().send(msg);
  23.             System.err.println("--------------sending -------------------");
  24.             System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
  25.         }catch (Exception e){
  26.             System.err.println("-------------error-------------");
  27.             e.printStackTrace();
  28.             throw new RuntimeException(e.getMessage());
  29.         }
  30.         return null;
  31.     }
  32. }
复制代码
  1. /**
  2. * 消费者
  3. * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
  4. * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
  5. */
  6. public interface ConsumerBarista {
  7.     String INPUT_CHANNEL = "input_channel";
  8.     //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
  9.     @Input(ConsumerBarista.INPUT_CHANNEL)
  10.     SubscribableChannel loginput();
  11. }
  12. @EnableBinding(ConsumerBarista.class)
  13. @Service
  14. public class RabbitmqReceiver {
  15.     @StreamListener(ConsumerBarista.INPUT_CHANNEL)
  16.     public void receiver(Message message) throws Exception {
  17.         Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
  18.         Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  19.         System.out.println("Input Stream 1 接受数据:" + message);
  20.         System.out.println("消费完毕------------");
  21.         channel.basicAck(deliveryTag, false);
  22.     }
  23. }
复制代码
  1. # 服务配置
  2. server:
  3.   port: 8888
  4.   context-path: /rabbitmq
  5. # spring应用配置
  6. spring:
  7.   application:
  8.     name: rabbitmq-cloud-stream
  9.   # SpringCloud 服务配置
  10.   cloud:
  11.     stream:
  12.       binders:
  13.         output_channel:
  14.           destination: exchange-3
  15.           group: queue-3
  16.           binder: rabbit_cluster
  17.         input_channel:
  18.           destination: exchange-3
  19.           group: queue-3
  20.           binder: rabbit_cluster
  21.           consumer:
  22.             concurrency: 1
  23.       bindings:
  24.         rabbit_cluster:
  25.           type: rabbit
  26.           environment:
  27.             spring:
  28.               rabbitmq:
  29.                 addresses: 192.168.11.76:5672
  30.                 username: guest
  31.                 password: guest
  32.                 virtual-host: /
  33.         input_channel:
  34.           consumer:
  35.             requeue-rejected: false
  36.             acknowledge-mode: MANUAL
  37.             recovery-interval: 3000
  38.             durable-subscription: true
  39.             max-concurrency: 5
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4