ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ(集群干系部署) [打印本页]

作者: 大连密封材料    时间: 2024-8-30 16:19
标题: RabbitMQ(集群干系部署)
RabbitMQ

集群部署

环境预备:阿里云centos8 服务器,3台服务器,分别举行安装;
下载Erlang

Erlang和RabbitMQ版本对照:https://www.rabbitmq.com/which-erlang.html
  1. vim /etc/yum.repos.d/rabbitmq.repo
复制代码
  1. # 内容 - 来自官方文档:https://www.rabbitmq.com/docs/install-rpm
  2. # In /etc/yum.repos.d/rabbitmq.repo
  3. ##
  4. ## Zero dependency Erlang RPM
  5. ##
  6. [modern-erlang]
  7. name=modern-erlang-el8
  8. # uses a Cloudsmith mirror @ yum.novemberain.com in addition to its Cloudsmith upstream.
  9. # Unlike Cloudsmith, the mirror does not have any traffic quotas
  10. baseurl=https://yum1.novemberain.com/erlang/el/8/$basearch
  11.         https://yum2.novemberain.com/erlang/el/8/$basearch
  12.         https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/$basearch
  13. repo_gpgcheck=1
  14. enabled=1
  15. gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
  16. gpgcheck=1
  17. sslverify=1
  18. sslcacert=/etc/pki/tls/certs/ca-bundle.crt
  19. metadata_expire=300
  20. pkg_gpgcheck=1
  21. autorefresh=1
  22. type=rpm-md
  23. [modern-erlang-noarch]
  24. name=modern-erlang-el8-noarch
  25. # uses a Cloudsmith mirror @ yum.novemberain.com.
  26. # Unlike Cloudsmith, it does not have any traffic quotas
  27. baseurl=https://yum1.novemberain.com/erlang/el/8/noarch
  28.         https://yum2.novemberain.com/erlang/el/8/noarch
  29.         https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/noarch
  30. repo_gpgcheck=1
  31. enabled=1
  32. gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
  33.        https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
  34. gpgcheck=1
  35. sslverify=1
  36. sslcacert=/etc/pki/tls/certs/ca-bundle.crt
  37. metadata_expire=300
  38. pkg_gpgcheck=1
  39. autorefresh=1
  40. type=rpm-md
  41. [modern-erlang-source]
  42. name=modern-erlang-el8-source
  43. # uses a Cloudsmith mirror @ yum.novemberain.com.
  44. # Unlike Cloudsmith, it does not have any traffic quotas
  45. baseurl=https://yum1.novemberain.com/erlang/el/8/SRPMS
  46.         https://yum2.novemberain.com/erlang/el/8/SRPMS
  47.         https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/SRPMS
  48. repo_gpgcheck=1
  49. enabled=1
  50. gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
  51.        https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
  52. gpgcheck=1
  53. sslverify=1
  54. sslcacert=/etc/pki/tls/certs/ca-bundle.crt
  55. metadata_expire=300
  56. pkg_gpgcheck=1
  57. autorefresh=1
  58. ##
  59. ## RabbitMQ Server
  60. ##
  61. [rabbitmq-el8]
  62. name=rabbitmq-el8
  63. baseurl=https://yum2.novemberain.com/rabbitmq/el/8/$basearch
  64.         https://yum1.novemberain.com/rabbitmq/el/8/$basearch
  65.         https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/$basearch
  66. repo_gpgcheck=1
  67. enabled=1
  68. # Cloudsmith's repository key and RabbitMQ package signing key
  69. gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
  70.        https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
  71. gpgcheck=1
  72. sslverify=1
  73. sslcacert=/etc/pki/tls/certs/ca-bundle.crt
  74. metadata_expire=300
  75. pkg_gpgcheck=1
  76. autorefresh=1
  77. type=rpm-md
  78. [rabbitmq-el8-noarch]
  79. name=rabbitmq-el8-noarch
  80. baseurl=https://yum2.novemberain.com/rabbitmq/el/8/noarch
  81.         https://yum1.novemberain.com/rabbitmq/el/8/noarch
  82.         https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/noarch
  83. repo_gpgcheck=1
  84. enabled=1
  85. # Cloudsmith's repository key and RabbitMQ package signing key
  86. gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
  87.        https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
  88. gpgcheck=1
  89. sslverify=1
  90. sslcacert=/etc/pki/tls/certs/ca-bundle.crt
  91. metadata_expire=300
  92. pkg_gpgcheck=1
  93. autorefresh=1
  94. type=rpm-md
  95. [rabbitmq-el8-source]
  96. name=rabbitmq-el8-source
  97. baseurl=https://yum2.novemberain.com/rabbitmq/el/8/SRPMS
  98.         https://yum1.novemberain.com/rabbitmq/el/8/SRPMS
  99.         https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/SRPMS
  100. repo_gpgcheck=1
  101. enabled=1
  102. gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
  103. gpgcheck=0
  104. sslverify=1
  105. sslcacert=/etc/pki/tls/certs/ca-bundle.crt
  106. metadata_expire=300
  107. pkg_gpgcheck=1
  108. autorefresh=1
  109. type=rpm-md
复制代码
  1. # --nobest表示所需安装包即使不是最佳选择也接受
  2. yum update -y --nobest
  3. # 还需关闭防火墙
  4. systemctl stop firewalld
  5. # 开机关闭防火墙:
  6. systemctl disable firewalld
  7. # 查看防火墙状态:
  8. systemctl status firewalld
复制代码
  1. yum install -y erlang
复制代码
安装RabbitMQ

Erlang和RabbitMQ版本对照:https://www.rabbitmq.com/which-erlang.html
  1. # 导入GPG密钥,如失败多重试几次
  2. rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc'
  3. rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key'
  4. rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key'
  5. # 下载 RPM 包
  6. wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm
  7. # 安装
  8. rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm
复制代码
举行配置

  1. # 启用管理界面插件
  2. rabbitmq-plugins enable rabbitmq_management
  3. # 启动 RabbitMQ 服务:
  4. systemctl start rabbitmq-server
  5. # 将 RabbitMQ 服务设置为开机自动启动
  6. systemctl enable rabbitmq-server
  7. # 新增登录账号密码 - 用户名 密码
  8. rabbitmqctl add_user yan 123456
  9. # 设置登录账号权限,该用户设置角色是administrator管理员。再设置权限
  10. rabbitmqctl set_user_tags yan administrator
  11. rabbitmqctl set_permissions -p / yan ".*" ".*" ".*"
  12. # 查询当前用户和角色
  13. rabbitmqctl list_users
  14. # 配置所有稳定功能 flag 启用
  15. rabbitmqctl enable_feature_flag all
  16. # 重启RabbitMQ服务生效
  17. systemctl restart rabbitmq-server
  18. # 启动rabbitmq服务器
  19. systemctl start rabbitmq-server
  20. # 查看rabbitmq服务器状态
  21. systemctl status rabbitmq-server
  22. # 停止rabbitmq服务器
  23. systemctl stop rabbitmq-server
  24. # 重启rabbitmq服务器
  25. systemctl restart rabbitmq-server
复制代码
首尾配置

  1. # 删除yum库配置文件,避免下次yum安装在执行一遍
  2. rm -rf /etc/yum.repos.d/rabbitmq.repo
  3. # 分别在不同服务器修改主机名称
  4. vim /etc/hostname
  5. # 分别起名 node01、node02、node03
  6. # 例子:机器1,该文件/etc/hostname 下 填写node1
  7. #      机器2,该文件/etc/hostname 下 填写node2
  8. #      机器3,该文件/etc/hostname 下 填写node3
复制代码
集群配置

机器1配置:
机器2配置:
机器3配置:
负载均衡

一般MQ的服务端口5672、UI管理界面端口15672
浏览器访问,某个端口,通过HAProxy工具,举行分流每个节点的15672端口上,举行管理UI上的负载均衡;
服务端访问,某个端口,通过HAProxy工具,举行分流每个节点的5672端口上,举行服务端的负载均衡;
安装HAProxy

  1. yum install -y haproxy
  2. # 查看安装的版本
  3. haproxy -v
  4. # 启动该程序
  5. systemctl start haproxy
  6. # 设置该服务开机自动启动
  7. systemctl enable haproxy
  8. # 查看状态
  9. systemctl status haproxy
  10. # 停止启动
  11. systemctl stop haproxy
  12. # 重启
  13. systemctl restart haproxy
  14. # 查看当前的线程
  15. ps -ef|grep haproxy
复制代码
修改配置文件

配置成功
  1. # 访问MQ管理UI页面:
  2. http://192.168.xxx.xxx:22222
  3. # 生产者、消费者服务连接MQ
  4.   rabbitmq:
  5.     host: 192.168.xxx.xxx
  6.     port: 11111
  7.     username: guest
  8.     password: 123456
  9. # 可通过生产者发送消息,监听消费者是否接受到消息
复制代码
仲裁队列

仲裁队列是3.8版本以后才有的新功能,是主从模式,支持主从数据同步,在添加队列,队列类型为Quorum就是仲裁队列。
创建交换机
与之前创建平凡交换机同等
创建仲裁队列

绑定交换机
与之前的同等
验证,停止某个节点
  1. # 停止rabbit应用
  2. rabbitmqctl stop_app
  3. # 我们在某个节点进行停止MQ,然后生产者发送消息,查看消费者是否可以接受到,判断集群的数据一致性
  4. # 默认,主节点挂掉,默认重新选举新的主节点
复制代码
流式队列

