消息队列RabbitMQ
消息队列是一种通信机制,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,用于在分布式体系中实现不同应用程序或服务之间的异步消息传递。它通过在发送者和接收者之间提供一个暂时存储的“队列”,帮助解耦服务之间的直接依赖,从而提升体系的可扩展性和可靠性。以下是消息队列的一些关键特性和工作原理:
关键特性
- 异步通信:
- 发送者可以将消息发送到队列中,而无需等候接收者处置惩罚完这些消息。接收者可以在符合的时间从队列中获取消息进行处置惩罚。
- 解耦:
- 发送者和接收者不必要同时在线,且它们之间没有直接的依赖关系。这使得体系的各个部分可以独立开辟、部署和扩展。
- 可靠性:
- 消息可以在队列中持久化,确保即使在体系故障的情况下也不会丢失。许多消息队列体系支持消息确认机制,以确保消息被成功处置惩罚。
- 流量控制:
- 消息队列可以平衡负载,当接收者处置惩罚速度较慢时,消息可以在队列中排队,避免体系崩溃。
- 顺序处置惩罚:
- 一些消息队列支持包管消息的顺序,确保消息按照发送的顺序被处置惩罚。
工作原理
- 生产者:
- 消息队列:
- 存储消息的地方,负责将消息保存在队列中,直到消费者准备好处置惩罚它们。
- 消费者:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按肯定的规则将消息路由转发到某个队列,对消息进行过虑。exchange有下面四种(先了解:fanout,direct,topics,header)
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程
1.发送消息
1、生产者和Broker创建TCP毗连。
2、生产者和Broker创建通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
2.消息接收消息
1、消费者和Broker创建TCP毗连
2、消费者和Broker创建通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
应用场景
- 异步处置惩罚:如用户注册后发送确认邮件,可以在用户提交表单后立即返回相应,而邮件发送可以在后台处置惩罚。
- 任务调理:将任务放入队列中,后台服务按需处置惩罚,避免了请求壅闭。
- 体系解耦:不同服务之间通过消息队列进行通信,减少直接依赖,方便体系的扩展和维护。
- 大数据处置惩罚:由于数据量太大,程序一时处置惩罚不外来,可以通过把数据放入MQ,多开几个消费者行止理消息,比如:日记网络等
常见的消息队列体系
- RabbitMQ:一个开源的消息队列体系,支持多种消息协议,具有良好的机动性和可靠性。
- Apache Kafka:一个分布式流处置惩罚平台,专为高吞吐量和高可用性计划,得当大数据处置惩罚场景。
- ActiveMQ:一个开源的消息中间件,支持多种协议和语言,得当企业级应用。
- Redis:固然重要是一个内存数据存储,但也可以用作轻量级的消息队列。
启用管理插件
RabbitMQ 提供了一个 web 管理界面,方便管理和监控 RabbitMQ 实例。可以通过以下命令启用该插件:
- sudo rabbitmq-plugins enable rabbitmq_management
复制代码 启用后,管理界面通常会在 http://localhost:15672 访问,默认用户名和密码均为 guest。
设置用户权限
在生产情况中,建议创建一个新的用户并分配相应权限。以下是创建新用户的步骤:
1.添加用户
- sudo rabbitmqctl add_user 用户 密码
复制代码 2.设置用户权限
- sudo rabbitmqctl set_user_tags 用户 角色
复制代码 脚色范例:
administrator
- 权限:完全控制 RabbitMQ 服务器的所有功能,包括创建和管理用户、虚拟主机、交换机、队列等。
- 用途:实用于必要对 RabbitMQ 体系进行全面管理的用户。
monitoring
- 权限:仅限于查察 RabbitMQ 服务器的状态和监控信息,无法进行更改操作。
- 用途:实用于必要监控 RabbitMQ 性能和状态但不必要进行管理操作的用户。
policy
- 权限:用户可以管理 RabbitMQ 的策略(Policies),如设置队列的 TTL(生存时间)、最大长度等。
- 用途:实用于必要管理策略但不必要完全管理员权限的用户。
lifecycle
- 权限:允许用户管理生命周期操作,包括创建、删除和更新虚拟主机等。
- 用途:实用于必要管理 RabbitMQ 实例生命周期的用户。
management
- 权限:访问 RabbitMQ 的管理插件,允许用户通过管理界面查察和管理 RabbitMQ 资源,但不具备全部的管理员权限。
- 用途:实用于必要访问管理界面但不必要完全控制的用户。
3.赋予用户特定主的权限
- sudo rabbitmqctl set_permissions -p / 用户 ".*" ".*" ".*"
复制代码 ".*" 的寄义
在命令中使用的 "*" 和 ".*" 是正则表达式的语法,详细寄义如下:
- ".*"
- . 表示匹配任何单个字符。
- * 表示匹配前面的元素零次或多次。
因此,".*" 表示匹配任意字符串,包括空字符串。这意味着用户 newuser 对于指定虚拟主机 / 的以下权限:
- 设置权限 (<configure>):用户可以设置任意交换机。
- 写入权限 (<write>):用户可以向任意队列发送消息。
- 读取权限 (<read>):用户可以从任意队列接收消息。
交换机和对列设置
1.创建交换机
交换机是Rabbitmq 的紧张概念,用于接受消息并将其路由到一个或多个队列。可以通过管理界面或命令
通过命令情势创建交换机:
- sudo rabbitmqctl add_exchange my_exchange direct
复制代码 这里,my_exchange 是交换机的名称,direct 是交换机的范例。
2.创建队列
队列用于存储消息,可以使用下面的命令创建队列
通过命令行创建队列
- sudo rabbitmqctl add_queue my_queue
复制代码 3.绑定交换机和队列
一旦创建了交换机和队列,就可以将他们绑定在一起:
- sudo rabbitmqctl bind_queue my_queue my_exchange routing_key
复制代码 这里,routing_key 是用于路由消息的关键字
设置文件
通常在/etc/rabbitmq/rabbitmq.conf
使用 RabbitMQ 的客户端
RabbitMQ 提供了多种语言的客户端库,包括 Python、Java、Node.js 等。以下是一个使用 Python 的示例:
安装 Pika 库(Python 的 RabbitMQ 客户端)
- 发送消息
- 创建一个 Python 文件,例如 send.py,然后输入以下代码:
- import pika
- # 连接到 RabbitMQ 服务器
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
- channel = connection.channel()
- # 声明队列
- channel.queue_declare(queue='my_queue')
- # 发送消息
- channel.basic_publish(exchange='',
- routing_key='my_queue',
- body='Hello, RabbitMQ!')
- print(" [x] Sent 'Hello, RabbitMQ!'")
- # 关闭连接
- connection.close()
- 接收消息
- 创建另一个 Python 文件,例如 receive.py,然后输入以下代码:
- import pika
- # 连接到 RabbitMQ 服务器
- connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
- channel = connection.channel()
- # 声明队列
- channel.queue_declare(queue='my_queue')
- # 定义消息处理回调函数
- def callback(ch, method, properties, body):
- print(f" [x] Received {body.decode()}")
- # 设置消费队列
- channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- # 开始消费消息
- channel.start_consuming()
复制代码 运行代码
先运行接受信息脚本
其次运行发送信息脚本
RabbitMQ集群部署
主机域名192.168.218.221node1192.168.218.220node2192.168.218.227node3 编辑每台/etc/hosts文件
- 192.168.218.221 node1
- 192.168.218.220 node2
- 192.168.218.227 node3
复制代码 编辑/etc/rabbitmq/rabbitmq-env.conf
- 192.168.218.221上
- NODENAME=rabbit@node1
- 192.168.218.220上
- NODENAME=rabbit@node2
- 192.168.218.227上
- NODENAME=rabbit@node3
- 查看文件,确保每台机器都一样
- cat /var/lib/rabbitmq/.erlang.cookie
- 重新启动
- systemctl restart rabbitmq-server
复制代码 在node1上执行
- rabbitmqctl stop_app
- rabbitmqctl reset # 如果此节点以前有集群信息,则重置
- rabbitmqctl start_app
复制代码 然后在node2上
- rabbitmqctl stop_app
- rabbitmqctl join_cluster rabbit@node1
- rabbitmqctl join_cluster rabbit@node3
复制代码 查抄集群状态
- rabbitmqctl cluster_status
复制代码 创建用户密码都相同
访问任意一台服务
192.168.218.221:15672
keepalive+nginx+RabbitMQ实现负载均衡高可用
设置nginx反向代理负载均衡
- vim /etc/nginx/conf.d/rabbitmq.conf
- upstream rabbitmqserver {
- server 192.168.218.221:15672;
- server 192.168.218.220:15672;
- server 192.168.218.227:15672;
- }
- server {
- listen 15675;
- server_name 192.168.218.100;
- location / {
- proxy_pass http://rabbitmqserver;
- index index.html index.htm;
- }
- }
- scp -r /etc/nginx/conf.d/rabbitmq.conf 192.168.218.220:/etc/nginx/conf.d/rabbitmq.conf
复制代码 设置keepalived
- 主keepalived在192.168.218.221上
- vim /etc/keepalived/keepalived.conf
- ! Configuration File for keepalived
-
- global_defs {
- router_id LVS01
- }
- vrrp_script chk_nginx {
- script "/etc/keepalived/nginx_check.sh"
- interval 1
- weight -30
- fall 3
- rise 2
- timeout 2
- }
- vrrp_instance VI_MYSQL {
- state MASTER
- interface ens33
- virtual_router_id 100
- priority 100
- advert_int 1
- authentication {
- auth_type PASS
- auth_pass 1111
- }
- virtual_ipaddress {
- 192.168.218.100
- }
- track_script {
- chk_nginx
- }
- }
- 备在192.168.218.220上
- vim /etc/keepalived/keepalived.conf
- ! Configuration File for keepalived
- global_defs {
- router_id LVS02
- }
- vrrp_script chk_nginx {
- script "/etc/keepalived/nginx_check.sh"
- interval 2
- fall 3
- weight -30
- rise 2
- timeout 2
- }
- vrrp_instance VI_1 {
- state BACKUP
- interface ens33
- virtual_router_id 100
- priority 99
- advert_int 1
- authentication {
- auth_type PASS
- auth_pass 1111
- }
- virtual_ipaddress {
- 192.168.218.100
- }
- track_script {
- chk_nginx
- }
- }
复制代码 编辑nginx健康检测脚本(221和220)
- cat /etc/keepalived/nginx_check.sh
- #!/bin/bash
- # 健康检查 URL
- HEALTHCHECK_URL="http://192.168.218.221/healthcheck"
- # 使用 curl 检查健康状态
- HTTP_RESPONSE=$(curl --write-out "%{http_code}" --silent --output /dev/null "$HEALTHCHECK_URL")
- if [ "$HTTP_RESPONSE" -eq 200 ]; then
- exit 0 # 健康状态
- else
- exit 1 # 不健康状态
- fi
复制代码 依次启动主备keepalived
访问192.168.218.100:15675
Docker部署RabbitMQ集群
- docker run -d --hostname rabbit_node1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
- docker run -d --hostname rabbit_node2 --name rabbitmq2 -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_node1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
- docker run -d --hostname rabbit_node3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_node1 --link rabbitmq2:rabbit_node2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
- 参数说明:
- --hostname ⾃定义Docker容器的 hostname
- --link 容器之间连接,link不可或缺,使得三个容器能互相通信
- --privileged=true 使⽤该参数,container内的root拥有真正的root权限,否则容器出现permission denied
- -v 宿主机和容器路径映射
- 参数 RABBITMQ_NODENAME,缺省 Unix*:rabbit@$HOSTNAME
- 参数 RABBITMQ_DEFAULT_USER=admin
- 参数 RABBITMQ_DEFAULT_PASS=admin
- Erlang Cookie 值必须相同,也就是⼀个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同, 相当于不同节点之间通讯的密钥,erlang.cookie是erlang的分布式token⽂件,集群内各个节点的erlang.cookie需要相同,才可以互相通信
- #节点⼀配置集群
- docker exec -it rabbitmq1 bash
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl start_app
- exit
- #节点⼆加⼊集群,--ram是以内存⽅式加⼊,忽略该参数默认为磁盘节点
- docker exec -it rabbitmq2 bash
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster --ram rabbit@rabbit_node1
- rabbitmqctl start_app
- exit
- #节点三加⼊集群,--ram是以内存⽅式加⼊,忽略该参数默认为磁盘节点。
- docker exec -it rabbitmq3 bash
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster --ram rabbit@rabbit_node2
- rabbitmqctl start_app
- exit
- ##查看集群节点状态,配置启动了3个节点,1个磁盘节点和2个内存节点
- rabbitmqctl cluster_status
复制代码 访问页面192.168.218.221:15672
用户为admin 密码为默认密码guest 可以修改
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |