RabbitMQ
一、MQ相关的概念1.1、MQ的根本概念
什么是MQ
MQ(Message Queue,消息队列)是一种应用程序对应用程序的通信方法。应用程序通过写入和检索出入队列的针对性消息来通信,这些消息可以存储在内存或磁盘中。消息队列答应应用程序独立地运行,并以可靠的方式相互通信。
为啥要用MQ
[*]解耦: 答应体系独立开发、部署和运行,淘汰体系间的直接依赖
[*]异步处理: 非壅闭操作,请求处理与相应分离
[*]削峰填谷: 缓冲突发请求,防止体系过载
[*]可靠通信: 确保消息通报的可靠性,支持长期化
[*]扩展性: 便于体系水平扩展
[*]顺序包管: 某些MQ可确保消息按特定顺序处理
常用的MQ
[*]RabbitMQ: 基于Erlang开发,实现AMQP协议,轻量级,易于部署
[*]Kafka: 高吞吐量的分布式消息体系,适合大数据场景
[*]RocketMQ: 阿里开源的分布式消息中心件,高性能、高可靠
[*]ActiveMQ: 老牌消息中心件,实现JMS规范
[*]ZeroMQ: 更像是网络通信库,无需单独的消息服务器
1.2、消息队列协议
什么是协议
协议是通信两边约定的规则集合,定义了通信格式、通信方式、数据传输方式等。
网络协议的三要素
[*]语法: 数据与控制信息的结构或格式
[*]语义: 解释控制信息每个部分的含义
[*]时序: 事件发生的顺序
常用消息中心件协议
[*]AMQP (Advanced Message Queuing Protocol): 高级消息队列协议,是一个开放标准的应用层协议,为面向消息的中心件设计。RabbitMQ实现了AMQP。
[*]MQTT (Message Queuing Telemetry Transport): 轻量级的发布/订阅消息传输协议,适用于物联网场景。
[*]STOMP (Simple/Streaming Text Oriented Messaging Protocol): 简单的面向文本的消息通报协议。
[*]JMS (Java Message Service): Java消息服务,是Java平台中关于面向消息中心件的API。
[*]OpenWire: ActiveMQ的原生协议。
1.3、消息队列长期化
消息队列长期化是指将消息存储到磁盘上,确保在消息代理重启后消息不会丢失。
长期化方式:
[*]内存长期化: 速度快但不可靠,服务重启后数据丢失
[*]文件长期化: 将消息存储在文件体系中
[*]数据库长期化: 将消息存储在数据库中,便于管理和查询
RabbitMQ长期化需要:
[*]队列长期化:声明队列时设置为长期化
[*]消息长期化:发送消息时设置长期化属性
1.4、消息的分发策略
消息队列分发策略决定了消息如何分发给多个消费者:
[*]轮询分发 (Round-Robin): 依次将消息发送给下一个消费者,实现负载均衡
[*]公中分发 (Fair Dispatch): 根据消费者的处理能力分发消息,通常基于确认机制
[*]一致性哈希: 相同特征的消息发送到同一消费者
[*]随机分发: 随机选择消费者
[*]优先级分发: 根据消息优先级分发
1.5、消息队列的高可用和高可靠
高可用:确保服务在部分节点故障时仍能继续工作
实现方式:
[*]集群模式
[*]镜像队列
[*]主备模式
[*]负载均衡
高可靠:确保消息不丢失
实现方式:
[*]消息长期化
[*]发送方确认机制
[*]消费方确认机制
[*]消息重试机制
二、RabbitMQ安装启动
Linux安装
# 安装Erlang环境
apt-get install erlang
# 安装RabbitMQ
apt-get install rabbitmq-server
# 启动RabbitMQ服务
systemctl start rabbitmq-server
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management
# 创建用户
rabbitmqctl add_user admin password
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" 访问管理界面: http://server-ip:15672
Docker安装
# 拉取RabbitMQ镜像(带管理插件)
docker pull rabbitmq:3-management
# 启动容器
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management 三、RabbitMQ快速入门
3.1、RabbitMQ的概念
RabbitMQ是实现AMQP(高级消息队列协议)的开源消息代理软件。它吸取、存储和转发消息数据。RabbitMQ用Erlang语言编写,集群和故障转移非常容易。
3.2、AMQP协议
AMQP(高级消息队列协议)是一个开放标准的应用层协议,为面向消息的中心件设计。AMQP定义了消息通报所需的各种元素,如互换机、队列、绑定、路由键等。
AMQP的主要特点:
[*]可互操作性:不同供应商实现的AMQP可互操作
[*]安全性:支持TLS加密和SASL认证
[*]可靠性:事件支持,确认机制
[*]标准化:开放标准
3.3、RabbitMQ架构构成
RabbitMQ的核心架构由以下部分构成:
[*]Broker: 吸取和分发消息的消息中心件服务节点
[*]Virtual Host: 假造主机,用于隔离不同的消息情况
[*]Connection: 客户端与RabbitMQ Broker的TCP连接
[*]Channel: 信道,建立在Connection之上的假造连接
[*]Exchange: 互换机,吸取消息并转发到绑定的队列
[*]Queue: 队列,存储消息
[*]Binding: 绑定,互换机与队列之间的关联
[*]Routing Key: 路由键,互换机根据路由键决定消息转发到哪个队列
3.4、四大核心概念
[*]生产者(Producer): 发送消息的应用程序
[*]互换机(Exchange): 吸取生产者发送的消息,并根据路由键将消息路由到一个或多个队列
[*]队列(Queue): 存储消息的缓冲区,直到消费者吸取它们
[*]消费者(Consumer): 吸取消息的应用程序
3.5、RabbitMQ角色分类
RabbitMQ中的用户角色:
[*]超等管理员(administrator): 可以管理用户、策略、假造主机等
[*]监控者(monitoring): 可以查看节点的状态
[*]策略订定者(policymaker): 可以管理策略和参数
[*]管理者(management): 可以管理假造主机中的资源
[*]平常用户(none): 只能使用已设置的假造主机和资源
3.6、RabbitMQ消息模式
简单模式
一个生产者、一个队列、一个消费者的最简单模式。
工作模式
一个生产者、一个队列、多个消费者,消息在多个消费者间分发。
发布/订阅模式(fanout)
生产者将消息发送到fanout互换机,互换机将消息广播到所有绑定的队列。
路由模式(direct)
生产者将消息发送到direct互换机,互换机根据消息的路由键将消息路由到特定队列。
主题模式(topic)
生产者将消息发送到topic互换机,互换机根据路由键的模式匹配将消息路由到队列。
参数模式
生产者将消息发送到headers互换机,互换机根据消息头信息而非路由键举行路由。
四、简单模式——Hello Word
4.1、导入rabbitmq依赖
Maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency> 4.2、编写消息生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址
factory.setHost("localhost");
// 设置用户名和密码
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" Sent '" + message + "'");
// 关闭通道和连接
channel.close();
connection.close();
}
} 4.3、编写消息消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.co
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]