RabbitMQ事务机制和确认机制

民工心事  金牌会员 | 2024-6-19 04:05:21 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 539|帖子 539|积分 1617

生产者:RabbitMQ提供transaction和confirm模式来确保生产者不丢消息

● 通过事务实现
● 通过发送方确认机制(publisher confirm)实现
1.1事务机制:发送消息前,开启事务(channel.txSelect()),然后发送消息,假如发送过程中出现什么异常,事务就会回滚(channel.txRollback()),假如发送成功则提交事务(channel.txCommit())。这种方式有个缺点:吞吐量下降;
事务实现
● channel.txSelect(): 将当前信道设置成事务模式
● channel.txCommit(): 用于提交事务
● channel.txRollback(): 用于回滚事务
通过事务实现机制,只有消息成功被rabbitmq服务器吸收,事务才能提交成功,否则便可在捕获异常之后举行回滚,然后举行消息重发,但是事务非常影响rabbitmq的性能。另有就是事务机制是阻塞的过程,只有等待服务器回应之后才会处理下一条消息
confirm模式用的居多
一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包罗消息的唯一ID),这就使得生产者知道消息已经精确到达目的队列了;假如rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以举行重试操作。
confirm方式有三种模式:平凡confirm模式、批量confirm模式、异步confirm模式
channel.confirmSelect(): 将当前信道设置成了confirm模式
平凡confirm模式
每发送一条消息,就调用waitForConfirms()方法,等待服务端返回Ack或者nack消息
RabbitMQ 事务机制

RabbitMQ 提供了事务机制来确保消息的可靠性传递。使用事务机制时,可以在发送消息之前开启一个事务,在事务内发送消息并举行确认提交,以确保消息被精确地发送到 RabbitMQ 中。
下面是使用 RabbitMQ 事务机制发送消息的一般步调:

  • 开启事务:通过 tx_select() 方法开启一个事务。
  • 发送消息:在事务内使用 basic_publish() 方法发送消息到指定的 Exchange 和队列。
  • 提交事务:通过 tx_commit() 方法提交事务,确保在事务内的操作生效。
  • 回滚事务(可选):假如发送消息过程中出现异常或错误,可以通过 tx_rollback() 方法回滚事务,撤销事务内的操作。
    当使用 Java 编程语言与 RabbitMQ 一起使用时,您可以通过以下步调来使用 RabbitMQ 的事务机制:
  1. 1. 创建 RabbitMQ 连接和信道:
  2. ConnectionFactory factory = new ConnectionFactory();
  3. factory.setHost("localhost");
  4. Connection connection = factory.newConnection();
  5. Channel channel = connection.createChannel();
  6. 2. 开启事务:
  7. channel.txSelect();
  8. 3. 发布消息到队列:
  9. String message = "Hello, RabbitMQ!";
  10. channel.basicPublish("", "queue_name", null, message.getBytes());
  11. 4. 提交事务:
  12. channel.txCommit();
  13. System.out.println("Message sent successfully");
  14. 5. 回滚事务(可选):
  15. channel.txRollback();
  16. System.out.println("Failed to send message");
  17. 需要注意的是,在使用事务机制时,如果发生异常或错误,您可以选择回滚事务以撤销之前的操作,或者提交事务以确认消息的发送。
  18. 下面是一个完整的示例代码,演示了如何在 Java 中使用 RabbitMQ 的事务机制:
  19. import com.rabbitmq.client.ConnectionFactory;
  20. import com.rabbitmq.client.Connection;
  21. import com.rabbitmq.client.Channel;
  22. public class RabbitMqTransactionExample {
  23.     private static final String QUEUE_NAME = "queue_name";
  24.     public static void main(String[] args) {
  25.         try {
  26.             ConnectionFactory factory = new ConnectionFactory();
  27.             factory.setHost("localhost");
  28.             Connection connection = factory.newConnection();
  29.             Channel channel = connection.createChannel();
  30.             // 开启事务
  31.             channel.txSelect();
  32.             String message = "Hello, RabbitMQ!";
  33.             // 发布消息到队列
  34.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  35.             // 提交事务
  36.             channel.txCommit();
  37.             System.out.println("Message sent successfully");
  38.         } catch (Exception e) {
  39.             // 回滚事务
  40.             channel.txRollback();
  41.             System.out.println("Failed to send message");
  42.             e.printStackTrace();
  43.         }
  44.     }
  45. }
复制代码
请注意,事务机制会对性能产生一定的影响,因为它需要举行额外的操作来维护事务的划一性。在高并发场景下,使用事务可能会导致性能下降。因此,在选择使用事务机制时,请根据实际需求和性能要求举行衡量。
需要注意的是,事务机制会对性能产生一定的影响,因为它需要举行额外的操作来维护事务的划一性。在高并发场景下,使用事务可能会导致性能下降。因此,在选择使用事务机制时,请根据实际需求和性能要求举行衡量。
总的来说,事务机制提供了一种确保消息传递可靠性的方法,但在实际应用中需要慎重考虑其对性能的影响。在大部分情况下,使用确认机制(Publisher Confirm)已经能够满意消息传递的可靠性要求,而且对性能影响较小。
RabbitMQ确认机制

