消息队列RabbitMQ

打印 上一主题 下一主题

主题 862|帖子 862|积分 2586

消息队列RabbitMQ

消息队列是一种通信机制,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,用于在分布式体系中实现不同应用程序或服务之间的异步消息传递。它通过在发送者和接收者之间提供一个暂时存储的“队列”,帮助解耦服务之间的直接依赖,从而提升体系的可扩展性和可靠性。以下是消息队列的一些关键特性和工作原理:
关键特性


  • 异步通信

    • 发送者可以将消息发送到队列中,而无需等候接收者处置惩罚完这些消息。接收者可以在符合的时间从队列中获取消息进行处置惩罚。

  • 解耦

    • 发送者和接收者不必要同时在线,且它们之间没有直接的依赖关系。这使得体系的各个部分可以独立开辟、部署和扩展。

  • 可靠性

    • 消息可以在队列中持久化,确保即使在体系故障的情况下也不会丢失。许多消息队列体系支持消息确认机制,以确保消息被成功处置惩罚。

  • 流量控制

    • 消息队列可以平衡负载,当接收者处置惩罚速度较慢时,消息可以在队列中排队,避免体系崩溃。

  • 顺序处置惩罚

    • 一些消息队列支持包管消息的顺序,确保消息按照发送的顺序被处置惩罚。

工作原理


  • 生产者

    • 负责天生消息并将其发送到消息队列中。

  • 消息队列

    • 存储消息的地方,负责将消息保存在队列中,直到消费者准备好处置惩罚它们。

  • 消费者

    • 从消息队列中读取消息并进行处置惩罚。


Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按肯定的规则将消息路由转发到某个队列,对消息进行过虑。exchange有下面四种(先了解:fanout,direct,topics,header)
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程
1.发送消息
1、生产者和Broker创建TCP毗连。
2、生产者和Broker创建通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
2.消息接收消息
1、消费者和Broker创建TCP毗连
2、消费者和Broker创建通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
应用场景



  • 异步处置惩罚:如用户注册后发送确认邮件,可以在用户提交表单后立即返回相应,而邮件发送可以在后台处置惩罚。
  • 任务调理:将任务放入队列中,后台服务按需处置惩罚,避免了请求壅闭。
  • 体系解耦:不同服务之间通过消息队列进行通信,减少直接依赖,方便体系的扩展和维护。
  • 大数据处置惩罚:由于数据量太大,程序一时处置惩罚不外来,可以通过把数据放入MQ,多开几个消费者行止理消息,比如:日记网络等
常见的消息队列体系



  • RabbitMQ:一个开源的消息队列体系,支持多种消息协议,具有良好的机动性和可靠性。
  • Apache Kafka:一个分布式流处置惩罚平台,专为高吞吐量和高可用性计划,得当大数据处置惩罚场景。
  • ActiveMQ:一个开源的消息中间件,支持多种协议和语言,得当企业级应用。
  • Redis:固然重要是一个内存数据存储,但也可以用作轻量级的消息队列。
启用管理插件

RabbitMQ 提供了一个 web 管理界面,方便管理和监控 RabbitMQ 实例。可以通过以下命令启用该插件:
  1. sudo rabbitmq-plugins enable rabbitmq_management
复制代码
启用后,管理界面通常会在 http://localhost:15672 访问,默认用户名和密码均为 guest。
设置用户权限

在生产情况中,建议创建一个新的用户并分配相应权限。以下是创建新用户的步骤:
1.添加用户
  1. sudo  rabbitmqctl  add_user  用户  密码
复制代码
2.设置用户权限
  1. sudo  rabbitmqctl  set_user_tags 用户  角色
复制代码
脚色范例:
administrator


  • 权限:完全控制 RabbitMQ 服务器的所有功能,包括创建和管理用户、虚拟主机、交换机、队列等。
  • 用途:实用于必要对 RabbitMQ 体系进行全面管理的用户。
monitoring


  • 权限:仅限于查察 RabbitMQ 服务器的状态和监控信息,无法进行更改操作。
  • 用途:实用于必要监控 RabbitMQ 性能和状态但不必要进行管理操作的用户。
policy


  • 权限:用户可以管理 RabbitMQ 的策略(Policies),如设置队列的 TTL(生存时间)、最大长度等。
  • 用途:实用于必要管理策略但不必要完全管理员权限的用户。
lifecycle


  • 权限:允许用户管理生命周期操作,包括创建、删除和更新虚拟主机等。
  • 用途:实用于必要管理 RabbitMQ 实例生命周期的用户。
management


  • 权限:访问 RabbitMQ 的管理插件,允许用户通过管理界面查察和管理 RabbitMQ 资源,但不具备全部的管理员权限。
  • 用途:实用于必要访问管理界面但不必要完全控制的用户。
3.赋予用户特定主的权限
  1. sudo rabbitmqctl set_permissions -p / 用户 ".*" ".*" ".*"
复制代码
".*" 的寄义
在命令中使用的 "*" 和 ".*" 是正则表达式的语法,详细寄义如下:


  • ".*"

    • . 表示匹配任何单个字符。
    • * 表示匹配前面的元素零次或多次。

因此,".*" 表示匹配任意字符串,包括空字符串。这意味着用户 newuser 对于指定虚拟主机 / 的以下权限:

  • 设置权限 (<configure>):用户可以设置任意交换机。
  • 写入权限 (<write>):用户可以向任意队列发送消息。
  • 读取权限 (<read>):用户可以从任意队列接收消息。
交换机和对列设置
1.创建交换机
交换机是Rabbitmq 的紧张概念,用于接受消息并将其路由到一个或多个队列。可以通过管理界面或命令
通过命令情势创建交换机:
  1. sudo rabbitmqctl add_exchange my_exchange direct
复制代码
这里,my_exchange 是交换机的名称,direct 是交换机的范例。
2.创建队列
队列用于存储消息,可以使用下面的命令创建队列
通过命令行创建队列
  1. sudo  rabbitmqctl  add_queue  my_queue
复制代码
3.绑定交换机和队列
一旦创建了交换机和队列,就可以将他们绑定在一起:
  1. sudo rabbitmqctl bind_queue my_queue my_exchange routing_key
复制代码
这里,routing_key 是用于路由消息的关键字
设置文件

通常在/etc/rabbitmq/rabbitmq.conf
使用 RabbitMQ 的客户端

RabbitMQ 提供了多种语言的客户端库,包括 Python、Java、Node.js 等。以下是一个使用 Python 的示例:
安装 Pika 库(Python 的 RabbitMQ 客户端)
  1. pip install pika
复制代码
  1. 发送消息
  2. 创建一个 Python 文件,例如 send.py,然后输入以下代码:
  3. import pika
  4. # 连接到 RabbitMQ 服务器
  5. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  6. channel = connection.channel()
  7. # 声明队列
  8. channel.queue_declare(queue='my_queue')
  9. # 发送消息
  10. channel.basic_publish(exchange='',
  11.                       routing_key='my_queue',
  12.                       body='Hello, RabbitMQ!')
  13. print(" [x] Sent 'Hello, RabbitMQ!'")
  14. # 关闭连接
  15. connection.close()
  16. 接收消息
  17. 创建另一个 Python 文件,例如 receive.py,然后输入以下代码:
  18. import pika
  19. # 连接到 RabbitMQ 服务器
  20. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  21. channel = connection.channel()
  22. # 声明队列
  23. channel.queue_declare(queue='my_queue')
  24. # 定义消息处理回调函数
  25. def callback(ch, method, properties, body):
  26.     print(f" [x] Received {body.decode()}")
  27. # 设置消费队列
  28. channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True)
  29. print(' [*] Waiting for messages. To exit press CTRL+C')
  30. # 开始消费消息
  31. channel.start_consuming()
复制代码
运行代码
先运行接受信息脚本
  1. python receive.py
复制代码
其次运行发送信息脚本
  1. python send.py
复制代码
RabbitMQ集群部署

主机域名192.168.218.221node1192.168.218.220node2192.168.218.227node3 编辑每台/etc/hosts文件
  1. 192.168.218.221  node1
  2. 192.168.218.220  node2
  3. 192.168.218.227  node3
复制代码
编辑/etc/rabbitmq/rabbitmq-env.conf
  1. 192.168.218.221上
  2. NODENAME=rabbit@node1
  3. 192.168.218.220上
  4. NODENAME=rabbit@node2
  5. 192.168.218.227上
  6. NODENAME=rabbit@node3
  7. 查看文件,确保每台机器都一样
  8. cat /var/lib/rabbitmq/.erlang.cookie
  9. 重新启动
  10. systemctl restart rabbitmq-server
复制代码
在node1上执行
  1. rabbitmqctl stop_app
  2. rabbitmqctl reset  # 如果此节点以前有集群信息,则重置
  3. rabbitmqctl start_app
复制代码
然后在node2上
  1. rabbitmqctl stop_app
  2. rabbitmqctl join_cluster rabbit@node1
  3. rabbitmqctl join_cluster rabbit@node3
复制代码
查抄集群状态
  1. rabbitmqctl cluster_status
复制代码
创建用户密码都相同
访问任意一台服务
192.168.218.221:15672

keepalive+nginx+RabbitMQ实现负载均衡高可用

设置nginx反向代理负载均衡
  1. vim /etc/nginx/conf.d/rabbitmq.conf
  2. upstream rabbitmqserver {
  3.     server 192.168.218.221:15672;
  4.     server 192.168.218.220:15672;
  5.     server 192.168.218.227:15672;
  6. }
  7. server {
  8.     listen 15675;
  9.     server_name 192.168.218.100;
  10.     location / {
  11.     proxy_pass http://rabbitmqserver;
  12.     index  index.html  index.htm;
  13.     }
  14. }
  15. scp -r /etc/nginx/conf.d/rabbitmq.conf   192.168.218.220:/etc/nginx/conf.d/rabbitmq.conf
复制代码
设置keepalived
  1. 主keepalived在192.168.218.221上
  2. vim  /etc/keepalived/keepalived.conf
  3. ! Configuration File for keepalived
  4.   
  5. global_defs {
  6.     router_id LVS01
  7. }
  8. vrrp_script chk_nginx {
  9.     script "/etc/keepalived/nginx_check.sh"
  10.     interval 1
  11.     weight -30
  12.     fall 3
  13.     rise 2
  14.     timeout 2
  15. }
  16. vrrp_instance VI_MYSQL {
  17.     state MASTER
  18.     interface ens33
  19.     virtual_router_id 100
  20.     priority 100
  21.     advert_int 1
  22.     authentication {
  23.         auth_type PASS
  24.         auth_pass 1111
  25.     }
  26.     virtual_ipaddress {
  27.         192.168.218.100
  28.     }
  29.     track_script {
  30.         chk_nginx
  31.     }
  32. }
  33. 备在192.168.218.220上
  34. vim  /etc/keepalived/keepalived.conf
  35. ! Configuration File for keepalived
  36. global_defs {
  37.     router_id LVS02
  38. }
  39. vrrp_script chk_nginx {
  40.     script "/etc/keepalived/nginx_check.sh"
  41.     interval 2
  42.     fall 3
  43.     weight -30
  44.     rise 2
  45.     timeout 2
  46. }
  47. vrrp_instance VI_1 {
  48.     state BACKUP
  49.     interface ens33
  50.     virtual_router_id 100
  51.     priority 99
  52.     advert_int 1
  53.     authentication {
  54.         auth_type PASS
  55.         auth_pass 1111
  56.     }
  57.     virtual_ipaddress {
  58.         192.168.218.100
  59.     }
  60.     track_script {
  61.         chk_nginx
  62.     }
  63. }
复制代码
编辑nginx健康检测脚本(221和220)
  1. cat /etc/keepalived/nginx_check.sh
  2. #!/bin/bash
  3. # 健康检查 URL
  4. HEALTHCHECK_URL="http://192.168.218.221/healthcheck"
  5. # 使用 curl 检查健康状态
  6. HTTP_RESPONSE=$(curl --write-out "%{http_code}" --silent --output /dev/null "$HEALTHCHECK_URL")
  7. if [ "$HTTP_RESPONSE" -eq 200 ]; then
  8.     exit 0  # 健康状态
  9. else
  10.     exit 1  # 不健康状态
  11. fi
复制代码
依次启动主备keepalived
访问192.168.218.100:15675
Docker部署RabbitMQ集群

  1. docker run -d --hostname  rabbit_node1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  2. docker run -d --hostname rabbit_node2 --name rabbitmq2 -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_node1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  3. docker run -d --hostname rabbit_node3 --name rabbitmq3 -p 15674:15672 -p 5674:5672  --link rabbitmq1:rabbit_node1 --link rabbitmq2:rabbit_node2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  4. 参数说明:
  5. --hostname ⾃定义Docker容器的 hostname
  6. --link 容器之间连接,link不可或缺,使得三个容器能互相通信
  7. --privileged=true 使⽤该参数,container内的root拥有真正的root权限,否则容器出现permission denied
  8. -v 宿主机和容器路径映射
  9.         参数 RABBITMQ_NODENAME,缺省 Unix*:rabbit@$HOSTNAME
  10.         参数 RABBITMQ_DEFAULT_USER=admin
  11.         参数 RABBITMQ_DEFAULT_PASS=admin
  12. Erlang Cookie 值必须相同,也就是⼀个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同, 相当于不同节点之间通讯的密钥,erlang.cookie是erlang的分布式token⽂件,集群内各个节点的erlang.cookie需要相同,才可以互相通信
  13. #节点⼀配置集群
  14. docker exec -it rabbitmq1 bash
  15. rabbitmqctl stop_app
  16. rabbitmqctl reset
  17. rabbitmqctl start_app
  18. exit
  19. #节点⼆加⼊集群,--ram是以内存⽅式加⼊,忽略该参数默认为磁盘节点
  20. docker exec -it rabbitmq2 bash
  21. rabbitmqctl stop_app
  22. rabbitmqctl reset
  23. rabbitmqctl join_cluster --ram rabbit@rabbit_node1
  24. rabbitmqctl start_app
  25. exit
  26. #节点三加⼊集群,--ram是以内存⽅式加⼊,忽略该参数默认为磁盘节点。
  27. docker exec -it rabbitmq3 bash
  28. rabbitmqctl stop_app
  29. rabbitmqctl reset
  30. rabbitmqctl join_cluster --ram rabbit@rabbit_node2
  31. rabbitmqctl start_app
  32. exit
  33. ##查看集群节点状态,配置启动了3个节点,1个磁盘节点和2个内存节点
  34. rabbitmqctl cluster_status
复制代码
访问页面192.168.218.221:15672
用户为admin 密码为默认密码guest 可以修改

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

半亩花草

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表