一 RabbitMQ下载
RabbitMQ 官网最新版下载:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ依靠erlang-26.2.5.2-1.el7.x86_64.rpm下载:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm
二 RabbitMQ安装
1 安装erlang环境
安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开辟的
实行安装指令如下:
- rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
复制代码 实行后如下图:
验证 erlang 安装是否乐成,实行erl可以查看版本,说明安装乐成如下图:
2 安装RabbitMQ
实行安装RabbitMQ指令如下:
- rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
复制代码 实行安装中,如下图:
留意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依靠版本,如下图:
如果出现上图的错误,请参考上一步重新安装erlang环境即可。
安装竣事后,消息队列数据保存在哪?日记在哪?想相识更多的信息?
只需一条指令可查询当前状态信息:
- rabbitmq-diagnostics status
复制代码 实行后如下图:
从上图状态中可以看出如今没有使用任何设置文件,以可以看到以下有效的信息:
- 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
- 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
上图信息很详细,可以说开辟者开辟这个工具非常的细心,对软件有充足相识使用也安心!
3 设置RabbitMQ(可选项)
安装好后RabbitMQ没有使用任何的设置文件(也没有默认设置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站设置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个设置文件:
- vi /etc/rabbitmq/rabbitmq.config
复制代码 设置文件内容:
- [
- {rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
- {rabbitmq_management, [
- {listener, [{port,59876}, {ssl, false}]}
- ]}
- ].
复制代码 通过设置设置文件实现变更:
- 客户端 51091 用于消耗或生产端连接,IP 0.0.0.0 代表绑定服务器表里网IP。
- 管理端口 59876 用于RabbitMQ的Web管理。
再次实行 rabbitmq-diagnostics status 查看新增的设置文件是否被使用,如下图:
上图可以看到刚刚创建的设置文件已被引用状态。
4 RabbitMQ 启动与关闭
RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:
- #启动
- systemctl start rabbitmq-server
- #停止关闭
- systemctl stop rabbitmq-server
- #重启
- systemctl restart rabbitmq-server
- #开机启动
- systemctl enable rabbitmq-server
- #查看状态
- systemctl status rabbitmq-server
复制代码 利用如下图:
5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)
RabbitMQ的安装后自带Web管理界面,但是需要实行以下指令开启:
- rabbitmq-plugins enable rabbitmq_management
复制代码 我们寻常只需要一名管员即可,后面要增加用户或设置权限直接在Web利用即可。
新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.
- #新增人员
- rabbitmqctl add_user hua abc123uuPP
- #设置权限
- rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"
- #设置为管理员
- rabbitmqctl set_user_tags hua administrator
复制代码 * 表示授予该用户对该虚拟主机上全部队列和交换机的 configure、write 和 read 权限。
- 第一个 ".*" 表示用户可以设置任意队列和交换机。
- 第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
- 第三个 ".*" 表示用户可以从任意队列中消耗消息。
实行过程如图:
实行上面命令增加一个Web管理员:
- 用户名称:hua
- 密码:abc123uuPP
- 权限 :管理员
如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:
三 RabbitMQ Web 管理
1 RabbitMQ Web 登陆
进入RabbitMQ Web 登陆页面如下:
起首我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:
使用上面新建的账号hua登陆,登陆乐成如下图:
2 用户管理
用户管理,用户增加利用简朴,如下图:
用户管理,用户权限设置利用简朴,如下图:
用户利用界面非凡人性化,可以很方便设置权限,修改用户资料。
3 虚拟主机(紧张)
虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,相互之间相互隔离,不能共享资源。
- 命名空间:每个虚拟主机都有自己的队列、交换机、绑定等资源。
- 资源隔离:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
- 用户权限:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。
虚拟主机提供了一种隔离和权限管理的方式,适用于以了局景:
- 多租户架构:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
- 开辟与生产环境隔离:你可以为开辟环境和生产环境创建不同的虚拟主机,制止资源冲突和干扰。
- 权限管理:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。
默认虚拟主机
RabbitMQ 默认创建一个虚拟主机 /,这是一个特别的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。
虚拟主机利用也非常简朴,如下图:
在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:
功能强大,非常好用。
四 java代码接入
方式一 java通用:
1 引入mvn依靠
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.20.0</version>
- </dependency>
复制代码 JAVA 连接RabbitMQ生产消息与接收消耗测试代码:
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * @author hua
- * @date 2024-08-21 18:01
- */
- public class TestRabbitMQ {
- private final static String QUEUE_NAME = "hello";
- public static void main1(String[] args) {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("xx.xx.xx.xx");
- factory.setPort(51091);
- factory.setUsername("java_producer");
- factory.setPassword("java_producer");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- } catch (TimeoutException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("xx.xx.xx.xx");
- factory.setPort(51091);
- factory.setUsername("java_consumer");
- factory.setPassword("java_consumer");
- // 连接到 RabbitMQ 服务器
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // 声明队列(确保队列存在)
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- // 定义回调函数,当有消息送达时执行
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- // 消费消息
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- }
- }
- }
复制代码 测试运行发送消息,发送乐成。如下图:

测试运行接收消息,消耗乐成。如下图:

上面测试通事后,改成服务类方便生产环境使用来发送消息代码:
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * @author hua
- * @date 2024-08-22
- */
- @Service
- public class RabbitMqServiceImpl {
- private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
- private static final String QUEUE_NAME = "test";
- private Connection connection;
- private Channel channel;
- public RabbitMqServiceImpl() {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("xx.xx.xx.xx");
- factory.setPort(51091);
- factory.setUsername("java_producer");
- factory.setPassword("java_producer");
- //如果不指定虚拟机默认会使用/
- factory.setVirtualHost("test");
- try {
- this.connection = factory.newConnection();
- this.channel = connection.createChannel();
- this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- logger.info("RabbitMqServiceImpl initialized successfully.");
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
- throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
- }
- }
- public void sendMessage(String message) {
- try {
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- } catch (IOException e) {
- e.printStackTrace();
- logger.error("Failed to send message: {}", e.getMessage());
- }
- }
- public void close() {
- try {
- if (channel != null && channel.isOpen()) {
- channel.close();
- }
- if (connection != null && connection.isOpen()) {
- connection.close();
- }
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
复制代码 上面的代码存在题目,未确认发送乐成,有丢失风险,再改善如下:
- import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
- import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
- import com.rabbitmq.client.*;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.io.IOException;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentNavigableMap;
- import java.util.concurrent.ConcurrentSkipListMap;
- import java.util.concurrent.TimeoutException;
- /**
- * @author hua
- * @date 2024-08-22
- */
- @Service
- public class RabbitMqServiceImpl {
- private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
- private static final String QUEUE_NAME = "hex_kyc";
- private Connection connection;
- private Channel channel;
- //存放所有消息,确认时删除,没确认的保存到数据库
- private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
- @Autowired
- DbHexFailLogServiceImpl dbHexFailLogService;
- @PostConstruct
- public void init() {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("xx.xx.xx.xx");
- factory.setPort(xxx);
- factory.setUsername("java_producer");
- factory.setPassword("java_producer");
- factory.setVirtualHost("xxxx");
- factory.setConnectionTimeout(3000);
- try {
- this.connection = factory.newConnection();
- this.channel = connection.createChannel();
- this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 启用发布者确认模式
- this.channel.confirmSelect();
- setupConfirmListener();
- logger.info("RabbitMqServiceImpl initialized successfully.");
- } catch (IOException | TimeoutException e) {
- logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);
- throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
- }
- }
- public void sendMessage(String message) {
- try {
- long nextSeqNo = channel.getNextPublishSeqNo();
- outstandingConfirms.put(nextSeqNo, message);
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- logger.info(" [x] Sent '{}'", message);
- } catch (Exception e) {
- logger.error("Failed to send message: {}", e.getMessage(), e);
- saveFailedMessageToDatabase(message,"CF");
- }
- }
- // 设置接收监听器,记录未确认的消息
- private void setupConfirmListener() {
- ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
- if (multiple) {
- outstandingConfirms.headMap(deliveryTag + 1).clear();
- } else {
- outstandingConfirms.remove(deliveryTag);
- }
- System.out.println("Message confirmed ok deliveryTag="+deliveryTag);
- };
- ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
- if (multiple) {
- // 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息
- ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);
- List<String> FailList= new ArrayList<>();
- for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {
- String failedMessage = entry.getValue();
- logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);
- FailList.add(failedMessage);
- }
- saveFailedMessageToDatabaseBy(FailList); // 批量保存到数据库
- unconfirmedMessages.clear(); // 清除这些未确认的消息
- } else {
- String failedMessage = outstandingConfirms.get(deliveryTag);
- logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);
- saveFailedMessageToDatabase(failedMessage,"SF");
- outstandingConfirms.remove(deliveryTag); // 移除单条未确认的消息
- }
- };
- channel.addConfirmListener(ackCallback, nackCallback);
- }
- private void saveFailedMessageToDatabaseBy(List<String> failList) {
- List<DbHexFailLog> list=new ArrayList<>(failList.size());
- LocalDateTime now = LocalDateTime.now();
- for (String message : failList) {
- DbHexFailLog f=new DbHexFailLog();
- f.setInHexStr(message);
- f.setCtime(now);
- f.setFlag("SF");
- list.add(f);
- }
- dbHexFailLogService.saveBatch(list,list.size());
- failList.clear();
- }
- private void saveFailedMessageToDatabase(String message,String flag) {
- DbHexFailLog f=new DbHexFailLog();
- f.setInHexStr(message);
- f.setCtime(LocalDateTime.now());
- f.setFlag(flag);
- dbHexFailLogService.save(f);
- }
- @PreDestroy
- public void close() {
- try {
- if (channel != null && channel.isOpen()) {
- channel.close();
- }
- if (connection != null && connection.isOpen()) {
- connection.close();
- }
- logger.info("RabbitMqServiceImpl resources closed successfully.");
- } catch (IOException | TimeoutException e) {
- logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);
- }
- }
- }
复制代码 上面的代码优化后,主要增加了三项如下:
1 Publisher Confirms 机制:
- 启用 channel.confirmSelect() 来激活发布者确认模式。
- 使用 ConfirmCallback 和 NackCallback 来处理消息的确认与未确认逻辑。
- 未确认的消息会被保存到数据库中。
2 保存失败的消息到数据库。
3 在 @PreDestroy 方法中关闭 Channel 和 Connection,确保服务烧毁时正确关闭资源。
方式二 SpringBoot框架使用
mvn依靠包:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
复制代码 spring设置文件:
- Spring:
- rabbitmq:
- host: xx.xx.xx.xx
- port: 51091
- username: java_consumer
- password: java_consumer
- virtual-host: hellow
- connection-timeout: 6000
复制代码 JAVA代码:
发送消息java代码:
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- /**
- * @author hua
- * @date 2024-08-22
- */
- @Component
- public class MessageProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private Queue queue;
- public void sendMessage(String message) {
- rabbitTemplate.convertAndSend(queue.getName(), message);
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
复制代码 接收消息java代码:
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * @author hua
- * @date 2024-08-22
- */
- @Component
- public class RabbitListener {
- private static final Logger logger = LogManager.getLogger(RabbitListener.class);
- @RabbitListener(queues = "test")
- public void receiveMessage(String message) {
- try {
- System.out.println("rabbit rev <- "+message);
- //具体业务
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("rabbit err= ", e);
- }
- }
- }
复制代码 上面代码在生产发送消息时通过编码方式更机动,接收直接使用注解更简朴。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |