马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
一、概述
1.1先容
RocketMQ是阿里巴巴2016年MQ中心件,使用Java语言开辟,RocketMQ 是一款开源的分布式消息体系,基于高可用分布式集群技能,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
官网: http://rocketmq.apache.org/
1.2相关概念
Producer:消息的发送者,生产者;举例:发件人
Consumer:消息吸收者,消耗者;举例:收件人
Broker:临时保存生产者发送消息的服务器;举例:快递
NameServer:管理Broker;举例:各个快递公司的管理机构 相当于broker的注册中心,保留了broker的信息
Queue:队列,消息存放的位置,一个Broker中可以有多个队列
Topic:主题,消息的分类
ProducerGroup:生产者组
ConsumerGroup:消耗者组,多个消耗者组可以同时消耗一个主题的消息
消息发送的流程是,Producer询问NameServer,NameServer分配一个broker 然后Consumer也要询问NameServer,得到一个详细的broker,然后消耗消息
单机版
集群
概述
- 生产者:就是用于生产消息的应用程序。
- 消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自界说的复杂格式(只要能按预定格式解析出来即可)。
- 队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
- 消耗者:就是用于读取队列中消息的应用程序。
1.3部署
https://blog.csdn.net/Acloasia/article/details/130548105
- version: '3'
- services:
- namesrv:
- image: apacherocketmq/rocketmq:4.6.0
- ports:
- - 9876:9876
- volumes:
- - ./data/namesrv/logs:/home/rocketmq/logs
- command: sh mqnamesrv
- broker:
- image: apacherocketmq/rocketmq:4.6.0
- ports:
- - 10909:10909
- - 10911:10911
- - 10912:10912
- user: "${UID}:3000"
- volumes:
- - ./data/broker/logs:/home/rocketmq/logs
- - ./data/broker/store:/home/rocketmq/store
- - ./broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
- command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
- depends_on:
- - namesrv
复制代码 1.4应用场景
使用消息中心件最主要的目的:
- [1] 应用解耦
- [2] 异步处理
- [3] 流量削峰
[1] 应用解耦
场景:双11是购物狂节,用户下单后,订单体系需要通知库存体系,传统的做法就是订单体系调用库存体系的接口. `
标题:`当库存体系出现故障时,订单就会失败。 订单体系和库存体系高耦合。如何解决这个标题? 引入消息队列之后
- 订单体系:用户下单后,订单体系完成长期化处理,将消息写入消息队列,返回用户订单下单成功。
- 库存体系:订阅下单的消息,获取下单消息,举行减库操纵。 就算库存体系出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
[2] 异步处理
场景阐明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式`
- 串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个标题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等候没有必要等候的东西.
- 并行方式: `将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
- 消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。 消息队列`: 引入消息队列后,把发送邮件、短信不是必须的业务逻辑异步处理。
结论: 由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
[3] 流量削峰
场景: 秒杀运动,一样平常会由于流量过大,导致应用挂掉,为相识决这个标题,一样平常在应用前端加入消息队列。
作用:`
- 可以控制运动人数,超过此肯定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^) 。
- 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单) 。
- 用户的请求,服务器收到之后,起首写入消息队列,加入消息队列长度超过最大值,则直接扬弃用户请求或跳转到错误页面;
- 秒杀业务根据消息队列中的请求信息,再做后续处理;
二、快速入门
2.1现有脚色分析与流程
1 相关脚色
- 消息生产者:producer,消息生产者,web-service中web是生产者。
- 消息服务器:broker,经纪人。实现吸收、提供、长期化、过滤消息。
- 消息消耗者:consumer。消耗消息,web-service中service是消耗者。
- 上述三个脚色都可以搭建集群,实现高可用;
- 监听器监听broker,消耗者监听broker,有消息就消耗
- 偏移量(offset):消耗者需要从代理服务器中获取消息,消耗使用;消耗完之后并没有删除,而是打了一个已经消耗完的标签;偏移量记载的就是全部已经消耗过的数据的编码。
- 命名服务器:NameServer [cluster],统筹管理前前三个脚色
- 消息组成:消息体(body)、主题(Topic)、标签(tag子主题)
- broker组成:内含多个不同主题(Topic),每个topic中包含多个队列(默认4个)
2 工作流程
常见概念
消息(Message)
消息是 Apache RocketMQ 中的最小数据传输单位。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消耗端举行消耗。
普通理解: 消息就是自己想要通报业务数据
主题(Topic)
主题 是Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。
普通理解: 就是用来给发送消息举行分类。一个消息发送者可以发送消息到一个或多个主题,一个消息消耗者也可以消耗一个或多个主题的消息。
消息类型(MessageType)
Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事件消息和定时/延时消息。
注意:Apache RocketMQ 从5.0版本开始,支持逼迫校验消息类型,即每个主题Topic只允许发送一种消息类型的消息,如许可以更好的运维和管理生产体系,避免混乱。但同时保证向下兼容4.x版本行为,逼迫校验功能默认开启。
消息队列(MessageQueue)
队列是 Apache RocketMQ 中消息存储和传输的现实容器,也是消息的最小存储单位。 Apache RocketMQ 的全部主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
普通理解: 就是 topic 的分区,用来更好实现队列数量的水平拆分和队列内部的流式存储。(水平拆分意味着可以通过增长更多的队列来提高体系的并行处理能力,而流式存储则是指队列可以连续吸收和发送消息,适用于高吞吐量的场景。)
消耗者分组(ConsumerGroup)
消耗者分组是Apache RocketMQ 体系中承载多个消耗行为同等的消耗者的负载均衡分组。和消耗者不同,消耗者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消耗者分组内初始化多个消耗者实现消耗性能的水平扩展以及高可用容灾。
订阅关系(Subscription)
Apache RocketMQ 发布订阅模型中消息过滤、重试、消耗进度的规则设置。订阅关系以消耗组粒度举行管理,消耗组通过界说订阅关系控制指定消耗组下的消耗者如何实现消息过滤、消耗重试及消耗进度恢复等。
2.2消息发送和监听的流程
消息生产者
1.创建消息生产者producer,并订定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体等
5.发送消息
6.关闭生产者producer
消息消耗者
1.创建消耗者consumer,订定消耗者组名
2.指定Nameserver地址
3.创建监听订阅主题Topic和Tag等
4.处理消息
5.启动消耗者consumer
2.3原生API的spring整合
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.2</version>
- </dependency>
复制代码 生产者
- @Test
- public void testProducer() throws Exception {
- // 创建默认的生产者,并设置生产者组名为"test-group"
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址为本地的9876端口
- producer.setNamesrvAddr("host:9876");
- // 启动生产者实例
- producer.start();
- // 循环发送10条消息
- for (int i = 0; i < 10; i++) {
- // 创建消息对象,指定主题为"TopicTest",内容为"Hello RocketMQ "加上当前循环次数
- Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
- // 发送消息,并获取发送结果
- SendResult send = producer.send(msg);
- // 打印发送结果
- System.out.println(send);
- }
- // 关闭生产者实例
- producer.shutdown();
- }
复制代码 消耗者
- @Test
- public void testConsumer() throws Exception {
- // 创建默认消费者组
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
- // 设置nameServer地址
- consumer.setNamesrvAddr("host:9876");
- // 订阅一个主题来消费,*表示没有过滤参数,表示这个主题的任何消息
- consumer.subscribe("TopicTest", "*");
- // 注册一个消费监听,MessageListenerConcurrently是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- // 打印当前线程名和接收到的消息
- System.out.println(Thread.currentThread().getName() + "----" + msgs);
- // 返回消费的状态,如果是CONSUME_SUCCESS则成功,若为RECONSUME_LATER则该条消息会被重回队列,重新被投递
- // 重试的时间为messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- // 也就是第一次1s,第二次5s,第三次10s,.... 如果重试了18次,那么这个消息就会被终止发送给消费者
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- });
- // 这个start一定要写在registerMessageListener下面
- consumer.start();
- // 阻塞等待用户输入,防止程序立即退出
- System.in.read();
- }
复制代码 2.4消耗模式
MQ的消耗模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push是服务端【MQ】自动推送消息给客户端,优点是实时性较好,但假如客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积以致崩溃。
Pull是客户端需要自动到服务端取数据,优点是客户端可以依据自己的消耗能力举行消耗,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取隔断长又容易造成消耗不实时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一样平常场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,比方电商大促,抢优惠券等场景可以选择pull模式
RocketMQ发送同步消息
上面的快速入门就是发送同步消息,发送过后会有一个返回值,也就是mq服务器吸收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到全部的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方
RocketMQ发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等候Broker的响应。发送完以后会有一个异步消息通知
生产者
- @Test
- public void testAsyncProducer() throws Exception {
- // 创建默认的生产者,指定生产者组名为"test-group"
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址为本地的9876端口
- producer.setNamesrvAddr("localhost:9876");
- // 启动生产者实例
- producer.start();
- // 创建一个消息对象,主题为"TopicTest",内容为"异步消息"的字节数组
- Message msg = new Message("TopicTest", ("异步消息").getBytes());
- // 发送消息,并提供一个回调函数来处理发送结果
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- // 当消息发送成功时,打印"发送成功"
- System.out.println("发送成功");
- }
- @Override
- public void onException(Throwable e) {
- // 当消息发送失败时,打印"发送失败"
- System.out.println("发送失败");
- }
- });
- // 打印一条信息,用于观察回调函数是否已经执行
- System.out.println("看看谁先执行");
- // 挂起jvm,等待回调函数执行完成,因为回调是异步的,如果不挂起jvm,测试可能无法观察到回调的效果
- System.in.read();
- // 关闭生产者实例
- producer.shutdown();
- }
复制代码 消耗者
- @Test
- public void testAsyncConsumer() throws Exception {
- // 创建默认消费者组
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
- // 设置nameServer地址
- consumer.setNamesrvAddr("localhost:9876");
- // 订阅一个主题来消费,*表示没有过滤参数,表示这个主题的任何消息
- consumer.subscribe("TopicTest", "*");
- // 注册一个消费监听,MessageListenerConcurrently是并发消费
- // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- // 这里执行消费的代码,默认是多线程消费
- System.out.println(Thread.currentThread().getName() + "----" + msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- // 启动消费者
- consumer.start();
- // 等待用户输入,防止程序立即退出
- System.in.read();
- }
复制代码 RocketMQ发送单向消息
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,比方日记信息的发送
生产者
- @Test
- public void testOnewayProducer() throws Exception {
- // 创建默认的生产者,设置生产者组名为"test-group"
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址为本地地址,端口为9876
- producer.setNamesrvAddr("localhost:9876");
- // 启动生产者实例
- producer.start();
- // 创建一个消息对象,主题为"TopicTest",内容为"单向消息"的字节数组
- Message msg = new Message("TopicTest", ("单向消息").getBytes());
- // 发送单向消息,不需要等待服务器响应
- producer.sendOneway(msg);
- // 关闭生产者实例,释放资源
- producer.shutdown();
- }
复制代码 消耗者
消耗者和上面一样
RocketMQ发送延迟消息
消息放入mq后,过一段时间,才会被监听到,然后消耗
好比下订单业务,提交了一个订单就可以发送一个延时消息,30min后去查抄这个订单的状态,假如还是未付款就取消订单释放库存。
- @Test
- public void testDelayProducer() throws Exception {
- // 创建默认的生产者,并设置生产者组名为"test-group"
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址为本地的9876端口
- producer.setNamesrvAddr("localhost:9876");
- // 启动生产者实例
- producer.start();
- // 创建一个消息对象,主题为"TopicTest",内容为"延迟消息"的字节数组
- Message msg = new Message("TopicTest", ("延迟消息").getBytes());
- // 给这个消息设定一个延迟等级,这里设置为3,表示延迟5秒发送
- // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
- msg.setDelayTimeLevel(3);
- // 使用生产者发送单向消息
- producer.send(msg);
- // 打印当前时间,用于观察消息发送的时间
- System.out.println(new Date());
- // 关闭生产者实例
- producer.shutdown();
- }
复制代码 RocketMQ批量消息
生产者
- @Test
- public void testBatchProducer() throws Exception {
- // 创建默认的生产者
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址
- producer.setNamesrvAddr("localhost:9876");
- // 启动实例
- producer.start();
- List<Message> msgs = Arrays.asList(
- new Message("TopicTest", "我是一组消息的A消息".getBytes()),
- new Message("TopicTest", "我是一组消息的B消息".getBytes()),
- new Message("TopicTest", "我是一组消息的C消息".getBytes())
- );
- SendResult send = producer.send(msgs);
- System.out.println(send);
- // 关闭实例
- producer.shutdown();
- }
复制代码 消耗者
- @Test
- public void testBatchConsumer() throws Exception {
- // 创建默认消费者组
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
- // 设置nameServer地址
- consumer.setNamesrvAddr("localhost:9876");
- // 订阅一个主题来消费 表达式,默认是*
- consumer.subscribe("TopicTest", "*");
- // 注册一个消费监听 MessageListenerConcurrently是并发消费
- // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- // 这里执行消费的代码 默认是多线程消费
- System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 RocketMQ发送顺序消息
消息有序指的是可以按照消息的发送顺序来消耗(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。
大概大家会有疑问,mq不就是FIFO吗?
rocketMq的broker的机制,导致了rocketMq会有这个标题
由于一个broker中对应了四个queue
顺序消耗的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消耗消息的时候从多个queue上拉取消息,这种情况发送和消耗是不能保证顺序。但是假如控制发送的顺序消息只依次发送到同一个queue中,消耗的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消耗参与的queue只有一个,则是全局有序;假如多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单举行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消耗时,同一个顺序获取到的肯定是同一个队列。
生产者
- @Test
- public void testOrderlyProducer() throws Exception {
- // 创建默认的生产者
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址
- producer.setNamesrvAddr("localhost:9876");
- // 启动实例
- producer.start();
- List<Order> orderList = Arrays.asList(
- new Order(1, 111, 59D, new Date(), "下订单"),
- new Order(2, 111, 59D, new Date(), "物流"),
- new Order(3, 111, 59D, new Date(), "签收"),
- new Order(4, 112, 89D, new Date(), "下订单"),
- new Order(5, 112, 89D, new Date(), "物流"),
- new Order(6, 112, 89D, new Date(), "拒收")
- );
- // 循环集合开始发送
- orderList.forEach(order -> {
- Message message = new Message("TopicTest", order.toString().getBytes());
- try {
- // 发送的时候 相同的订单号选择同一个队列
- producer.send(message, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- // 当前主题有多少个队列
- int queueNumber = mqs.size();
- // 这个arg就是后面传入的 order.getOrderNumber()
- Integer i = (Integer) arg;
- // 用这个值去%队列的个数得到一个队列
- int index = i % queueNumber;
- // 返回选择的这个队列即可 ,那么相同的订单号 就会被放在相同的队列里 实现FIFO了
- return mqs.get(index);
- }
- }, order.getOrderNumber());
- } catch (Exception e) {
- System.out.println("发送异常");
- }
- });
- // 关闭实例
- producer.shutdown();
- }
复制代码 消耗者
- @Test
- public void testOrderlyConsumer() throws Exception {
- // 创建默认消费者组
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
- // 设置nameServer地址
- consumer.setNamesrvAddr("localhost:9876");
- // 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
- consumer.subscribe("TopicTest", "*");
- // 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- MessageExt messageExt = msgs.get(0);
- System.out.println(new String(messageExt.getBody()));
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 RocketMQ发送带标签的消息
Rocketmq提供消息过滤功能,通过tag或者key举行区分
我们往一个主题内里发送消息的时候,根据业务逻辑,大概需要区分,好比带有tagA标签的被A消耗,带有tagB标签的被B消耗,另有在事件监听的类内里,只要是事件消息都要走同一个监听,我们也需要通过过滤才区别对待
tag方法
标签消息生产者
- @Test
- public void tagProducer() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
- producer.setNamesrvAddr(":9876");
- producer.start();
- Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
- Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
- producer.send(message);
- producer.send(message2);
- System.out.println("发送成功");
- producer.shutdown();
- }
复制代码 标签消息消耗者
- /**
- * vip1
- *
- * @throws Exception
- */
- @Test
- public void tagConsumer1() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
- consumer.setNamesrvAddr("47.96.254.46:9876");
- consumer.subscribe("tagTopic", "vip1");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
- /**
- * vip1 || vip2
- *
- * @throws Exception
- */
- @Test
- public void tagConsumer2() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
- consumer.setNamesrvAddr("47.96.254.46:9876");
- consumer.subscribe("tagTopic", "vip1 || vip2");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有肯定的接洽,而 Tag 则用来区分同一个 Topic 下相互关联的消息,比方全集和子集的关系、流程先后的关系。
Key方法
在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来举行查询
生产者
- @Test
- public void keyProducer() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
- producer.setNamesrvAddr("");
- producer.start();
- String key = UUID.randomUUID().toString();
- System.out.println(key);
- Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
- producer.send(message);
- System.out.println("发送成功");
- producer.shutdown();
- }
复制代码 消耗者
- @Test
- public void keyConsumer() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");
- consumer.setNamesrvAddr("");
- consumer.subscribe("keyTopic", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- MessageExt messageExt = msgs.get(0);
- System.out.println("我是vip1的消费者,我正在消费消息" + new String(messageExt.getBody()));
- System.out.println("我们业务的标识:" + messageExt.getKeys());
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 2.5重复消耗标题
在BROADCASTING(广播)模式下,全部注册的消耗者都会收到消息,通常这些消耗者是集群部署的微服务,导致多台机器重复消耗,这取决于需求。在CLUSTERING(负载均衡)模式下,假如多个consumerGroup消耗同一个topic,也会发生重复消耗。对于同一个consumerGroup,虽然一个队列只分配给一个消耗者看似避免重复消耗,但在消耗者上下线时需重新负载均衡,大概导致新消耗者重复消耗未提交offset的消息。此外,在发送批量消息时,若部分失败,则整个批量消息会被重新消耗。
消息会重复
1.生产者多次投递了
2.消耗者方由于扩容时会重试
生产者
- @Test
- void repeatProducer() throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
- producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
- producer.start();
- String key = UUID.randomUUID().toString();
- System.out.println(key);
- // 测试 发两个key一样的消息
- Message m1 = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
- Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
- producer.send(m1);
- producer.send(m1Repeat);
- System.out.println("发送成功");
- producer.shutdown();
- }
复制代码 消耗组
- @Test
- public void testRepeatConsumer() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
- consumer.setNamesrvAddr("47.96.254.46:9876");
- consumer.subscribe("repeatTopic", "*");
- // 注册一个消费监听 MessageListenerConcurrently是并发消费
- // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- // 拿到消息的key
- MessageExt messageExt = msgs.get(0);
- String keys = messageExt.getKeys();
- // 判断是否存在布隆过滤器中
- if (bloomFilter.contains(keys)) {
- // 直接返回了 不往下处理业务
- System.out.println("消息重复了");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- // 这个处理业务,然后放入过滤器中
- // do sth...
- bloomFilter.add(keys);
- System.out.println("我是消费者,我正在消费消息" + new String(messageExt.getBody()));
- System.out.println("我们业务的标识:" + messageExt.getKeys());
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 2.6RocketMQ重试机制
生产者重试
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
- @Test
- public void retryProducer() throws Exception {
- // 创建一个名为"retry-producer-group"的DefaultMQProducer实例
- DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
- // 设置NameServer地址,用于生产者与Broker通信
- producer.setNamesrvAddr(":9876");
- // 启动生产者
- producer.start();
- // 设置生产者发送消息失败时的重试次数为2次
- producer.setRetryTimesWhenSendFailed(2);
- // 设置异步发送消息失败时的重试次数为2次
- producer.setRetryTimesWhenSendAsyncFailed(2);
- // 生成一个随机的UUID作为消息的key
- String key = UUID.randomUUID().toString();
- // 打印生成的key
- System.out.println(key);
- // 创建一个消息实例,包含主题、队列、key和内容
- Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());
- // 发送消息
- producer.send(message);
- // 打印发送成功的提示信息
- System.out.println("发送成功");
- // 关闭生产者
- producer.shutdown();
- }
复制代码 在消耗者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会实行重试
上图代码中阐明了,我们再现实生产过程中,一样平常重试3-5次,假如还没有消耗成功,则可以把消息签收了,通知人工等处理
- /**
- * 重试的时间间隔
- * 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- * 默认重试16次
- * 1.能否自定义重试次数
- * 2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的? 是一个死信消息 则会放在一个死信主题中去 主题的名称:%DLQ%retry-consumer-group
- * 3.当消息处理失败的时候 该如何正确的处理?
- * --------------
- * 重试的次数一般 5次
- * @throws Exception
- */
- @Test
- public void retryConsumer() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
- consumer.setNamesrvAddr(":9876");
- consumer.subscribe("retryTopic", "*");
- // 设定重试次数
- consumer.setMaxReconsumeTimes(2);
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- MessageExt messageExt = msgs.get(0);
- System.out.println(new Date());
- System.out.println(messageExt.getReconsumeTimes());
- System.out.println(new String(messageExt.getBody()));
- // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 2.7RocketMQ死信消息
当消耗重试到达阈值以后,消息不会被投递给消耗者了,而是进入了死信队列
当一条消息初次消耗失败,RocketMQ会自动举行消息重试,达到最大重试次数后,若消耗依然失败,则表明消耗者在正常情况下无法正确地消耗该消息。此时,该消息不会立即被丢弃,而是将其发送到该消耗者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。假如产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消耗。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。我们也可以去监听死信队列,然后举行自己的业务上的逻辑
- /// 直接监听死信主题的消息,记录下拉 通知人工接入处理
- @Test
- public void retryDeadConsumer() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
- consumer.setNamesrvAddr("9876");
- consumer.subscribe("%DLQ%retry-consumer-group", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- MessageExt messageExt = msgs.get(0);
- System.out.println(new Date());
- System.out.println(new String(messageExt.getBody()));
- System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
- // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
复制代码 第二种方法
- 第二种方案 用法比较多
- @Test
- public void retryConsumer2() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
- consumer.setNamesrvAddr("47.96.254.46:9876");
- consumer.subscribe("retryTopic", "*");
- // 设定重试次数
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- MessageExt messageExt = msgs.get(0);
- System.out.println(new Date());
- // 业务处理
- try {
- handleDb();
- } catch (Exception e) {
- // 重试
- int reconsumeTimes = messageExt.getReconsumeTimes();
- if (reconsumeTimes >= 3) {
- // 不要重试了
- System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
- private void handleDb() {
- int i = 10 / 0;
- }
复制代码 三、springboot整合
写依赖
- rocketmq:
- name-server: 47.96.254.46:9876
- producer:
- group: boot-producer-group
- enable-msg-trace: true # ??????????
- access-key: rocketmq2
- secret-key: 12345678
复制代码 写设置
- rocketmq:
- name-server: :9876
- producer:
- group: boot-producer-group
- enable-msg-trace: true # ??????????
- access-key: rocketmq2
- secret-key: 12345678
复制代码 报错No qualifying bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' available: expected at least 1 bean which qualifies as autowire candidate.
解决
https://blog.csdn.net/zhenweiyi/article/details/130722046
3.1常见消息誊写
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
- @Test
- void contextLoads() {
- // 同步
- // rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
- // // 异步
- rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("成功");
- }
- @Override
- public void onException(Throwable throwable) {
- System.out.println("失败" + throwable.getMessage());
- }
- });
- //
- // 单向
- rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
- // 延迟
- Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
- rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3);
- // 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费
- List<MsgModel> msgModels = Arrays.asList(
- new MsgModel("qwer", 1, "下单"),
- new MsgModel("qwer", 1, "短信"),
- new MsgModel("qwer", 1, "物流"),
- new MsgModel("zxcv", 2, "下单"),
- new MsgModel("zxcv", 2, "短信"),
- new MsgModel("zxcv", 2, "物流")
- );
- msgModels.forEach(msgModel -> {
- // 发送 一般都是以json的方式进行处理
- rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
- });
- }
复制代码 3.2标签消息
- @Test
- void tagKeyTest() throws Exception {
- // topic:tag
- rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
- // key是写带在消息头的
- Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
- .setHeader(RocketMQHeaders.KEYS, "qwertasdafg")
- .build();
- rocketMQTemplate.syncSend("bootKeyTopic", message);
- }
复制代码 3.3常见消耗监听
- @Component
- @RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
- public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
- /**
- * 这个方法就是消费者的方法
- * 如果泛型制定了固定的类型 那么消息体就是我们的参数
- * MessageExt 类型是消息的所有内容
- * ------------------------
- * 没有报错 就签收了
- * 如果报错了 就是拒收 就会重试
- *
- * @param message
- */
- @Override
- public void onMessage(MessageExt message) {
- System.out.println("消息的id:" + message.getMsgId());
- System.out.println(new String(message.getBody()));
- }
- }
复制代码- @Component
- @RocketMQMessageListener(topic = "bootOrderlyTopic",
- consumerGroup = "boot-orderly-consumer-group",
- consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
- maxReconsumeTimes = 5 // 消费重试的次数
- )
- public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
- @Override
- public void onMessage(MessageExt message) {
- System.out.println(121212);
- MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
- System.out.println(msgModel);
- }
- }
复制代码 3.4标签消息监听
- @Component
- @RocketMQMessageListener(topic = "bootTagTopic",
- consumerGroup = "boot-tag-consumer-group",
- selectorType = SelectorType.TAG,// tag过滤模式
- selectorExpression = "tagA || tagB"
- // selectorType = SelectorType.SQL92,// sql92过滤模式
- // selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
- )
- public class CTagMsgListener implements RocketMQListener<MessageExt> {
- @Override
- public void onMessage(MessageExt message) {
- System.out.println(new String(message.getBody()));
- }
- }
复制代码 3.5RocketMQ集成SpringBoot发送不同消息模式
Rocketmq消息消耗的模式分为两种:
负载均衡模式和广播模式负载均衡模式表示多个消耗者瓜代消耗同一个主题内里的消息
广播模式表示每个每个消耗者都消耗一遍订阅的主题的消息
- / 测试消息消费模式 集群模块 广播模式
- @Test
- void modeTest() throws Exception {
- for (int i = 1; i <= 5; i++) {
- rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");
- }
- }
复制代码 集群模式
- /**
- * [CLUSTERING] 集群模式下 队列会被消费者分摊, 队列数量>=消费者数量 消息的消费位点 mq服务器会记录处理
- * BROADCASTING 广播模式下 消息会被每一个消费者都处理一次, mq服务器不会记录消费点位,也不会重试
- */
- @Component
- @RocketMQMessageListener(topic = "modeTopic",
- consumerGroup = "mode-consumer-group-a",
- messageModel = MessageModel.CLUSTERING, // 集群模式 负载均衡
- consumeThreadNumber = 40
- )
- public class DC1 implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message);
- }
- }
复制代码 广播模式
- @Component
- @RocketMQMessageListener(topic = "modeTopic",
- consumerGroup = "mode-consumer-group-b",
- messageModel = MessageModel.BROADCASTING // 广播模式
- )
- public class DC4 implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message);
- }
- }
复制代码 3.6解决消息堆积标题
一样平常认为单条队列消息差值>=10w时 算堆积标题
1.生产太快了
生产方可以做业务限流
增长消耗者数量,但是消耗者数量<=队列数量,适当的设置最大的消耗线程数量(根据IO(2n)/CPU(n+1))
动态扩容队列数量,从而增长消耗者数量
2.消耗者消耗出现标题
排查消耗者程序的标题
- // 积压问题
- @Test
- void jyTest() throws Exception {
- for (int i = 1001; i <= 1200; i++) {
- rocketMQTemplate.syncSend("jyTopic", "我是第" + i + "个消息");
- }
- }
复制代码- @Component
- @RocketMQMessageListener(topic = "jyTopic",
- consumerGroup = "jy-consumer-group",
- consumeThreadNumber = 40,
- consumeMode = ConsumeMode.CONCURRENTLY
- )
- public class EJyListener1 implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("我是第一个消费者:" + message);
- }
- }
复制代码 3.7确保消息不丢失
- 生产者使用同步发送模式 ,收到mq的返回确认以后 顺便往自己的数据库内里写msgId status(0) time
- 消耗者消耗以后 修改数据这条消息的状态 = 1
- 写一个定时任务 隔断两天去查询数据 假如有status = 0 and time < day-2
- 将mq的刷盘机制设置为同步刷盘
- 使用集群模式 ,搞主备模式,将消息长期化在不同的硬件上
- 可以开启mq的trace机制,消息跟踪机制
1.在broker.conf中开启消息追踪
traceTopicEnable=true
2.重启broker即可
3.生产者设置文件开启消息轨迹
enable-msg-trace: true
4.消耗者开启消息轨迹功能,可以给单独的某一个消耗者开启
enableMsgTrace = true
在rocketmq的面板中可以查看消息轨迹
默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题内里
- @Test
- void jyTest() throws Exception {
- for (int i = 1001; i <= 1200; i++) {
- System.out.println("我是第" + i + "个消息");
- rocketMQTemplate.syncSend("jumpTopic", "我是第" + i + "个消息");
- }
- }
复制代码- @Component
- @RocketMQMessageListener(topic = "jumpTopic",
- consumerGroup = "jump-consumer-group",
- consumeThreadNumber = 40,
- consumeMode = ConsumeMode.CONCURRENTLY
- )
- public class FJumpListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("我是消费者:" + message);
- }
- }
复制代码- // 轨迹消息
- @Test
- void traceTest() throws Exception {
- rocketMQTemplate.syncSend("traceTopic", "我是第个消息");
- }
复制代码- @Component
- @RocketMQMessageListener(topic = "traceTopic",
- consumerGroup = "trace-consumer-group",
- consumeThreadNumber = 40,
- consumeMode = ConsumeMode.CONCURRENTLY,
- enableMsgTrace = true // 开启消费者方的轨迹
- )
- public class GTraceListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("我是消费者:" + message);
- }
- }
复制代码 3.8安全
- 开启acl的控制 在broker.conf中开启aclEnable=true
- 设置账号暗码 修改plain_acl.yml
- 修改控制面板的设置文件 放开52/53行 把49行改为true 上传到服务器的jar包平级目次下即可
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |