ToB企服应用市场:ToB评测及商务社交产业平台
标题:
RabbitMQ 最新版 安装,设置,java接入使用(详细教程)
[打印本页]
作者:
悠扬随风
时间:
2024-9-12 19:48
标题:
RabbitMQ 最新版 安装,设置,java接入使用(详细教程)
一 RabbitMQ下载
RabbitMQ 官网最新版下载:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ依靠
erlang-26.2.5.2-1.el7.x86_64.rpm
下载:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm
二 RabbitMQ安装
1 安装
erlang
环境
安装
RabbitMQ
前要先安装
erlang
环境,因为RabbitMQ是用
erlang
开辟的
实行安装指令如下:
rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
复制代码
实行后如下图:
验证
erlang
安装是否乐成,实行erl可以查看版本,说明安装乐成如下图:
2 安装RabbitMQ
实行安装RabbitMQ指令如下:
rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
复制代码
实行安装中,如下图:
留意
:如果
erlang
环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依靠版本,如下图:
如果出现上图的错误,请参考上一步重新安装
erlang
环境即可。
安装竣事后,消息队列数据保存在哪?日记在哪?想相识更多的信息?
只需一条指令可查询当前状态信息:
rabbitmq-diagnostics status
复制代码
实行后如下图:
从上图状态中可以看出如今没有使用任何设置文件,以可以看到以下有效的信息:
数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
上图信息很详细,可以说开辟者开辟这个工具非常的细心,对软件有充足相识使用也安心!
3 设置RabbitMQ(可选项)
安装好后RabbitMQ没有使用任何的设置文件(也没有默认设置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站设置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个设置文件:
vi /etc/rabbitmq/rabbitmq.config
复制代码
设置文件内容:
[
{rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
{rabbitmq_management, [
{listener, [{port,59876}, {ssl, false}]}
]}
].
复制代码
通过设置设置文件实现变更:
客户端 51091 用于消耗或生产端连接,IP 0.0.0.0 代表绑定服务器表里网IP。
管理端口 59876 用于RabbitMQ的Web管理。
再次实行
rabbitmq-diagnostics status
查看新增的设置文件是否被使用,如下图:
上图可以看到刚刚创建的设置文件已被引用状态。
4 RabbitMQ 启动与关闭
RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:
#启动
systemctl start rabbitmq-server
#停止关闭
systemctl stop rabbitmq-server
#重启
systemctl restart rabbitmq-server
#开机启动
systemctl enable rabbitmq-server
#查看状态
systemctl status rabbitmq-server
复制代码
利用如下图:
5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)
RabbitMQ的安装后自带Web管理界面,但是需要实行以下指令开启:
rabbitmq-plugins enable rabbitmq_management
复制代码
我们寻常只需要一名管员即可,后面要增加用户或设置权限直接在Web利用即可。
新增一位
RabbitMQ的Web管理员并增加设置管理权限 ,
用于管理RabbitMQ.
#新增人员
rabbitmqctl add_user hua abc123uuPP
#设置权限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"
#设置为管理员
rabbitmqctl set_user_tags hua administrator
复制代码
* 表示授予该用户对该虚拟主机上全部队列和交换机的 configure、write 和 read 权限。
第一个 ".*" 表示用户可以设置任意队列和交换机。
第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
第三个 ".*" 表示用户可以从任意队列中消耗消息。
实行过程如图:
实行上面命令增加一个Web管理员:
用户名称:hua
密码:abc123uuPP
权限 :管理员
如果只在
本地localhost登陆
RabbitWeb管理平台,用默认的账号登陆即可:
默认用户:guest
默认密码:guest
三 RabbitMQ Web 管理
1 RabbitMQ Web 登陆
进入RabbitMQ Web 登陆页面如下:
起首我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:
使用上面新建的账号hua登陆,登陆乐成如下图:
2 用户管理
用户管理,用户增加利用简朴,如下图:
用户管理,用户权限设置利用简朴,如下图:
用户利用界面非凡人性化,可以很方便设置权限,修改用户资料。
3 虚拟主机(紧张)
虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,相互之间相互隔离,不能共享资源。
命名空间
:每个虚拟主机都有自己的队列、交换机、绑定等资源。
资源隔离
:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
用户权限
:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。
虚拟主机提供了一种隔离和权限管理的方式,适用于以了局景:
多租户架构
:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
开辟与生产环境隔离
:你可以为开辟环境和生产环境创建不同的虚拟主机,制止资源冲突和干扰。
权限管理
:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。
默认虚拟主机
RabbitMQ 默认创建一个虚拟主机 /,这是一个特别的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。
虚拟主机利用也非常简朴,如下图:
在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:
功能强大,非常好用。
四 java代码接入
方式一 java通用:
1 引入mvn依靠
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
复制代码
JAVA 连接RabbitMQ生产消息与接收消耗测试代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-21 18:01
*/
public class TestRabbitMQ {
private final static String QUEUE_NAME = "hello";
public static void main1(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_consumer");
factory.setPassword("java_consumer");
// 连接到 RabbitMQ 服务器
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列(确保队列存在)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定义回调函数,当有消息送达时执行
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
}
复制代码
测试运行发送消息,发送乐成。如下图:
测试运行接收消息,消耗乐成。如下图:
上面测试通事后,改成服务类方便生产环境使用来发送消息代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-22
*/
@Service
public class RabbitMqServiceImpl {
private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
private static final String QUEUE_NAME = "test";
private Connection connection;
private Channel channel;
public RabbitMqServiceImpl() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
//如果不指定虚拟机默认会使用/
factory.setVirtualHost("test");
try {
this.connection = factory.newConnection();
this.channel = connection.createChannel();
this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
logger.info("RabbitMqServiceImpl initialized successfully.");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
}
}
public void sendMessage(String message) {
try {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
logger.error("Failed to send message: {}", e.getMessage());
}
}
public void close() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
复制代码
上面的代码存在题目,未确认发送乐成,有丢失风险,再改善如下:
import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
import com.rabbitmq.client.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-22
*/
@Service
public class RabbitMqServiceImpl {
private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
private static final String QUEUE_NAME = "hex_kyc";
private Connection connection;
private Channel channel;
//存放所有消息,确认时删除,没确认的保存到数据库
private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
@Autowired
DbHexFailLogServiceImpl dbHexFailLogService;
@PostConstruct
public void init() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(xxx);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
factory.setVirtualHost("xxxx");
factory.setConnectionTimeout(3000);
try {
this.connection = factory.newConnection();
this.channel = connection.createChannel();
this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 启用发布者确认模式
this.channel.confirmSelect();
setupConfirmListener();
logger.info("RabbitMqServiceImpl initialized successfully.");
} catch (IOException | TimeoutException e) {
logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);
throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
}
}
public void sendMessage(String message) {
try {
long nextSeqNo = channel.getNextPublishSeqNo();
outstandingConfirms.put(nextSeqNo, message);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
logger.info(" [x] Sent '{}'", message);
} catch (Exception e) {
logger.error("Failed to send message: {}", e.getMessage(), e);
saveFailedMessageToDatabase(message,"CF");
}
}
// 设置接收监听器,记录未确认的消息
private void setupConfirmListener() {
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
outstandingConfirms.headMap(deliveryTag + 1).clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("Message confirmed ok deliveryTag="+deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息
ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);
List<String> FailList= new ArrayList<>();
for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {
String failedMessage = entry.getValue();
logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);
FailList.add(failedMessage);
}
saveFailedMessageToDatabaseBy(FailList); // 批量保存到数据库
unconfirmedMessages.clear(); // 清除这些未确认的消息
} else {
String failedMessage = outstandingConfirms.get(deliveryTag);
logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);
saveFailedMessageToDatabase(failedMessage,"SF");
outstandingConfirms.remove(deliveryTag); // 移除单条未确认的消息
}
};
channel.addConfirmListener(ackCallback, nackCallback);
}
private void saveFailedMessageToDatabaseBy(List<String> failList) {
List<DbHexFailLog> list=new ArrayList<>(failList.size());
LocalDateTime now = LocalDateTime.now();
for (String message : failList) {
DbHexFailLog f=new DbHexFailLog();
f.setInHexStr(message);
f.setCtime(now);
f.setFlag("SF");
list.add(f);
}
dbHexFailLogService.saveBatch(list,list.size());
failList.clear();
}
private void saveFailedMessageToDatabase(String message,String flag) {
DbHexFailLog f=new DbHexFailLog();
f.setInHexStr(message);
f.setCtime(LocalDateTime.now());
f.setFlag(flag);
dbHexFailLogService.save(f);
}
@PreDestroy
public void close() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
logger.info("RabbitMqServiceImpl resources closed successfully.");
} catch (IOException | TimeoutException e) {
logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);
}
}
}
复制代码
上面的代码优化后,主要增加了三项如下:
1 Publisher Confirms 机制
:
启用 channel.confirmSelect() 来激活发布者确认模式。
使用 ConfirmCallback 和 NackCallback 来处理消息的确认与未确认逻辑。
未确认的消息会被保存到数据库中。
2 保存失败的消息到数据库。
3 在 @PreDestroy 方法中关闭 Channel 和 Connection,确保服务烧毁时正确关闭资源。
方式二 SpringBoot框架使用
mvn依靠包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
复制代码
spring设置文件:
Spring:
rabbitmq:
host: xx.xx.xx.xx
port: 51091
username: java_consumer
password: java_consumer
virtual-host: hellow
connection-timeout: 6000
复制代码
JAVA代码:
发送消息java代码:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author hua
* @date 2024-08-22
*/
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
复制代码
接收消息java代码:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author hua
* @date 2024-08-22
*/
@Component
public class RabbitListener {
private static final Logger logger = LogManager.getLogger(RabbitListener.class);
@RabbitListener(queues = "test")
public void receiveMessage(String message) {
try {
System.out.println("rabbit rev <- "+message);
//具体业务
} catch (Exception e) {
e.printStackTrace();
logger.error("rabbit err= ", e);
}
}
}
复制代码
上面代码在生产发送消息时通过编码方式更机动,接收直接使用注解更简朴。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4