流式队列是3.9版本以后才有的新功能,每个消息都分配一个偏移量,该消息被消耗掉也不会被删除,可以重复消耗;类似Kafka,但没有到达Kafka的效果。
前提预备 - 安装干系插件
  1. # 启用Stream插件
  2. rabbitmq-plugins enable rabbitmq_stream
  3. # 重启rabbit应用
  4. rabbitmqctl stop_app
  5. rabbitmqctl start_app
  6. # 查看插件状态
  7. rabbitmq-plugins list
复制代码
负载均衡配置
  1. frontend rabbitmq_stream_frontend
  2. bind 192.168.xxx.100:33333
  3. mode tcp
  4. default_backend rabbitmq_stream_backend
  5. backend rabbitmq_stream_backend
  6. mode tcp
  7. balance roundrobin
  8. # 它的端口是5552
  9. server rabbitmq1 192.168.xxx.1:5552 check
  10. server rabbitmq2 192.168.xxx.1:5552 check
  11. server rabbitmq3 192.168.xxx.2:5552 check
复制代码
JAVA干系代码
  1. // 引包
  2. <dependencies>
  3.     <dependency>
  4.         <groupId>com.rabbitmq</groupId>
  5.         <artifactId>stream-client</artifactId>
  6.         <version>0.15.0</version>
  7.     </dependency>
  8.     <dependency>
  9.         <groupId>org.slf4j</groupId>
  10.         <artifactId>slf4j-api</artifactId>
  11.         <version>1.7.30</version>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>ch.qos.logback</groupId>
  15.         <artifactId>logback-classic</artifactId>
  16.         <version>1.2.3</version>
  17.     </dependency>
  18. </dependencies>
复制代码
创建Stream,不必要创建交换机
  1. // 代码创建
  2. Environment environment = Environment.builder()
  3.         .host("192.168.xxx.1")
  4.         .port(33333)
  5.         .username("用户名")
  6.         .password("密码")
  7.         .build();
  8. environment.streamCreator().stream("stream.atguigu.test2").create();
  9. environment.close();
复制代码
通过管理UI页面创建的话,队列类型选择:Stream
生产者
  1. Environment environment = Environment.builder()
  2.         .host("192.168.xxx.1")
  3.         .port(33333)
  4.         .username("用户名")
  5.         .password("密码")
  6.         .build();
  7. Producer producer = environment.producerBuilder()
  8.         // 队列名称
  9.         .stream("stream.test")
  10.         .build();
  11. // 消息内容
  12. byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);
  13. CountDownLatch countDownLatch = new CountDownLatch(1);
  14. producer.send(
  15.         producer.messageBuilder().addData(messagePayload).build(),
  16.         confirmationStatus -> {
  17.             if (confirmationStatus.isConfirmed()) {
  18.                 System.out.println("[生产者端]the message made it to the broker");
  19.             } else {
  20.                 System.out.println("[生产者端]the message did not make it to the broker");
  21.             }
  22.             countDownLatch.countDown();
  23.         });
  24. countDownLatch.await();
  25. producer.close();
  26. environment.close();
复制代码
消耗者
  1. Environment environment = Environment.builder()
  2.         .host("192.168.xxx.1")
  3.         .port(33333)
  4.         .username("用户名")
  5.         .password("密码")
  6.         .build();
  7. environment.consumerBuilder()
  8.             // 队列名称
  9.         .stream("stream.test")
  10.             // 这个是消费端起名字
  11.         .name("stream.test.consumer")
  12.         .autoTrackingStrategy()
  13.         .builder()
  14.         .messageHandler((offset, message) -> {
  15.             // 接受消息
  16.             byte[] bodyAsBinary = message.getBodyAsBinary();
  17.             String messageContent = new String(bodyAsBinary);
  18.             System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset());
  19.         })
  20.         .build();
复制代码
指定偏移量消耗
  1. // 指定Offset消费
  2. Environment environment = Environment.builder()
  3.         .host("192.168.xxx.1")
  4.         .port(33333)
  5.         .username("用户名")
  6.         .password("密码")
  7.         .build();
  8. CountDownLatch countDownLatch = new CountDownLatch(1);
  9. Consumer consumer = environment.consumerBuilder()
  10.         .stream("stream.test")
  11.         .offset(OffsetSpecification.first())
  12.         .messageHandler((offset, message) -> {
  13.             byte[] bodyAsBinary = message.getBodyAsBinary();
  14.             String messageContent = new String(bodyAsBinary);
  15.             System.out.println("[消费者端]messageContent = " + messageContent);
  16.             countDownLatch.countDown();
  17.         })
  18.         .build();
  19. countDownLatch.await();
  20. consumer.close();
复制代码
Federation插件

它是MQ在差别的Broker节点之间举行消息通报而无须建立集群,可以实现跨集群的数据同步。
Federation交换机

Federation队列

Federation队列和Federation交换机的最焦点区别就是:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4