RabbitMQ 最新版 安装,设置,java接入使用(详细教程) ...

悠扬随风  金牌会员 | 2024-9-12 19:48:56 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 867|帖子 867|积分 2601

一 RabbitMQ下载

RabbitMQ 官网最新版下载:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ依靠erlang-26.2.5.2-1.el7.x86_64.rpm下载:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm

二 RabbitMQ安装

   1 安装erlang环境

  安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开辟的
  实行安装指令如下:
  1. rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
复制代码
   实行后如下图:

  验证 erlang 安装是否乐成,实行erl可以查看版本,说明安装乐成如下图:
 



2 安装RabbitMQ

  实行安装RabbitMQ指令如下:
  1. rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
复制代码
  实行安装中,如下图: 

 留意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依靠版本,如下图: 

如果出现上图的错误,请参考上一步重新安装erlang环境即可。
 安装竣事后,消息队列数据保存在哪?日记在哪?想相识更多的信息?
只需一条指令可查询当前状态信息:
  1. rabbitmq-diagnostics status
复制代码
实行后如下图:


从上图状态中可以看出如今没有使用任何设置文件,以可以看到以下有效的信息:


  • 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
  • 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
 上图信息很详细,可以说开辟者开辟这个工具非常的细心,对软件有充足相识使用也安心!


3 设置RabbitMQ(可选项)

  安装好后RabbitMQ没有使用任何的设置文件(也没有默认设置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站设置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个设置文件:
  1. vi /etc/rabbitmq/rabbitmq.config
复制代码
设置文件内容:
  1. [
  2.   {rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
  3.   {rabbitmq_management, [
  4.   {listener, [{port,59876}, {ssl, false}]}
  5.   ]}
  6. ].
复制代码
  通过设置设置文件实现变更:

  •   客户端 51091 用于消耗或生产端连接,IP 0.0.0.0 代表绑定服务器表里网IP。
  •   管理端口 59876 用于RabbitMQ的Web管理。
再次实行 rabbitmq-diagnostics status 查看新增的设置文件是否被使用,如下图:

 上图可以看到刚刚创建的设置文件已被引用状态。


4 RabbitMQ 启动与关闭

   RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:
  1. #启动
  2. systemctl start rabbitmq-server
  3. #停止关闭
  4. systemctl stop rabbitmq-server
  5. #重启
  6. systemctl restart rabbitmq-server
  7. #开机启动
  8. systemctl enable rabbitmq-server
  9. #查看状态
  10. systemctl status rabbitmq-server
复制代码
  利用如下图:



5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)

RabbitMQ的安装后自带Web管理界面,但是需要实行以下指令开启:
  1. rabbitmq-plugins enable rabbitmq_management
复制代码
 我们寻常只需要一名管员即可,后面要增加用户或设置权限直接在Web利用即可。
   新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.
  1. #新增人员
  2. rabbitmqctl add_user hua abc123uuPP
  3. #设置权限
  4. rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"
  5. #设置为管理员
  6. rabbitmqctl set_user_tags hua administrator
复制代码
* 表示授予该用户对该虚拟主机上全部队列和交换机的 configure、write 和 read 权限。


  • 第一个 ".*" 表示用户可以设置任意队列和交换机。
  • 第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
  • 第三个 ".*" 表示用户可以从任意队列中消耗消息。
 实行过程如图:

实行上面命令增加一个Web管理员:


  • 用户名称:hua
  • 密码:abc123uuPP
  • 权限 :管理员
  如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:


  • 默认用户:guest
  • 默认密码:guest

三 RabbitMQ Web 管理

1 RabbitMQ Web 登陆

 进入RabbitMQ Web 登陆页面如下:

起首我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:

使用上面新建的账号hua登陆,登陆乐成如下图:

2 用户管理

 用户管理,用户增加利用简朴,如下图:

  用户管理,用户权限设置利用简朴,如下图:

用户利用界面非凡人性化,可以很方便设置权限,修改用户资料。 

3 虚拟主机(紧张)

虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,相互之间相互隔离,不能共享资源。


  • 命名空间:每个虚拟主机都有自己的队列、交换机、绑定等资源。
  • 资源隔离:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
  • 用户权限:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。
虚拟主机提供了一种隔离和权限管理的方式,适用于以了局景:


  • 多租户架构:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
  • 开辟与生产环境隔离:你可以为开辟环境和生产环境创建不同的虚拟主机,制止资源冲突和干扰。
  • 权限管理:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。
默认虚拟主机
RabbitMQ 默认创建一个虚拟主机 /,这是一个特别的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。 
虚拟主机利用也非常简朴,如下图:
 在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:
 

功能强大,非常好用。 

四 java代码接入

  方式一 java通用:

  1 引入mvn依靠
  1.         <dependency>
  2.             <groupId>com.rabbitmq</groupId>
  3.             <artifactId>amqp-client</artifactId>
  4.             <version>5.20.0</version>
  5.         </dependency>
复制代码
  JAVA 连接RabbitMQ生产消息与接收消耗测试代码:
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * @author hua
  9. * @date 2024-08-21 18:01
  10. */
  11. public class TestRabbitMQ {
  12.     private final static String QUEUE_NAME = "hello";
  13.     public static void main1(String[] args) {
  14.         ConnectionFactory factory = new ConnectionFactory();
  15.         factory.setHost("xx.xx.xx.xx");
  16.         factory.setPort(51091);
  17.         factory.setUsername("java_producer");
  18.         factory.setPassword("java_producer");
  19.         try (Connection connection = factory.newConnection();
  20.              Channel channel = connection.createChannel()) {
  21.             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  22.             String message = "Hello World!";
  23.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  24.             System.out.println(" [x] Sent '" + message + "'");
  25.         } catch (TimeoutException e) {
  26.             e.printStackTrace();
  27.         } catch (IOException e) {
  28.             e.printStackTrace();
  29.         }
  30.     }
  31.     public static void main(String[] argv) throws Exception {
  32.         // 创建连接工厂
  33.         ConnectionFactory factory = new ConnectionFactory();
  34.         factory.setHost("xx.xx.xx.xx");
  35.         factory.setPort(51091);
  36.         factory.setUsername("java_consumer");
  37.         factory.setPassword("java_consumer");
  38.         // 连接到 RabbitMQ 服务器
  39.         try (Connection connection = factory.newConnection();
  40.              Channel channel = connection.createChannel()) {
  41.             // 声明队列(确保队列存在)
  42.             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  43.             System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  44.             // 定义回调函数,当有消息送达时执行
  45.             DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  46.                 String message = new String(delivery.getBody(), "UTF-8");
  47.                 System.out.println(" [x] Received '" + message + "'");
  48.             };
  49.             // 消费消息
  50.             channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  51.         }
  52.     }
  53. }
复制代码
 测试运行发送消息,发送乐成。如下图:
 

 测试运行接收消息,消耗乐成。如下图:

 上面测试通事后,改成服务类方便生产环境使用来发送消息代码:
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.MessageProperties;
  5. import org.apache.logging.log4j.LogManager;
  6. import org.apache.logging.log4j.Logger;
  7. import org.springframework.stereotype.Service;
  8. import java.io.IOException;
  9. import java.util.concurrent.TimeoutException;
  10. /**
  11. * @author hua
  12. * @date 2024-08-22
  13. */
  14. @Service
  15. public class RabbitMqServiceImpl {
  16.     private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
  17.     private static final String QUEUE_NAME = "test";
  18.     private Connection connection;
  19.     private Channel channel;
  20.     public RabbitMqServiceImpl() {
  21.         ConnectionFactory factory = new ConnectionFactory();
  22.         factory.setHost("xx.xx.xx.xx");
  23.         factory.setPort(51091);
  24.         factory.setUsername("java_producer");
  25.         factory.setPassword("java_producer");
  26.         //如果不指定虚拟机默认会使用/
  27.         factory.setVirtualHost("test");
  28.         try {
  29.             this.connection = factory.newConnection();
  30.             this.channel = connection.createChannel();
  31.             this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  32.             logger.info("RabbitMqServiceImpl initialized successfully.");
  33.         } catch (IOException | TimeoutException e) {
  34.             e.printStackTrace();
  35.             logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
  36.             throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
  37.         }
  38.     }
  39.     public void sendMessage(String message) {
  40.         try {
  41.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  42.             System.out.println(" [x] Sent '" + message + "'");
  43.         } catch (IOException e) {
  44.             e.printStackTrace();
  45.             logger.error("Failed to send message: {}", e.getMessage());
  46.         }
  47.     }
  48.     public void close() {
  49.         try {
  50.             if (channel != null && channel.isOpen()) {
  51.                 channel.close();
  52.             }
  53.             if (connection != null && connection.isOpen()) {
  54.                 connection.close();
  55.             }
  56.         } catch (IOException | TimeoutException e) {
  57.             e.printStackTrace();
  58.         }
  59.     }
  60. }
复制代码
   上面的代码存在题目,未确认发送乐成,有丢失风险,再改善如下:
  1. import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
  2. import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
  3. import com.rabbitmq.client.*;
  4. import org.apache.logging.log4j.LogManager;
  5. import org.apache.logging.log4j.Logger;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import javax.annotation.PostConstruct;
  9. import javax.annotation.PreDestroy;
  10. import java.io.IOException;
  11. import java.time.LocalDateTime;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.Map;
  15. import java.util.concurrent.ConcurrentNavigableMap;
  16. import java.util.concurrent.ConcurrentSkipListMap;
  17. import java.util.concurrent.TimeoutException;
  18. /**
  19. * @author hua
  20. * @date 2024-08-22
  21. */
  22. @Service
  23. public class RabbitMqServiceImpl {
  24.     private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
  25.     private static final String QUEUE_NAME = "hex_kyc";
  26.     private Connection connection;
  27.     private Channel channel;
  28.     //存放所有消息,确认时删除,没确认的保存到数据库
  29.     private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
  30.     @Autowired
  31.     DbHexFailLogServiceImpl dbHexFailLogService;
  32.     @PostConstruct
  33.     public void init() {
  34.         ConnectionFactory factory = new ConnectionFactory();
  35.         factory.setHost("xx.xx.xx.xx");
  36.         factory.setPort(xxx);
  37.         factory.setUsername("java_producer");
  38.         factory.setPassword("java_producer");
  39.         factory.setVirtualHost("xxxx");
  40.         factory.setConnectionTimeout(3000);
  41.         try {
  42.             this.connection = factory.newConnection();
  43.             this.channel = connection.createChannel();
  44.             this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  45.             // 启用发布者确认模式
  46.             this.channel.confirmSelect();
  47.             setupConfirmListener();
  48.             logger.info("RabbitMqServiceImpl initialized successfully.");
  49.         } catch (IOException | TimeoutException e) {
  50.             logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);
  51.             throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
  52.         }
  53.     }
  54.     public void sendMessage(String message) {
  55.         try {
  56.             long nextSeqNo = channel.getNextPublishSeqNo();
  57.             outstandingConfirms.put(nextSeqNo, message);
  58.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  59.             logger.info(" [x] Sent '{}'", message);
  60.         } catch (Exception e) {
  61.             logger.error("Failed to send message: {}", e.getMessage(), e);
  62.             saveFailedMessageToDatabase(message,"CF");
  63.         }
  64.     }
  65.     // 设置接收监听器,记录未确认的消息
  66.     private void setupConfirmListener() {
  67.         ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  68.             if (multiple) {
  69.                 outstandingConfirms.headMap(deliveryTag + 1).clear();
  70.             } else {
  71.                 outstandingConfirms.remove(deliveryTag);
  72.             }
  73.             System.out.println("Message confirmed ok deliveryTag="+deliveryTag);
  74.         };
  75.         ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  76.             if (multiple) {
  77.                 // 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息
  78.                 ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);
  79.                 List<String> FailList= new ArrayList<>();
  80.                 for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {
  81.                     String failedMessage = entry.getValue();
  82.                     logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);
  83.                     FailList.add(failedMessage);
  84.                 }
  85.                 saveFailedMessageToDatabaseBy(FailList);  // 批量保存到数据库
  86.                 unconfirmedMessages.clear();  // 清除这些未确认的消息
  87.             } else {
  88.                 String failedMessage = outstandingConfirms.get(deliveryTag);
  89.                 logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);
  90.                 saveFailedMessageToDatabase(failedMessage,"SF");
  91.                 outstandingConfirms.remove(deliveryTag);  // 移除单条未确认的消息
  92.             }
  93.         };
  94.         channel.addConfirmListener(ackCallback, nackCallback);
  95.     }
  96.     private void saveFailedMessageToDatabaseBy(List<String> failList) {
  97.         List<DbHexFailLog> list=new ArrayList<>(failList.size());
  98.         LocalDateTime now = LocalDateTime.now();
  99.         for (String message : failList) {
  100.             DbHexFailLog f=new DbHexFailLog();
  101.             f.setInHexStr(message);
  102.             f.setCtime(now);
  103.             f.setFlag("SF");
  104.             list.add(f);
  105.         }
  106.         dbHexFailLogService.saveBatch(list,list.size());
  107.         failList.clear();
  108.     }
  109.     private void saveFailedMessageToDatabase(String message,String flag) {
  110.         DbHexFailLog f=new DbHexFailLog();
  111.         f.setInHexStr(message);
  112.         f.setCtime(LocalDateTime.now());
  113.         f.setFlag(flag);
  114.         dbHexFailLogService.save(f);
  115.     }
  116.     @PreDestroy
  117.     public void close() {
  118.         try {
  119.             if (channel != null && channel.isOpen()) {
  120.                 channel.close();
  121.             }
  122.             if (connection != null && connection.isOpen()) {
  123.                 connection.close();
  124.             }
  125.             logger.info("RabbitMqServiceImpl resources closed successfully.");
  126.         } catch (IOException | TimeoutException e) {
  127.             logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);
  128.         }
  129.     }
  130. }
复制代码
上面的代码优化后,主要增加了三项如下:
 1 Publisher Confirms 机制


  • 启用 channel.confirmSelect() 来激活发布者确认模式。
  • 使用 ConfirmCallback 和 NackCallback 来处理消息的确认与未确认逻辑。
  • 未确认的消息会被保存到数据库中。
2 保存失败的消息到数据库。

3 在 @PreDestroy 方法中关闭 Channel 和 Connection,确保服务烧毁时正确关闭资源。
 
方式二 SpringBoot框架使用

mvn依靠包:
  1.     <dependency>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter</artifactId>
  4.     </dependency>
复制代码
spring设置文件:
  1. Spring:
  2. rabbitmq:
  3.     host: xx.xx.xx.xx
  4.     port: 51091
  5.     username: java_consumer
  6.     password: java_consumer
  7.     virtual-host: hellow
  8.     connection-timeout: 6000
复制代码
JAVA代码:
  发送消息java代码:
  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * @author hua
  7. * @date 2024-08-22
  8. */
  9. @Component
  10. public class MessageProducer {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     @Autowired
  14.     private Queue queue;
  15.     public void sendMessage(String message) {
  16.         rabbitTemplate.convertAndSend(queue.getName(), message);
  17.         System.out.println(" [x] Sent '" + message + "'");
  18.     }
  19. }
复制代码
  接收消息java代码:
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @author hua
  6. * @date 2024-08-22
  7. */
  8. @Component
  9. public class RabbitListener {
  10.     private static final Logger logger = LogManager.getLogger(RabbitListener.class);
  11.     @RabbitListener(queues = "test")
  12.     public void receiveMessage(String message) {
  13.         try {
  14.             System.out.println("rabbit rev <- "+message);
  15.             //具体业务
  16.         } catch (Exception e) {
  17.             e.printStackTrace();
  18.             logger.error("rabbit err= ", e);
  19.         }
  20.     }
  21. }
复制代码
   上面代码在生产发送消息时通过编码方式更机动,接收直接使用注解更简朴。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表