Java消息队列入门详解

打印 上一主题 下一主题

主题 895|帖子 895|积分 2685

什么是消息队列?

消息队列的产生主要是为相识决系统间的异步解耦与确保最终同等性。在实际应用场景中,每每存在一些主流程操作和辅助流程操作,其中主流程需要快速响应用户请求,而辅助流程可能涉及复杂的处理逻辑或者依赖于外部服务。通过将这些辅助流程的消息放入消息队列,使得它们可以并行执行,并且不会壅闭主流程的运行,从而提高了系统的整体性能和用户体验。同时,消息队列支持至少一次的消息投递机制,保证了纵然在网络不稳固或者其他非常环境下,辅助流程也一定能够得到执行的机会,进而增强了系统的可靠性和稳固性。
什么场景要使用消息队列

在实际应用场景中,RocketMQ作为一种高效可靠的消息队列服务,能够很好地满足差别业务需求。以下通过具体场景示例来解说解耦、异步处理、削峰填谷、可靠性和扩展性这几个关键词。
解耦

假设有一个电商系统,每当用户下单时需要触发一系列操作如库存更新、订单记录生成等。如果不使用消息队列,则这些逻辑需要串行执行或直接调用,这不但增加了系统的复杂度,也使得各个组件间的依赖关系变得紧密。而采用RocketMQ后,订单服务可以作为生产者将订单信息发送到特定的主题(Topic)上,后续的库存服务和订单记录服务作为消耗者从该主题订阅并处理相干信息。如许纵然某个下游服务出现故障,也不会直接影响整个购物流程的正常运行。
异步处理

考虑一个在线支付平台,在用户完成支付动作之后,除了立即返回支付结果给前端外,后台还需要执行诸如积分更新、优惠券发放等一系列操作。如果所有步调都同步举行,则会大大增加响应时间。此时使用RocketMQ可以让支付服务快速确认交易乐成并向客户端反馈,同时异步地将相干事件发布出去供其他模块订阅处理,从而有效提升了用户体验和服务效率。
剥峰填谷

对于一些具有明显访问高峰期的应用步伐来说(比如节假日促销期间的电商平台),短时间内涌入大量请求可能会导致服务器压力剧增甚至瓦解。借助于RocketMQ如许的消息中间件,可以在流量高峰时段将超出处理能力范围内的请求暂存起来,并以可控速率逐步分发给后端服务举行处理,以此实现流量平滑化,保护核心业务不受冲击。
可靠性

数据传输过程中的安全性和完备性至关紧张。比方,在金融交易场景下,每笔交易都需要被准确无误地记录下来。RocketMQ支持事务消息功能,保证了消息发送与本地事务的同等性;同时它还提供了消息持久化存储机制以及重试策略,确保即便在网络不稳固或者系统非常环境下也能最终完成消息传递,极大增强了系统的健壮性。
扩展性

随着业务规模的增长,每每需要对现有架构举行横向扩展以应对更高的并发请求量。通过引入RocketMQ,我们可以很容易地添加更多的生产者实例来分担消息生产任务,或者摆设额外的消耗者实例以加快消息消耗速率。由于消耗者组的存在,新增节点无需改变已有配置即可自动参与到负载均衡中去,从而简化了扩展过程并提高了整体吞吐量。
消息队列的主要产物可选项:

在线业务场景适合:
RocketMQ专为高并发、低延迟和高可用性计划,特别实用于金融级的实时交易处理。它支持事务消息,确保分布式系统中的数据同等性。别的,RocketMQ提供了灵活的消息范例,包括顺序消息、定时/延时消息等,非常适合复杂的在线业务需求。
大数据传输适合:
Kafka在大规模数据处理场景中体现出色,尤其是日记收集与流式数据处理。其核心优势在于单文件存储结构,这使得Kafka在处理大量一连写入的数据时能够实现极高的效率。因此,对于需要高效处理海量数据的应用来说,Kafka是理想的选择。
需要JMS标准实现适合:
ActiveMQ作为一款完全兼容JMS1.1规范的消息中间件,非常适合那些依赖于Java企业级应用环境的项目。除了提供丰富的消息协议支持外,ActiveMQ还具备强大的功能集,如持久化消息存储、分布式的摆设模式等,可以满足多种复杂的消息路由需求。
amqp等多细微小场景适合:
RabbitMQ以其对AMQP(高级消息队列协议)的支持闻名,同时兼容STOMP、MQTT等多种协议,非常适合物联网装备间的通信及微服务架构下的解耦需求。只管RocketMQ也在增加对这些协议的支持,但目前RabbitMQ还是这类应用场景下的首选解决方案之一。
详细的使用MQ收发消息的例子(以rocketmq为例)

详细的使用MQ收发消息的例子(以RocketMQ为例)

1. 参加依赖

首先,在项目标pom.xml文件中添加对RocketMQ客户端库的依赖。这一步是必要的,以便在你的Java应用中使用RocketMQ的功能。
  1. 1 <dependency>
  2. 2     <groupId>org.apache.rocketmq</groupId>
  3. 3
  4. 4     <artifactId>rocketmq-client</artifactId>
  5. 5
  6. 6     <version>4.9.1</version>
  7. 7
  8. 8 </dependency>
复制代码
对于Gradle项目,对应的依赖声明如下:
 1 implementation 'org.apache.rocketmq:rocketmq-client:4.9.1' 
确保使用的版本与你本地或长途运行的RocketMQ服务端兼容。
2. 消息发送

下面是一个简单的生产者示例,它向指定的主题发送同步消息。这里也展示了怎样配置和启动一个DefaultMQProducer实例。
  1. 1 import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. 2 import org.apache.rocketmq.common.message.Message;
  3. 3
  4. 4 public class Producer {
  5. 5     public static void main(String[] args) throws Exception {
  6. 6         // 创建生产者实例,并设置生产者组名
  7. 7         DefaultMQProducer producer = new DefaultMQProducer("my-group");
  8. 8         // 设置NameServer地址
  9. 9         producer.setNamesrvAddr("localhost:9876");
  10. 10         // 启动生产者
  11. 11         producer.start();
  12. 12         
  13. 13         for (int i = 0; i < 10; i++) {
  14. 14             // 创建一条消息,指定主题、标签及消息内容
  15. 15             Message msg = new Message("test-topic-1", "TagA", ("Hello RocketMQ " + i).getBytes());
  16. 16            
  17. 17             // 发送消息到Broker
  18. 18             producer.send(msg);
  19. 19         }
  20. 20         
  21. 21         // 关闭生产者
  22. 22         producer.shutdown();
  23. 23     }
  24. 24 }
复制代码
3. 消耗消息

接下来,展示一个消耗者例子,说明怎样订阅特定主题的消息并处理它们。此例采用了推送模式下的DefaultMQPushConsumer。
  1. 1 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. 2 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. 4 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. 5 import org.apache.rocketmq.common.message.MessageExt;
  6. 6
  7. 7 import java.util.List;
  8. 8
  9. 9 public class Consumer {
  10. 10     public static void main(String[] args) throws Exception {
  11. 11         // 创建消费者实例,并设定消费者组名
  12. 12         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer_group");
  13. 13         // 设置NameServer地址
  14. 14         consumer.setNamesrvAddr("localhost:9876");
  15. 15         // 订阅一个或多个Topic,并通过tag过滤所需的消息
  16. 16         consumer.subscribe("test-topic-1", "*");
  17. 17         // 注册消息监听器,当有新消息到达时会触发onMessage方法
  18. 18         consumer.registerMessageListener(new MessageListenerConcurrently() {
  19. 19             @Override
  20. 20             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  21. 21                 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  22. 22                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  23. 23             }
  24. 24         });
  25. 25         // 启动消费者
  26. 26         consumer.start();
  27. 27         System.out.printf("Consumer Started.%n");
  28. 28     }
  29. 29 }
复制代码
为什么如许做可以实现异步解耦?

通过上述步调,我们能够使用RocketMQ实现消息的异步通信,从而到达系统组件之间的解耦结果。生产者将消息发布到特定的主题上,而不必关心哪些消耗者将会接收这些消息;同样地,消耗者只需订阅感爱好的主题即可获取信息,无需直接与生产者交互。这种方式不但简化了系统的复杂度,还提高了其可扩展性和灵活性。比方,在订单系统中,当用户下单后,可以立即发送一个包含订单详情的消息给库存管理服务,后者可以在空闲时处理该请求而不会壅闭用户的操作流程。别的,RocketMQ支持多种消息范例(如顺序消息、延时消息等),使得开辟者可以根据具体需求选择最适合的消息模型来进一步优化应用步伐的计划。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表