ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Spring RabbitMQ 解决 生产者丢失消息、消息列表丢失消息、消耗者丢失消息
[打印本页]
作者:
自由的羽毛
时间:
2024-8-20 09:54
标题:
Spring RabbitMQ 解决 生产者丢失消息、消息列表丢失消息、消耗者丢失消息
SpringBoot整合RabbitMQ
生产环境中使用RabbitMQ时需要包管消息的可靠传输,消息不可靠的环境大概是消息丢失,挟制等原因;
丢失又分为:
生产者丢失消息、消息列表丢失消息、消耗者丢失消息
;
本文重要展示怎样解决上述3种问题
1.简单使用
1.1生产者
导入POM
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
配置交换机、队列、绑定
@Configuration
public class MyRabbitConfig {
public static final String SECKILL_ROUTING_KEY = "seckill";
public static final String DIRECT_QUEUE_SECKILL = "queue_seckill";
public static final String DIRECT_EXCHANGE_SECKILL = "exchange_seckill";
// 将发送的数据转为JSON
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//队列 起名:DirectQueue
@Bean
public Queue DirectQueue() {
return new Queue(DIRECT_QUEUE_SECKILL,true);
}
//Direct交换机 起名:DirectExchange
@Bean
DirectExchange DirectExchange() {
return new DirectExchange(DIRECT_EXCHANGE_SECKILL);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:SECKILL_ROUTING_KEY
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with(SECKILL_ROUTING_KEY);
}
}
复制代码
配置文件
spring:
rabbitmq:
host: 192.168.79.129
port: 5672
virtual-host: /
复制代码
使用(@EnableRabbit)
RabbitTemplate 发送消息
复制代码
1.2消耗者
导包
配置文件
使用
使用@RabbitListener 指定消耗的队列
使用@RabbitHandler 指定处理方法
@Service
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_SECKILL)
public class RabbitConsumer {
AtomicInteger count = new AtomicInteger(0);
@Resource
GoodsService goodsService;
// 单线程处理
@RabbitHandler
public void handle(String seckillDTOJson) {
Gson gson = new Gson();
SeckillDTO seckillDTO = gson.fromJson(seckillDTOJson, SeckillDTO.class);
goodsService.payOrder(seckillDTO.getUserId(), seckillDTO.getGoodsId());
System.out.println("处理:" + seckillDTO + count.incrementAndGet());
}
复制代码
使用(@EnableRabbit)
2.生产者发布确认
修改RabbitTemplate 配置
rabbitTemplate.setConfirmCallback():设置发布确认处理函数
实现接口:
RabbitTemplate.ConfirmCallback
rabbitTemplate.setMandatory(true) + rabbitTemplate.setReturnCallback():设置消息回退处理函数
实现接口:
RabbitTemplate.ReturnCallback
@Service
public class RabbitService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
// todo 如何知道消费者正确消费了消息?
@Resource
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 交换机发布确认
rabbitTemplate.setConfirmCallback(this);
// 消息回退
rabbitTemplate.setMandatory(true);
// 消息回退处理
rabbitTemplate.setReturnCallback(this);
}
public String sendSeckillMessage(SeckillDTO seckillDTO) {
Gson gson = new Gson();
String toJson = gson.toJson(seckillDTO);
rabbitTemplate.convertAndSend(
MyRabbitConfig.DIRECT_EXCHANGE_SECKILL, MyRabbitConfig.SECKILL_ROUTING_KEY, toJson);
return "ok";
}
/**
* @param correlationData 发送消息时我们所绑定的相关数据【rabbitTemplate.convertAndSend中发送】
* @param ack 是否成功投递到交换机
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
}
复制代码
3.设置消息持久化
3.1 队列持久化
队列持久化
在创建(new)队列时指定
参数:
name
:队列名
durable
:是否持久化
exclusive
:体现队列是否为排他的(独占的)。假如
设置为true,则该队列只能被当前连接使用,并且在连接断开后自动删除
。此特性适用于一个客户端需要独享队列的场景,比如用于临时队列或者测试环境。
autoDelete
:
体现队列是否在不再被使用时自动删除
。假如设置为true,
一旦末了一个消耗者取消订阅,队列就会被删除
。这个特性对于临时队列特别有用,可以避免不须要的资源占用。
arguments
:
允许用户自定义一些额外的参数
,这些参数可以用来设置队列的特定举动或特性,比如
死信队列
的配置、
消息逾期时间
等。
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) {
}
复制代码
3.2 消息持久化
消息持久化在
消息创建时指定
// 指定消息属性的对象
MessageProperties messageProperties = new MessageProperties();
// 设置消息交付模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 创建消息
Message message = new Message("消息体".getBytes(StandardCharsets.UTF_8), messageProperties);
// ... 发送消息
复制代码
4.@RabbitHandler的传入参数
@RabbitHandler注解标记的方法可以接收多种类型的参数,详细取决于消息的内容以及配置的MessageConverter
。
以下是一些常见的参数类型:(
重要是Message,其他的内容可以中Message中本身取
)
1.
Message对象
:这是最原始的情势,你可以直接接收整个org.springframework.amqp.core.Message对象,它
包含了消息的所有元数据(如消息ID、路由键、交换机等)以及消息体。通过Message.getBody()方法可以得到消息的实际内容,内容通常是字节数组
。
-
message.getBody()
:获取消息体
-
message.getMessageProperties()
:获取消息元数据
2.
Channel对象
:获取该消耗者与对应队列的通道并进行一系列重要操作,详情见 [Channel详情](# 5. Channel对象)
自定义类型对象
:
假如你在发送消息时发送的是一个对象,并且配置了相应的MessageConverter(如Jackson2JsonMessageConverter或SimpleMessageConverter),那么接收端的@RabbitHandler方法可以直接接收该对象类型作为参数。比方,假如你发送的是一个User对象,那么处理方法就可以声明一个User类型的参数。
@Payload
注解的使用:
当消息体需要被直接映射到方法参数时,可以使用@Payload注解来
明确指定哪个参数应该接收消息体
。这在方法接收多个参数时特别有用。
其他附加信息参数:(
@Headers对应的Map我们可以在发送消息时配置
)
可以使用**@Header**注解来获取消息头中的特定信息,如消息ID、时间戳等(
建议搭配AmqpHeaders常量类
)。
@Headers注解可以用来接收所有消息头作为一个Map<String, Object>。
@SendTo和@SendToUser注解
:
虽然这不是参数类型,但是与@RabbitHandler相关的注解,它们用于指示处理方法处理消息后应发送响应到哪个队列或目的地。
@Component
public class MyMessageReceiver {
@RabbitHandler
public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 处理消息的逻辑
System.out.println("Received message: " + new String(message.getBody());
// 成功处理完消息后,手动发送确认
channel.basicAck(tag, false); // 第二个参数false表示只确认当前消息
} catch (Exception e) {
// 如果消息处理失败,可以选择拒绝消息(requeue参数决定是否重新入队)
channel.basicNack(tag, false, true); // 第三个参数true表示重新入队列
// 或者也可以使用basicReject方法,效果类似但不支持批量确认
// channel.basicReject(tag, true);
}
}
@RabbitHandler
public void handleUserMessage(User user) {
// 直接处理User对象
System.out.println("Received User: " + user);
}
@RabbitHandler
public void handleMessageWithDetails(Message message) {
// 处理原始Message对象
byte[] body = message.getBody();
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
System.out.println("Received Message with routingKey: " + routingKey);
}
@RabbitHandler
public void handleMessageWithHeader(@Payload String content, @Headers Map<String, Object> headers) {
// 获取消息体和消息头
System.out.println("Content: " + content);
System.out.println("Headers: " + headers);
}
}
复制代码
5. Channel对象
在RabbitMQ和Spring AMQP的上下文中,Channel对象是一个非常重要的概念,它提供了与RabbitMQ服务器交互的基础功能。在你的@RabbitHandler方法签名中,Channel参数允许你直接执行一些低级别的AMQP操作,如消息确认、拒绝或 nack。以下是Channel对象的一些关键功能:
注
:long deliveryTag 可以在发送时手动指定,也可以自动生成
消息确认
(Acknowledgment):
如之前讨论的,Channel可以用来手动确认消息已经被成功处理,通过basicAck(long deliveryTag, boolean multiple)方法。deliveryTag是消息的唯一标识,multiple标记决定是否同时确认之前的消息(批量确认)(假如为true)。
消息拒绝
(Reject)与
重新入队
(Requeue):
假如消息处理失败,可以通过basicReject(long deliveryTag, boolean requeue)方法拒绝消息。requeue参数指示RabbitMQ是否应该实验重新将消息放入队列以供后续重试。
消息的Nack
(Negative Acknowledgment):
basicNack(long deliveryTag, boolean multiple, boolean requeue)方法类似于拒绝,但它允许批量操作(当multiple为true时),可以一次性拒绝或重新入队多个消息。
发布消息:
虽然通常我们使用RabbitTemplate来发送消息,
但Channel也提供了直接发送消息的本事
,如通过
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)方法。
事务管理
:
假如需要在消息发布和确认之间保持原子性,可以使用Channel.txSelect()开始事务,然后在一系列操作后调用Channel.txCommit()提交,或者在遇到错误时调用Channel.txRollback()回滚。
队列、交换机和绑定操作:
虽然这些操作通常在应用启动时通过配置完成,
但Channel也提供了创建、删除队列、交换机以及绑定等操作的API
,如queueDeclare, exchangeDeclare, queueBind等。
流控
(Flow Control):
通过basicQos(int prefetchSize, int prefetchCount, boolean global)方法,可以
设置预取限制(prefetch count),控制消耗者从队列中获取消息的速率,以实现流量控制。
综上所述,Channel是RabbitMQ客户端与服务端交互的核心接口,它提供了丰富的功能来控制消息的发送、接收、确认、拒绝等操作,是实现消息可靠传输和高级消息处理逻辑的基础。
6.消耗者手动确认
在RabbitMQ中,消息的删除通常是自动进行的,并且这一举动依靠于消息的确认模式。RabbitMQ提供了两种消息确认模式来确保消息被精确处理并决定何时可以从队列中删除消息:
自动确认
(Automatic Acknowledgment):
默认环境下
,RabbitMQ使用自动确认模式。在这种模式下,消耗者接收到消息后,RabbitMQ会立即将消息从队列中移除,而不需要消耗者明确发送确认。这种方式简单但风险在于,假如消耗者处理消息过程中崩溃,消息将会丢失。
手动确认
(Manual Acknowledgment): 若要更准确地控制消息的删除时机,可以使用手动确认模式。在这种模式下,消耗者在成功处理完一条消息后,需主动发送一个确认(ack)给RabbitMQ,告知消息已经被处理,RabbitMQ接收到确认后才会将消息从队列中删除。
@Component
@RabbitListener(queues = "your_queue_name")
public class YourConsumer {
@RabbitHandler
public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 处理消息的逻辑
System.out.println("Received message: " + new String(message.getBody());
// 成功处理完消息后,手动发送确认
channel.basicAck(tag, false); // 第二个参数false表示只确认当前消息
} catch (Exception e) {
// 如果消息处理失败,可以选择拒绝消息(requeue参数决定是否重新入队)
channel.basicNack(tag, false, true); // 第三个参数true表示重新入队列
// 或者也可以使用basicReject方法,效果类似但不支持批量确认
// channel.basicReject(tag, true);
}
}
}
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4