IT评测·应用市场-qidao123.com

标题: RocketMQ 学习 [打印本页]

作者: 光之使者    时间: 2023-4-4 14:29
标题: RocketMQ 学习
前言

  RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等
本篇文章第一部分属于一些核心概念和工作流程的讲解;第二部分就是纯手动搭建了一套环境;第三部分是基于环境进行测试和集成到SpringBoot
核心概念

这里有组的概念是因为可以用来做到不同的生产者组或者消费者组有不同的配置,这样就可以使得生产者或者消费者更加灵活。
工作流程


通过这张图就可以很清楚的知道,RocketMQ大致的工作流程:
就跟上面的图一样,整体的工作流程还是比较简单的,这里简化了很多概念,主要是为了好理解。
环境搭建

  通过上面分析,我们知道,在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务系统,所以这里不需要搭建,真正要搭建的就是NameServer和Broker,但是为了方便RocketMQ数据的可视化,这里多搭建一套可视化的服务。
搭建过程比较简单,按照步骤一步一步来就可以完成,如果提示一些命令不存在,那么直接通过yum安装这些命令就行。
一、准备

需要准备一个linux服务器,需要先安装好JDK
关闭防火墙
  1. systemctl stop firewalld
  2. systemctl disable firewalld
复制代码
下载并解压RocketMQ

1、创建一个目录,用来存放rocketmq相关的东西
  1. mkdir /usr/rocketmq
  2. cd /usr/rocketmq
复制代码
2、下载并解压rocketmq

下载
  1. wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
复制代码
解压
  1. unzip rocketmq-all-4.7.1-bin-release.zip
复制代码
如果提示unzip: Command Not Found
通过yum命令安装,如果已经安装了,请忽略
  1. yum install -y unzip zip
复制代码
看到这一个文件夹就完成了

然后进入rocketmq-all-4.7.1-bin-release文件夹
  1. cd rocketmq-all-4.7.1-bin-release
复制代码
RocketMQ的东西都在这了

二、搭建NameServer

在启动NameServer之前,强烈建议修改一下启动时的jvm参数,因为默认的参数都比较大,为了避免内存不够,建议修改小,当然,如果你的内存足够大,可以忽略。
  1. vi bin/runserver.sh
复制代码
修改画圈的这一行

 可以设置小一点
  1. -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m
复制代码
启动NameServer

修改完之后,执行如下命令就可以启动NameServer了
  1. nohup sh bin/mqnamesrv &
复制代码
查看NameServer日志
  1. tail -f ~/logs/rocketmqlogs/namesrv.log
复制代码
如果看到如下的日志,就说明启动成功了
 
关闭NameServer
  1. sh /bin mqshutdown namesrv
复制代码
三、搭建Broker

这里启动单机版的Broker
修改jvm参数

跟启动NameServer一样,也建议去修改jvm参数
  1. vi bin/runbroker.sh
复制代码
将画圈的地方设置小点,当然也别太小啊
 
 可以这样设置
  1. -server -Xms1g -Xmx1g -Xmn512m
复制代码
修改Broker配置文件broker.conf

这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册
  1. vi conf/broker.conf
复制代码
Broker配置文件

这里就能看出Broker的配置了,什么Broker集群的名称啊,Broker的名称啊,Broker的id啊,都跟前面说的对上了。
在文件末尾追加地址
  1. namesrvAddr = localhost:9876
复制代码
因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。
不过这里我还建议再修改一处信息,因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能你的电脑无法访问到这个自动获取的ip,所以我建议手动指定你的电脑可以访问到的服务器ip。
我的虚拟机的ip是192.168.3.158,所以就指定为192.168.3.158,如下
  1. brokerIP1 = 192.168.3.158
  2. brokerIP2 = 192.168.3.158
复制代码
开启自动创建Topic
  1. autoCreateTopicEnable = true
复制代码
如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

启动Broker
  1. nohup sh bin/mqbroker -c conf/broker.conf &
复制代码
-c 参数就是指定配置文件
查看日志
  1. tail -f ~/logs/rocketmqlogs/broker.log
复制代码
当看到如下日志就说明启动成功了

关闭Broker
  1. sh /bin mqshutdown broker
复制代码
查看Broker 与NameServer是否运行
  1. jps
复制代码

 说明Broker与NameServer是运行状态
四、搭建可视化控制台

其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。
可视化服务其实就是一个jar包,启动就行了。
jar包可以从这获取
链接:https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w

提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里
然后进入/usr/rocketmq下,执行如下命名
  1. nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &
复制代码
rocketmq.config.namesrvAddr就是用来指定NameServer的地址的
查看日志
  1. tail -f ~/logs/consolelogs/rocketmq-console.log
复制代码
当看到如下日志,就说明启动成功了

然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭
 
通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。
功能很多,这里就不一一介绍了。
测试

环境搭好之后,就可以进行测试了。
引入依赖
  1. <dependency>
  2.     <groupId>org.apache.rocketmq</groupId>
  3.     <artifactId>rocketmq-client</artifactId>
  4.     <version>4.7.1</version>
  5. </dependency>
复制代码
生产者发送消息
  1.     @Test
  2.     public void sendTest() throws Exception{
  3.         //创建一个生产者,指定生产者组为ldProducer
  4.         DefaultMQProducer producer = new DefaultMQProducer("ldProducer");
  5.         // 指定NameServer的地址
  6.         producer.setNamesrvAddr("192.168.3.158:9876");
  7.         // 第一次发送可能会超时,我设置的比较大
  8.         producer.setSendMsgTimeout(60000);
  9.         // 启动生产者
  10.         producer.start();
  11.         // 创建一条消息
  12.         // topic为 ldTopic
  13.         // 消息内容为 java学习日记
  14.         // tags 为 TagA
  15.         Message msg = new Message("ldTopic", "TagA", "java学习日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
  16.         // 发送消息并得到消息的发送结果,然后打印
  17.         SendResult sendResult = producer.send(msg);
  18.         System.out.printf("%s%n", sendResult);
  19.         // 关闭生产者
  20.         producer.shutdown();
  21.     }
复制代码
消费者消费消息
  1. public class ConsumerMsg {
  2.     public static void main(String[] args) throws Exception {
  3.         // 通过push模式消费消息,指定消费者组
  4.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ldConsumer");
  5.         consumer.setNamesrvAddr("192.168.3.158:9876");
  6.         // 订阅这个topic下的所有的消息
  7.         consumer.subscribe("ldTopic", "*");
  8.         // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
  9.         consumer.registerMessageListener(new MessageListenerConcurrently() {
  10.             @Override
  11.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  12.                 for (MessageExt msg : msgs) {
  13.                     System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
  14.                 }
  15.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16.             }
  17.         });
  18.         consumer.start();
  19.         System.out.printf("Consumer Started.%n");
  20.     }
  21. }
复制代码
启动之后,消费者就会消费刚才生产者发送的消息,于是控制台就打印出如下信息
 
再去看控制台,已消费
 
SpringBoot环境下集成RocketMQ

集成

在实际项目中肯定不会像上面测试那样用,都是集成SpringBoot的。
1、引入依赖
  1. <dependency>
  2.     <groupId>org.apache.rocketmq</groupId>
  3.     <artifactId>rocketmq-spring-boot-starter</artifactId>
  4.     <version>2.1.1</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.springframework.boot</groupId>
  8.     <artifactId>spring-boot-starter-test</artifactId>
  9.     <version>2.1.1.RELEASE</version>
  10. </dependency>
复制代码
2、yml配置
  1. rocketmq:
  2.   producer:
  3.     group: ldProducer
  4.   name-server: 192.168.3.158:9876
复制代码
3、创建消费者

SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可
  1. @Component
  2. @RocketMQMessageListener(consumerGroup = "ldConsumer", topic = "ldDelayTaskTopic")
  3. @Slf4j
  4. public class LdRocketMQListener implements RocketMQListener<String> {
  5.     @Override
  6.     public void onMessage(String msg) {
  7.         log.info("获取到延迟任务消息:{}",msg);
  8.     }
  9. }
复制代码
@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类
4、测试
  1. @RestController
  2. @Slf4j
  3. public class RocketMQDelayTaskController {
  4.     @Resource
  5.     private DefaultMQProducer producer;
  6.     @GetMapping("/rocketmq/add")
  7.     public void addTask(@RequestParam("task") String task) throws Exception {
  8.         Message msg = new Message("ldDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
  9.         msg.setDelayTimeLevel(2);
  10.         // 发送消息并得到消息的发送结果,然后打印
  11.         log.info("提交延迟任务");
  12.         producer.send(msg);
  13.     }
  14. }
复制代码
 

可能遇到的问题

搭完mq单主单从集群之后,美滋滋想发一下message, 没想到碰到一个坑爹的问题:
  1. Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.90 CQ:  0.90 INDEX:  0.90, maybe your broker machine memory too small.
复制代码
  1. org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [549]ms, Topic: ldTopicA, BrokersSent: [broker-a, broker-a, broker-a]
  2. See http://rocketmq.apache.org/docs/faq/ for further details.
  3.     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:665)
  4.     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
  5.     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
  6.     at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
  7.     at com.example.delay.MQTest.sendTest(MQTest.java:46)
  8.     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  9.     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  10.     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)<br>    ...
  11. Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.90 CQ:  0.90 INDEX:  0.90, maybe your broker machine memory too small.
  12. For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
  13.     at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:665)
  14.     at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:505)
  15.     at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:487)
  16.     at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:431)
  17.     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:854)
  18.     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:584)
复制代码
看报错应该是磁盘空间不足的问题,看到一个帖子https://bbs.csdn.net/topics/392568834,还挺符合的,虽然给出的解决方案说的没那么详细,但是值得一试。
查看磁盘空间
 
已用91%,查阅百度之后发现rocketmq源码的DefaultMessageStore类里,默认会把剩余磁盘的比率不足75%(rocketmq版本不同这个比率好像不一样)当做磁盘空间不足处理,看来磁盘是有点不够了。
先cd到rocketmq配置文件的路径,我这里配置的是双主双从同步的模式,所以cd到配置文件(根据配置的不同文件夹的路径不一样,但都在/conf下)。

 重新发送消息Ok了
 
 
出处:https://www.cnblogs.com/donleo123/本文如对您有帮助,还请多推荐下此文,如有错误欢迎指正,相互学习,共同进步。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4