RabbitMQ
集群部署
环境预备:阿里云centos8 服务器,3台服务器,分别举行安装;
下载Erlang
Erlang和RabbitMQ版本对照:https://www.rabbitmq.com/which-erlang.html
- vim /etc/yum.repos.d/rabbitmq.repo
复制代码- # --nobest表示所需安装包即使不是最佳选择也接受
- yum update -y --nobest
- # 还需关闭防火墙
- systemctl stop firewalld
- # 开机关闭防火墙:
- systemctl disable firewalld
- # 查看防火墙状态:
- systemctl status firewalld
复制代码 安装RabbitMQ
Erlang和RabbitMQ版本对照:https://www.rabbitmq.com/which-erlang.html
- # 导入GPG密钥,如失败多重试几次
- rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc'
- rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key'
- rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key'
- # 下载 RPM 包
- wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm
- # 安装
- rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm
复制代码 举行配置
- # 启用管理界面插件
- rabbitmq-plugins enable rabbitmq_management
- # 启动 RabbitMQ 服务:
- systemctl start rabbitmq-server
- # 将 RabbitMQ 服务设置为开机自动启动
- systemctl enable rabbitmq-server
- # 新增登录账号密码 - 用户名 密码
- rabbitmqctl add_user yan 123456
- # 设置登录账号权限,该用户设置角色是administrator管理员。再设置权限
- rabbitmqctl set_user_tags yan administrator
- rabbitmqctl set_permissions -p / yan ".*" ".*" ".*"
- # 查询当前用户和角色
- rabbitmqctl list_users
- # 配置所有稳定功能 flag 启用
- rabbitmqctl enable_feature_flag all
- # 重启RabbitMQ服务生效
- systemctl restart rabbitmq-server
- # 启动rabbitmq服务器
- systemctl start rabbitmq-server
- # 查看rabbitmq服务器状态
- systemctl status rabbitmq-server
- # 停止rabbitmq服务器
- systemctl stop rabbitmq-server
- # 重启rabbitmq服务器
- systemctl restart rabbitmq-server
复制代码 首尾配置
- # 删除yum库配置文件,避免下次yum安装在执行一遍
- rm -rf /etc/yum.repos.d/rabbitmq.repo
- # 分别在不同服务器修改主机名称
- vim /etc/hostname
- # 分别起名 node01、node02、node03
- # 例子:机器1,该文件/etc/hostname 下 填写node1
- # 机器2,该文件/etc/hostname 下 填写node2
- # 机器3,该文件/etc/hostname 下 填写node3
复制代码 集群配置
机器1配置:
- 分别在3台机器设置IP 所在到对应主机名称的映射
修改文件/etc/hosts,追加如下内容:
- ip1 node01
- ip2 node02
- ip3 node03
复制代码 - 检察该MQ的Cookie值并记录
- cat /var/lib/rabbitmq/.erlang.cookie
- # 并记录该值,后续机器2,机器3上的cookie值保持一致
- # 在node01节点运行,把该文件复制到其他机器上
- scp /var/lib/rabbitmq/.erlang.cookie root@node02:/var/lib/rabbitmq/.erlang.cookie
- scp /var/lib/rabbitmq/.erlang.cookie root@node03:/var/lib/rabbitmq/.erlang.cookie
- # 需要将rabbitmq分别重启
- systemctl stop rabbitmq-server
- systemctl start rabbitmq-server
复制代码 - 重置节点应用
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl start_app
复制代码 机器2配置:
- 分别在3台机器设置IP 所在到对应主机名称的映射
修改文件/etc/hosts,追加如下内容:
- ip1 node01
- ip2 node02
- ip3 node03
复制代码 - 检察该MQ的Cookie值并记录
- cat /var/lib/rabbitmq/.erlang.cookie
- # 把机器1上的cookie值复制到这里,与机器1保持一致
复制代码 - 重置节点应用,并加入集群
- rabbitmqctl stop_app
- rabbitmqctl reset
- # 加入集群,去node01机器上找rabbit的程序,进行加入
- rabbitmqctl join_cluster rabbit@node01
- rabbitmqctl start_app
复制代码 机器3配置:
- 分别在3台机器设置IP 所在到对应主机名称的映射
修改文件/etc/hosts,追加如下内容:
- ip1 node01
- ip2 node02
- ip3 node03
复制代码 - 检察该MQ的Cookie值并记录
- cat /var/lib/rabbitmq/.erlang.cookie
- # 把机器1上的cookie值复制到这里,与机器1保持一致
复制代码 - 重置节点应用,并加入集群
- rabbitmqctl stop_app
- rabbitmqctl reset
- # 加入集群,去node01机器上找rabbit的程序,进行加入
- rabbitmqctl join_cluster rabbit@node01
- rabbitmqctl start_app
复制代码 - 检察集群状态
- rabbitmqctl cluster_status
- # 也可以在图形化界面上,查看集群节点,在Nodes这里查看集群节点
复制代码 负载均衡
一般MQ的服务端口5672、UI管理界面端口15672
浏览器访问,某个端口,通过HAProxy工具,举行分流每个节点的15672端口上,举行管理UI上的负载均衡;
服务端访问,某个端口,通过HAProxy工具,举行分流每个节点的5672端口上,举行服务端的负载均衡;
安装HAProxy
- yum install -y haproxy
- # 查看安装的版本
- haproxy -v
- # 启动该程序
- systemctl start haproxy
- # 设置该服务开机自动启动
- systemctl enable haproxy
- # 查看状态
- systemctl status haproxy
- # 停止启动
- systemctl stop haproxy
- # 重启
- systemctl restart haproxy
- # 查看当前的线程
- ps -ef|grep haproxy
复制代码 修改配置文件
- 修改配置文件
- vim /etc/haproxy/haproxy.cfg
复制代码 文件内容:
- # 在最后一行添加该内容,该部分配置UI管理器上的负载均衡
- # 起MQ的UI的名字(rabbitmq_ui_frontend),前端部分配置
- frontend rabbitmq_ui_frontend
- # 绑定一个地址,端口号,外部浏览器访问该IP地址和端口后,然后通过HAProxy进行转发
- bind 192.168.xxx.xxx:22222
- # http协议
- mode http
- # 默认的后端服务名,使用下面的名字
- default_backend rabbitmq_ui_backend
- # 起MQ的服务端的名字(rabbitmq_ui_backend)
- backend rabbitmq_ui_backend
- # http协议
- mode http
- # 启用轮询的负载均衡
- balance roundrobin
- option httpchk GET /
- # 分别设置,不同的MQ服务节点的IP地址
- server rabbitmq_ui1 192.168.xxx.1:15672 check
- server rabbitmq_ui2 192.168.xxx.2:15672 check
- server rabbitmq_ui3 192.168.xxx.3:15672 check
- # 配置服务端(生产者、消费者)的负载均衡
- # 服务端(生产者、消费者)的名字
- frontend rabbitmq_frontend
- # 绑定一个地址,端口号,服务端(生产者、消费者)访问该IP地址和端口后,然后通过HAProxy进行转发
- bind 192.168.xxx.xxx:11111
- # tcp协议
- mode tcp
- default_backend rabbitmq_backend
- # 与上面的名字保持一致
- backend rabbitmq_backend
- # tcp协议
- mode tcp
- # 启用轮询的负载均衡
- balance roundrobin
- # 分别设置,不同的MQ服务节点的IP地址
- server rabbitmq1 192.168.xxx.1:5672 check
- server rabbitmq2 192.168.xxx.2:5672 check
- server rabbitmq3 192.168.xxx.3:5672 check
复制代码 - 设置SELinux策略,答应HAProxy拥有权限连接任意端口:
- # 保存上面的配置文件,执行下面
- setsebool -P haproxy_connect_any=1
- # 绑定非本机的IP需要在sysctl.conf文件中配置:
- vi /etc/sysctl.conf
- # 添加
- net.ipv4.ip_nonlocal_bind=1
- # 保存退出,执行
- sysctl -p
复制代码 SELinux是Linux系统中的安全模块,它可以限定进程的权限以进步系统的安全性。在某些环境下,SELinux可能会阻止HAProxy绑定指定的端口,这就必要通过设置域(domain)的安全策略来解决此问题。通过执行setsebool -P haproxy_connect_any=1命令,您已经为HAProxy设置了一个布尔值,答应HAProxy连接到任意端口。如许,HAProxy就可以成功绑定指定的socket,并正常工作。
- 重启HAProxy
- # 测试配置文件的正确性
- haproxy -c -f /etc/haproxy/haproxy.cfg
- # 进行重启
- systemctl restart haproxy
复制代码 配置成功
- # 访问MQ管理UI页面:
- http://192.168.xxx.xxx:22222
- # 生产者、消费者服务连接MQ
- rabbitmq:
- host: 192.168.xxx.xxx
- port: 11111
- username: guest
- password: 123456
- # 可通过生产者发送消息,监听消费者是否接受到消息
复制代码 仲裁队列
仲裁队列是3.8版本以后才有的新功能,是主从模式,支持主从数据同步,在添加队列,队列类型为Quorum就是仲裁队列。
创建交换机
与之前创建平凡交换机同等
创建仲裁队列
绑定交换机
与之前的同等
验证,停止某个节点
- # 停止rabbit应用
- rabbitmqctl stop_app
- # 我们在某个节点进行停止MQ,然后生产者发送消息,查看消费者是否可以接受到,判断集群的数据一致性
- # 默认,主节点挂掉,默认重新选举新的主节点
复制代码 流式队列
流式队列是3.9版本以后才有的新功能,每个消息都分配一个偏移量,该消息被消耗掉也不会被删除,可以重复消耗;类似Kafka,但没有到达Kafka的效果。
前提预备 - 安装干系插件
- # 启用Stream插件
- rabbitmq-plugins enable rabbitmq_stream
- # 重启rabbit应用
- rabbitmqctl stop_app
- rabbitmqctl start_app
- # 查看插件状态
- rabbitmq-plugins list
复制代码 负载均衡配置
- frontend rabbitmq_stream_frontend
- bind 192.168.xxx.100:33333
- mode tcp
- default_backend rabbitmq_stream_backend
- backend rabbitmq_stream_backend
- mode tcp
- balance roundrobin
- # 它的端口是5552
- server rabbitmq1 192.168.xxx.1:5552 check
- server rabbitmq2 192.168.xxx.1:5552 check
- server rabbitmq3 192.168.xxx.2:5552 check
复制代码 JAVA干系代码
- // 引包
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>stream-client</artifactId>
- <version>0.15.0</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.30</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.2.3</version>
- </dependency>
- </dependencies>
复制代码 创建Stream,不必要创建交换机
- // 代码创建
- Environment environment = Environment.builder()
- .host("192.168.xxx.1")
- .port(33333)
- .username("用户名")
- .password("密码")
- .build();
- environment.streamCreator().stream("stream.atguigu.test2").create();
- environment.close();
复制代码 通过管理UI页面创建的话,队列类型选择:Stream
生产者
- Environment environment = Environment.builder()
- .host("192.168.xxx.1")
- .port(33333)
- .username("用户名")
- .password("密码")
- .build();
- Producer producer = environment.producerBuilder()
- // 队列名称
- .stream("stream.test")
- .build();
- // 消息内容
- byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);
- CountDownLatch countDownLatch = new CountDownLatch(1);
- producer.send(
- producer.messageBuilder().addData(messagePayload).build(),
- confirmationStatus -> {
- if (confirmationStatus.isConfirmed()) {
- System.out.println("[生产者端]the message made it to the broker");
- } else {
- System.out.println("[生产者端]the message did not make it to the broker");
- }
- countDownLatch.countDown();
- });
- countDownLatch.await();
- producer.close();
- environment.close();
复制代码 消耗者
- Environment environment = Environment.builder()
- .host("192.168.xxx.1")
- .port(33333)
- .username("用户名")
- .password("密码")
- .build();
- environment.consumerBuilder()
- // 队列名称
- .stream("stream.test")
- // 这个是消费端起名字
- .name("stream.test.consumer")
- .autoTrackingStrategy()
- .builder()
- .messageHandler((offset, message) -> {
- // 接受消息
- byte[] bodyAsBinary = message.getBodyAsBinary();
- String messageContent = new String(bodyAsBinary);
- System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset());
- })
- .build();
复制代码 指定偏移量消耗
- // 指定Offset消费
- Environment environment = Environment.builder()
- .host("192.168.xxx.1")
- .port(33333)
- .username("用户名")
- .password("密码")
- .build();
- CountDownLatch countDownLatch = new CountDownLatch(1);
- Consumer consumer = environment.consumerBuilder()
- .stream("stream.test")
- .offset(OffsetSpecification.first())
- .messageHandler((offset, message) -> {
- byte[] bodyAsBinary = message.getBodyAsBinary();
- String messageContent = new String(bodyAsBinary);
- System.out.println("[消费者端]messageContent = " + messageContent);
- countDownLatch.countDown();
- })
- .build();
- countDownLatch.await();
- consumer.close();
复制代码 Federation插件
它是MQ在差别的Broker节点之间举行消息通报而无须建立集群,可以实现跨集群的数据同步。
Federation交换机
- 前提预备:启用该插件、分为上游和下游,创建2个独立的MQ服务;
- # 使用docker进行安装MQ1服务
- docker run -d \
- --name rabbitmq-shenzhen \
- -p 51000:5672 \
- -p 52000:15672 \
- -v rabbitmq-plugin:/plugins \
- -e RABBITMQ_DEFAULT_USER=guest \
- -e RABBITMQ_DEFAULT_PASS=123456 \
- rabbitmq:3.13-management
- # 使用docker进行安装MQ2服务
- docker run -d \
- --name rabbitmq-shanghai \
- -p 61000:5672 \
- -p 62000:15672 \
- -v rabbitmq-plugin:/plugins \
- -e RABBITMQ_DEFAULT_USER=guest \
- -e RABBITMQ_DEFAULT_PASS=123456 \
- rabbitmq:3.13-management
复制代码 - 启用插件
- # 分别在2台MQ服务,进行安装该插件
- rabbitmq-plugins enable rabbitmq_federation
- rabbitmq-plugins enable rabbitmq_federation_management
复制代码 安装完插件,在右侧菜单栏可看到
- 添加上游连接端点
在下游的MQ服务,举行操纵,点击右侧Federation Upstreams,举行添加,必要起一个上游的连接点名字,(federation-upstreams),URL填写:amqp://用户名:暗码@访问ip:端口 (是上游的可访问的所在)
- 创建控制策略
举行配置,在下游的MQ服务,举行操纵
- 验证
- # 需要满足:
- # 普通交换机和联邦交换机名称要一致
- # 交换机名称要能够和策略正则表达式匹配上
- # 发送消息时,两边使用的路由键也要一致
- # 队列名称不要求一致
复制代码
生产者(上游)发送消息、消息会被上游(MQ)的队列接收到,然后下游(MQ)的队列也会担当到消息;
满足差别集群举行数据同步;
Federation队列
Federation队列和Federation交换机的最焦点区别就是:
- Federation Police作用在交换机上,就是Federation交换机
- Federation Police作用在队列上,就是Federation队列
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |