AMQP协议模型

- Server:又称为Broker,接受客户端的链接,实现AMQP实体服务
- Connection:连接,应用程序与Broker的网络连接
- channel:网络信道,几乎所有的操作都在channel中进行,是消息读写的通道,可建立多个channel,每个channel代表一个会话任务
- Message:消息本体,由Properties和Body组成,Proerties对消息进行装饰(优先级、延迟等特征),body是消息体内容
- Virtual host:虚拟地址,进行逻辑各级,一个VirtualHost里面可以有若干个Exchange和Queue,同一个VirtualHost不能有相同的Exchange或Queue
- Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列
- Binding:Exchange和Queue之间的虚拟链接 binding中可以报刊rounting key
- Rounting key :消息发送的路由规则
- Queue: 消息队列,保存并转发给消费者
AMQP 消息流转


命令行与管理台
- rabbitmqctl stop_app 关闭应用
- rabbitmqctl start_app 启动应用
- rabbitmqctl status 节点状态
- rabbitmqctl add_user username password 添加用户
- rabbitmqctl list_users 打印用户列表
- rabbitmqctl delete_user username 删除用户
- rabbitmqctl clear_permissions -p vhostpath username 用户权限清除
- rabbitmqctl add_vhost vhostpath 创建虚拟主机
- rabbitmqctl list_vhosts 打印虚拟主机列表
- rabbitmqctl list_queues 打印队列信息
- rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息
复制代码 Exchange 交换机
Exchange:接受消息,并根据路由键转发消息所绑定的队列

Exchange 交换机属性
- Name:交换机名称
- Type:类型
- direct 直连,发送到direct Exchange的消息被转发到RouteKey指定的Queue exchange=doutingKey
- Topic 发布\订阅,发送到Topic Exchange的消息会被转发到所有关心RouteKey中制定的Topic的Queue上
- fanout、不处理路由键,简单的将队列板绑定到交换机上,发送到交换机上的消息都会被转发到该交换机绑定的队列上、fount是最快的
- headers
- Durability:是否需要持久化
- Auto Delete:当最后一个绑定在Exchange上的队列删除后,自动删除Exchange
- Interal:当前exchange是否用于Rabbitmq内部使用,默认为FALSE
- Arguments:扩展参数,用户扩展AMQP协议参数
Binding-绑定
- Exchange和Exchange、queue之间绑定
消息可靠性投递 100%
生产端-可靠性投递
解决方案
- 消息落库、对消息状态达标
- 消息延迟投递、做二次确认,回掉检查
消费端-幂等性保证
解决方案:
Confirm确认消息
消费者收到消息会给生产者一个应答- channel.addConfirmListener
复制代码 Return 消息机制
RabbitMQ代码整合
Spring-AMQP
- RabbitAdmin -- Exchange\Queue\Binding 之间关系声明
- SpringAMQP 声明
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
RabbitAdmin
- package com.study.rabbitmq.config;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
- @Component
- public class RabbitMQConfig {
- @Bean
- public ConnectionFactory connectionFactory(){
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses("192.168.223.128:5672");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
- return connectionFactory;
- }
- @Bean
- public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
- RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
- // setAutoStartup 必须设置为true 否则Spring不会加载RabbitAdmin
- rabbitAdmin.setAutoStartup(true);
- return rabbitAdmin;
- }
- /**
- * 针对消费者配置
- * 1. 设置交换机类型
- * 2. 将队列绑定到交换机
- FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
- HeadersExchange :通过添加属性key-value匹配
- DirectExchange:按照routingkey分发到指定队列
- TopicExchange:多关键字匹配
- */
- @Bean
- public TopicExchange exchange001() {
- return new TopicExchange("topic001", true, false);
- }
- @Bean
- public Queue queue001() {
- return new Queue("queue001", true); //队列持久
- }
- @Bean
- public Binding binding001() {
- return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
- }
- }
复制代码 RabbitTemplate
 - @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- return rabbitTemplate;
- }
-
- @Test
- public void testSendMessage() throws Exception {
- //1 创建消息
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.getHeaders().put("desc", "信息描述..");
- messageProperties.getHeaders().put("type", "自定义消息类型..");
- Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
- rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- System.err.println("------添加额外的设置---------");
- message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
- message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
- return message;
- }
- });
- }
复制代码 SimpleMessageListenerContainer
- SimpleMessageListenerContainer可以进行动态监听消息,设置事务、事务管理、事务属性、设置消费者数量,进行批量消费,设置消息确认模式、自动确认,是否重回队列、异常捕获header函数,设置消费者标签生成策略,是否独占模式,消费者属性,设置具体的监听器、消息转换器
- @Bean
- public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
- // 容器添加监听队列
- container.setQueues(queue001());
- // 设置监听数据量
- container.setConcurrentConsumers(1);
- container.setMaxConcurrentConsumers(5);
- // 是否重回队列 - FALSE
- container.setDefaultRequeueRejected(false);
- // 签收 - auto 自动签收
- container.setAcknowledgeMode(AcknowledgeMode.AUTO);
- //
- container.setExposeListenerChannel(true);
- // 设置客户端tag
- container.setConsumerTagStrategy(new ConsumerTagStrategy() {
- @Override
- public String createConsumerTag(String queue) {
- return queue + "_" + UUID.randomUUID().toString();
- }
- });
- // 消息监听处理
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- System.out.println("---- 处理消费者"+new String(message.getBody()));
- }
- });
- return container;
- }
复制代码 消息监听MessageListenerAdapter 以及 消息封装处理 MessageConverter
- // 消息坚定
- MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
- adapter.setDefaultListenerMethod("consumeMessage"); // 设置监听处理方法名
- adapter.setMessageConverter(new TextMessageConverter()); // 设置消息封装类
- container.setMessageListener(adapter);
- // TextMessageConverter
- public class TextMessageConverter implements MessageConverter {
- @Override
- public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
- return new Message(object.toString().getBytes(), messageProperties);
- }
- @Override
- public Object fromMessage(Message message) throws MessageConversionException {
- String contentType = message.getMessageProperties().getContentType();
- if(null != contentType && contentType.contains("text")) {
- return new String(message.getBody());
- }
- return message.getBody();
- }
- }
- // MessageDelegate
- public class MessageDelegate {
- public void handleMessage(byte[] messageBody) {
- System.err.println("默认方法, 消息内容:" + new String(messageBody));
- }
- public void consumeMessage(byte[] messageBody) {
- System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
- }
- public void consumeMessage(String messageBody) {
- System.err.println("字符串方法, 消息内容:" + messageBody);
- }
- public void method1(String messageBody) {
- System.err.println("method1 收到消息内容:" + new String(messageBody));
- }
- public void method2(String messageBody) {
- System.err.println("method2 收到消息内容:" + new String(messageBody));
- }
- public void consumeMessage(Map messageBody) {
- System.err.println("map方法, 消息内容:" + messageBody);
- }
- public void consumeMessage(Order order) {
- System.err.println("order对象, 消息内容, id: " + order.getId() +
- ", name: " + order.getName() +
- ", content: "+ order.getContent());
- }
- public void consumeMessage(File file) {
- System.err.println("文件对象 方法, 消息内容:" + file.getName());
- }
- }
复制代码 SpringBoot整合RabbitMQ
- // 生产者
- @Component
- public class RabbitSender {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //回调函数: confirm确认
- final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.err.println("correlationData: " + correlationData);
- System.err.println("ack: " + ack);
- if(!ack){
- System.err.println("异常处理....");
- }
- }
- };
- //回调函数: return返回
- final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
- @Override
- public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
- String exchange, String routingKey) {
- System.err.println("return exchange: " + exchange + ", routingKey: "
- + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
- }
- };
- //发送消息方法调用: 构建Message消息
- public void send(Object message, Map<String, Object> properties) throws Exception {
- MessageHeaders mhs = new MessageHeaders(properties);
- Message msg = MessageBuilder.createMessage(message, mhs);
- rabbitTemplate.setConfirmCallback(confirmCallback);
- rabbitTemplate.setReturnCallback(returnCallback);
- //id + 时间戳 全局唯一
- CorrelationData correlationData = new CorrelationData("1234567890");
- rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
- }
- //发送消息方法调用: 构建自定义对象消息
- public void sendOrder(Order order) throws Exception {
- rabbitTemplate.setConfirmCallback(confirmCallback);
- rabbitTemplate.setReturnCallback(returnCallback);
- //id + 时间戳 全局唯一
- CorrelationData correlationData = new CorrelationData("0987654321");
- rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
- }
- }
复制代码- // 消费者
- @Component
- public class RabbitReceiver {
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "queue-1",
- durable="true"),
- exchange = @Exchange(value = "exchange-1",
- durable="true",
- type= "topic",
- ignoreDeclarationExceptions = "true"),
- key = "springboot.*"
- )
- )
- @RabbitHandler
- public void onMessage(Message message, Channel channel) throws Exception {
- System.err.println("--------------------------------------");
- System.err.println("消费端Payload: " + message.getPayload());
- Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
- //手工ACK
- channel.basicAck(deliveryTag, false);
- }
- /**
- *
- * @param order
- * @param channel
- * @param headers
- * @throws Exception
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
- durable="${spring.rabbitmq.listener.order.queue.durable}"),
- exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
- durable="${spring.rabbitmq.listener.order.exchange.durable}",
- type= "${spring.rabbitmq.listener.order.exchange.type}",
- ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
- key = "${spring.rabbitmq.listener.order.key}"
- )
- )
- @RabbitHandler
- public void onOrderMessage(@Payload Order order,
- Channel channel,
- @Headers Map<String, Object> headers) throws Exception {
- System.err.println("--------------------------------------");
- System.err.println("消费端order: " + order.getId());
- Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
- //手工ACK
- channel.basicAck(deliveryTag, false);
- }
- }
复制代码- # 配置文件
- spring:
- rabbitmq:
- addresses: 192.168.11.76:5672
- username: guest
- password: guest
- virtual-host: /
- connection-timeout: 15000
- publisher-confirms: true # 生产者exchange确认
- publisher-returns: true # 生产者queue确认
- template:
- mandatory: true
- listener:
- simple:
- acknowledge-mode: manual
- concurrency: 5
- max-concurrency: 10
- order: # 自定义消费队列
- queue:
- name: queue-2
- durable: true
- exchange:
- name: exchange-2
- durable: true
- type: topic
- ignoreDeclarationExceptions: true
- key: springboot.*
复制代码 <img alt="image-20240221125440948" loading="lazy">
SpringCloudStream 整合Rabbitmq
- /**
- * 生产者
- * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
- * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
- */
- public interface Barista {
- String OUTPUT_CHANNEL = "output_channel";
- //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。
- @Output(Barista.OUTPUT_CHANNEL)
- MessageChannel logoutput();
- }
- @EnableBinding(Barista.class)
- @Service
- public class RabbitmqSender {
- @Autowired
- private Barista barista;
- // 发送消息
- public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
- try{
- MessageHeaders mhs = new MessageHeaders(properties);
- Message msg = MessageBuilder.createMessage(message, mhs);
- boolean sendStatus = barista.logoutput().send(msg);
- System.err.println("--------------sending -------------------");
- System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
- }catch (Exception e){
- System.err.println("-------------error-------------");
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
- }
- return null;
- }
- }
复制代码- /**
- * 消费者
- * 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
- * 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
- */
- public interface ConsumerBarista {
- String INPUT_CHANNEL = "input_channel";
- //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
- @Input(ConsumerBarista.INPUT_CHANNEL)
- SubscribableChannel loginput();
- }
- @EnableBinding(ConsumerBarista.class)
- @Service
- public class RabbitmqReceiver {
- @StreamListener(ConsumerBarista.INPUT_CHANNEL)
- public void receiver(Message message) throws Exception {
- Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
- Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
- System.out.println("Input Stream 1 接受数据:" + message);
- System.out.println("消费完毕------------");
- channel.basicAck(deliveryTag, false);
- }
- }
复制代码- # 服务配置
- server:
- port: 8888
- context-path: /rabbitmq
- # spring应用配置
- spring:
- application:
- name: rabbitmq-cloud-stream
- # SpringCloud 服务配置
- cloud:
- stream:
- binders:
- output_channel:
- destination: exchange-3
- group: queue-3
- binder: rabbit_cluster
- input_channel:
- destination: exchange-3
- group: queue-3
- binder: rabbit_cluster
- consumer:
- concurrency: 1
- bindings:
- rabbit_cluster:
- type: rabbit
- environment:
- spring:
- rabbitmq:
- addresses: 192.168.11.76:5672
- username: guest
- password: guest
- virtual-host: /
- input_channel:
- consumer:
- requeue-rejected: false
- acknowledge-mode: MANUAL
- recovery-interval: 3000
- durable-subscription: true
- max-concurrency: 5
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |