ToB企服应用市场:ToB评测及商务社交产业平台

标题: 消息队列RabbitMQ [打印本页]

作者: 半亩花草    时间: 2024-11-7 12:47
标题: 消息队列RabbitMQ
消息队列RabbitMQ

消息队列是一种通信机制,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,用于在分布式体系中实现不同应用程序或服务之间的异步消息传递。它通过在发送者和接收者之间提供一个暂时存储的“队列”,帮助解耦服务之间的直接依赖,从而提升体系的可扩展性和可靠性。以下是消息队列的一些关键特性和工作原理:
关键特性

工作原理


Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按肯定的规则将消息路由转发到某个队列,对消息进行过虑。exchange有下面四种(先了解:fanout,direct,topics,header)
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程
1.发送消息
1、生产者和Broker创建TCP毗连。
2、生产者和Broker创建通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
2.消息接收消息
1、消费者和Broker创建TCP毗连
2、消费者和Broker创建通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
应用场景


常见的消息队列体系


启用管理插件

RabbitMQ 提供了一个 web 管理界面,方便管理和监控 RabbitMQ 实例。可以通过以下命令启用该插件:
  1. sudo rabbitmq-plugins enable rabbitmq_management
复制代码
启用后,管理界面通常会在 http://localhost:15672 访问,默认用户名和密码均为 guest。
设置用户权限

在生产情况中,建议创建一个新的用户并分配相应权限。以下是创建新用户的步骤:
1.添加用户
  1. sudo  rabbitmqctl  add_user  用户  密码
复制代码
2.设置用户权限
  1. sudo  rabbitmqctl  set_user_tags 用户  角色
复制代码
脚色范例:
administrator

monitoring

policy

lifecycle

management

3.赋予用户特定主的权限
  1. sudo rabbitmqctl set_permissions -p / 用户 ".*" ".*" ".*"
复制代码
".*" 的寄义
在命令中使用的 "*" 和 ".*" 是正则表达式的语法,详细寄义如下:

因此,".*" 表示匹配任意字符串,包括空字符串。这意味着用户 newuser 对于指定虚拟主机 / 的以下权限:
交换机和对列设置
1.创建交换机
交换机是Rabbitmq 的紧张概念,用于接受消息并将其路由到一个或多个队列。可以通过管理界面或命令
通过命令情势创建交换机:
  1. sudo rabbitmqctl add_exchange my_exchange direct
复制代码
这里,my_exchange 是交换机的名称,direct 是交换机的范例。
2.创建队列
队列用于存储消息,可以使用下面的命令创建队列
通过命令行创建队列
  1. sudo  rabbitmqctl  add_queue  my_queue
复制代码
3.绑定交换机和队列
一旦创建了交换机和队列,就可以将他们绑定在一起:
  1. sudo rabbitmqctl bind_queue my_queue my_exchange routing_key
复制代码
这里,routing_key 是用于路由消息的关键字
设置文件

通常在/etc/rabbitmq/rabbitmq.conf
使用 RabbitMQ 的客户端

RabbitMQ 提供了多种语言的客户端库,包括 Python、Java、Node.js 等。以下是一个使用 Python 的示例:
安装 Pika 库(Python 的 RabbitMQ 客户端)
  1. pip install pika
复制代码
  1. 发送消息
  2. 创建一个 Python 文件,例如 send.py,然后输入以下代码:
  3. import pika
  4. # 连接到 RabbitMQ 服务器
  5. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  6. channel = connection.channel()
  7. # 声明队列
  8. channel.queue_declare(queue='my_queue')
  9. # 发送消息
  10. channel.basic_publish(exchange='',
  11.                       routing_key='my_queue',
  12.                       body='Hello, RabbitMQ!')
  13. print(" [x] Sent 'Hello, RabbitMQ!'")
  14. # 关闭连接
  15. connection.close()
  16. 接收消息
  17. 创建另一个 Python 文件,例如 receive.py,然后输入以下代码:
  18. import pika
  19. # 连接到 RabbitMQ 服务器
  20. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  21. channel = connection.channel()
  22. # 声明队列
  23. channel.queue_declare(queue='my_queue')
  24. # 定义消息处理回调函数
  25. def callback(ch, method, properties, body):
  26.     print(f" [x] Received {body.decode()}")
  27. # 设置消费队列
  28. channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True)
  29. print(' [*] Waiting for messages. To exit press CTRL+C')
  30. # 开始消费消息
  31. channel.start_consuming()
复制代码
运行代码
先运行接受信息脚本
  1. python receive.py
复制代码
其次运行发送信息脚本
  1. python send.py
复制代码
RabbitMQ集群部署

主机域名192.168.218.221node1192.168.218.220node2192.168.218.227node3 编辑每台/etc/hosts文件
  1. 192.168.218.221  node1
  2. 192.168.218.220  node2
  3. 192.168.218.227  node3
复制代码
编辑/etc/rabbitmq/rabbitmq-env.conf
  1. 192.168.218.221上
  2. NODENAME=rabbit@node1
  3. 192.168.218.220上
  4. NODENAME=rabbit@node2
  5. 192.168.218.227上
  6. NODENAME=rabbit@node3
  7. 查看文件,确保每台机器都一样
  8. cat /var/lib/rabbitmq/.erlang.cookie
  9. 重新启动
  10. systemctl restart rabbitmq-server
复制代码
在node1上执行
  1. rabbitmqctl stop_app
  2. rabbitmqctl reset  # 如果此节点以前有集群信息,则重置
  3. rabbitmqctl start_app
复制代码
然后在node2上
  1. rabbitmqctl stop_app
  2. rabbitmqctl join_cluster rabbit@node1
  3. rabbitmqctl join_cluster rabbit@node3
复制代码
查抄集群状态
  1. rabbitmqctl cluster_status
复制代码
创建用户密码都相同
访问任意一台服务
192.168.218.221:15672

keepalive+nginx+RabbitMQ实现负载均衡高可用

设置nginx反向代理负载均衡
  1. vim /etc/nginx/conf.d/rabbitmq.conf
  2. upstream rabbitmqserver {
  3.     server 192.168.218.221:15672;
  4.     server 192.168.218.220:15672;
  5.     server 192.168.218.227:15672;
  6. }
  7. server {
  8.     listen 15675;
  9.     server_name 192.168.218.100;
  10.     location / {
  11.     proxy_pass http://rabbitmqserver;
  12.     index  index.html  index.htm;
  13.     }
  14. }
  15. scp -r /etc/nginx/conf.d/rabbitmq.conf   192.168.218.220:/etc/nginx/conf.d/rabbitmq.conf
复制代码
设置keepalived
  1. 主keepalived在192.168.218.221上
  2. vim  /etc/keepalived/keepalived.conf
  3. ! Configuration File for keepalived
  4.   
  5. global_defs {
  6.     router_id LVS01
  7. }
  8. vrrp_script chk_nginx {
  9.     script "/etc/keepalived/nginx_check.sh"
  10.     interval 1
  11.     weight -30
  12.     fall 3
  13.     rise 2
  14.     timeout 2
  15. }
  16. vrrp_instance VI_MYSQL {
  17.     state MASTER
  18.     interface ens33
  19.     virtual_router_id 100
  20.     priority 100
  21.     advert_int 1
  22.     authentication {
  23.         auth_type PASS
  24.         auth_pass 1111
  25.     }
  26.     virtual_ipaddress {
  27.         192.168.218.100
  28.     }
  29.     track_script {
  30.         chk_nginx
  31.     }
  32. }
  33. 备在192.168.218.220上
  34. vim  /etc/keepalived/keepalived.conf
  35. ! Configuration File for keepalived
  36. global_defs {
  37.     router_id LVS02
  38. }
  39. vrrp_script chk_nginx {
  40.     script "/etc/keepalived/nginx_check.sh"
  41.     interval 2
  42.     fall 3
  43.     weight -30
  44.     rise 2
  45.     timeout 2
  46. }
  47. vrrp_instance VI_1 {
  48.     state BACKUP
  49.     interface ens33
  50.     virtual_router_id 100
  51.     priority 99
  52.     advert_int 1
  53.     authentication {
  54.         auth_type PASS
  55.         auth_pass 1111
  56.     }
  57.     virtual_ipaddress {
  58.         192.168.218.100
  59.     }
  60.     track_script {
  61.         chk_nginx
  62.     }
  63. }
复制代码
编辑nginx健康检测脚本(221和220)
  1. cat /etc/keepalived/nginx_check.sh
  2. #!/bin/bash
  3. # 健康检查 URL
  4. HEALTHCHECK_URL="http://192.168.218.221/healthcheck"
  5. # 使用 curl 检查健康状态
  6. HTTP_RESPONSE=$(curl --write-out "%{http_code}" --silent --output /dev/null "$HEALTHCHECK_URL")
  7. if [ "$HTTP_RESPONSE" -eq 200 ]; then
  8.     exit 0  # 健康状态
  9. else
  10.     exit 1  # 不健康状态
  11. fi
复制代码
依次启动主备keepalived
访问192.168.218.100:15675
Docker部署RabbitMQ集群

  1. docker run -d --hostname  rabbit_node1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  2. docker run -d --hostname rabbit_node2 --name rabbitmq2 -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_node1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  3. docker run -d --hostname rabbit_node3 --name rabbitmq3 -p 15674:15672 -p 5674:5672  --link rabbitmq1:rabbit_node1 --link rabbitmq2:rabbit_node2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management
  4. 参数说明:
  5. --hostname ⾃定义Docker容器的 hostname
  6. --link 容器之间连接,link不可或缺,使得三个容器能互相通信
  7. --privileged=true 使⽤该参数,container内的root拥有真正的root权限,否则容器出现permission denied
  8. -v 宿主机和容器路径映射
  9.         参数 RABBITMQ_NODENAME,缺省 Unix*:rabbit@$HOSTNAME
  10.         参数 RABBITMQ_DEFAULT_USER=admin
  11.         参数 RABBITMQ_DEFAULT_PASS=admin
  12. Erlang Cookie 值必须相同,也就是⼀个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同, 相当于不同节点之间通讯的密钥,erlang.cookie是erlang的分布式token⽂件,集群内各个节点的erlang.cookie需要相同,才可以互相通信
  13. #节点⼀配置集群
  14. docker exec -it rabbitmq1 bash
  15. rabbitmqctl stop_app
  16. rabbitmqctl reset
  17. rabbitmqctl start_app
  18. exit
  19. #节点⼆加⼊集群,--ram是以内存⽅式加⼊,忽略该参数默认为磁盘节点
  20. docker exec -it rabbitmq2 bash
  21. rabbitmqctl stop_app
  22. rabbitmqctl reset
  23. rabbitmqctl join_cluster --ram rabbit@rabbit_node1
  24. rabbitmqctl start_app
  25. exit
  26. #节点三加⼊集群,--ram是以内存⽅式加⼊,忽略该参数默认为磁盘节点。
  27. docker exec -it rabbitmq3 bash
  28. rabbitmqctl stop_app
  29. rabbitmqctl reset
  30. rabbitmqctl join_cluster --ram rabbit@rabbit_node2
  31. rabbitmqctl start_app
  32. exit
  33. ##查看集群节点状态,配置启动了3个节点,1个磁盘节点和2个内存节点
  34. rabbitmqctl cluster_status
复制代码
访问页面192.168.218.221:15672
用户为admin 密码为默认密码guest 可以修改

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4