RabbitMQ 的确认机制(Publisher Confirm)是一种用于确保消息可靠发送的机制。通过确认机制,生产者可以在消息被成功投递到 RabbitMQ Broker 后得到确认,从而确保消息已经安全地生存在 Broker 中,而不是在传输过程中丢失。
使用确认机制时,需要将信道(channel)设置为 confirm 模式,然后在发送消息后等待确认。确认可以是单条消息的确认,也可以是批量消息的确认。
当使用 Java 编程语言与 RabbitMQ 一起使用时,您可以使用 RabbitMQ 的确认机制(Publisher Confirm)来确保消息的可靠发送。以下是使用 RabbitMQ 确认机制的一般步调:
  1. 1. 创建 RabbitMQ 连接和信道:
  2. ConnectionFactory factory = new ConnectionFactory();
  3. factory.setHost("localhost");
  4. Connection connection = factory.newConnection();
  5. Channel channel = connection.createChannel();
  6. 2. 设置信道为确认模式:
  7. channel.confirmSelect();
  8. 3. 发布消息到队列:
  9. String message = "Hello, RabbitMQ!";
  10. channel.basicPublish("", "queue_name", null, message.getBytes());
  11. 4. 等待确认:
  12. if (channel.waitForConfirms()) {
  13.     System.out.println("Message sent successfully");
  14. } else {
  15.     System.out.println("Failed to send message");
  16. }
  17. 在上述代码中,我们首先创建了 RabbitMQ 连接和信道。然后,通过 channel.confirmSelect() 将信道设置为确认模式。接下来,我们使用 basicPublish() 方法发布消息到指定的队列。最后,通过 channel.waitForConfirms() 等待消息的确认,如果成功确认,则输出 "Message sent successfully",否则输出 "Failed to send message"。
  18. 请注意,RabbitMQ 的确认机制支持批量确认,即可以一次确认多条消息。您可以在 basicPublish() 方法之前调用 channel.getNextPublishSeqNo() 获取当前消息的序列号,然后在 waitForConfirms() 方法中使用 channel.waitForConfirms(sequenceNumber) 来等待指定序列号之前的所有消息确认。
  19. 以下是一个完整的示例代码,演示了如何在 Java 中使用 RabbitMQ 的确认机制:
  20. import com.rabbitmq.client.ConnectionFactory;
  21. import com.rabbitmq.client.Connection;
  22. import com.rabbitmq.client.Channel;
  23. public class RabbitMqConfirmExample {
  24.     private static final String QUEUE_NAME = "queue_name";
  25.     public static void main(String[] args) {
  26.         try {
  27.             ConnectionFactory factory = new ConnectionFactory();
  28.             factory.setHost("localhost");
  29.             Connection connection = factory.newConnection();
  30.             Channel channel = connection.createChannel();
  31.             // 设置信道为确认模式
  32.             channel.confirmSelect();
  33.             String message = "Hello, RabbitMQ!";
  34.             // 发布消息到队列
  35.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  36.             // 等待确认
  37.             if (channel.waitForConfirms()) {
  38.                 System.out.println("Message sent successfully");
  39.             } else {
  40.                 System.out.println("Failed to send message");
  41.             }
  42.         } catch (Exception e) {
  43.             System.out.println("Failed to send message");
  44.             e.printStackTrace();
  45.         }
  46.     }
  47. }
复制代码
确认机制是一种轻量且高效的方式来确保消息的可靠性传递。相比于事务机制,确认机制对性能的影响较小,因此在大多数情况下保举使用确认机制。
事务可以用spring-amqp吗

事务可以在Spring AMQP中使用。Spring AMQP是一个基于Spring Framework的项目,用于简化在Java应用步伐中使用AMQP(高级消息队列协议)的开发。AMQP是一个用于消息传递的开放式标准协议,用于在分布式体系中举行异步通信。
在Spring AMQP中,你可以使用Spring的事务管理机制来确保在消息发送或吸收过程中的原子性操作。具体来说,你可以通过以下方式在Spring AMQP中使用事务:

  • 消息发送事务: 你可以配置Spring的事务管理器来包装你的消息发送操作,确保在事务提交时消息被发送成功,而在事务回滚时消息不被发送。如允许以确保消息发送的原子性。
  • 消息吸收事务: 你可以配置Spring的消息监听器容器(Message Listener Container)来在消息处理时使用事务。如允许以确保在消息处理过程中发生异常时,消息会被重新投递给队列,并在一定的重试次数后被移到死信队列或者被抛弃,从而保证消息不会丢失。
    通过使用Spring的事务管理机制,你可以在Spring AMQP中实现消息传递过程中的事务性操作,从而确保数据的划一性和可靠性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

民工心事

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表