RocketMq源码焦点篇整体栏目
内容链接地址【一】环境搭建、根本使用、可视化界面https://zhenghuisheng.blog.csdn.net/article/details/147481401
如需转载,请附上链接:https://blog.csdn.net/zhenghuishengq/article/details/147481401
一,RocketMq源码分析
在现在盛行的mq消息中间件中,rocketMq和kafka现在是占市面主流,其二者底层思想相互借鉴,又由于rocketMq底层源码是通过java代码实现,因此这里优先思量研究rocketMq的底层,能把rocketMq中间件熟练把握后,那么kafka自然也能够融汇贯通。从本系列开始正式迈入rocketMq源码篇章,在保证会用的同时,也能够知道其底层原理和内部实现,同时也能够学习内部优秀的源码计划和风格。
在rocketmq中,5.x版本是grpc协议实现,4.x版本还是基于netty协议,而且5.x内部模块相对于4.x版本重构了许多,文档等也相对较少,因此为了深度的学习rocketMq的底层源码,我们先从4.x的版本学起,这里保举使用4.9.4版本,其源码地址如下:https://github.com/apache/rocketmq/tree/release-4.9.4
也可以直接使用我gitee堆栈现成的源码,也包含一些案例和注释:https://gitee.com/zhenghuisheng/rocketmq-source-code-learning
可视化界面源码,里面的jar包也能直接使用:https://gitee.com/zhenghuisheng/rocketPageHome
1. docker安装rocketMq
docker拉取rocketmq的镜像,这里保举4.9.4版本,因此这里还是拉取4.9.4的镜像
- docker pull apache/rocketmq:4.9.4
复制代码 启动nameServer,同时设置初始的堆内存大小
- //运行新的nameServer
- docker run -d --name rmqnamesrv -p 9876:9876 -e "JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m" apache/rocketmq:4.9.4 sh mqnamesrv
复制代码 在执行启动broker下令前,需要在配置文件中先设置一些参数,在 /root/rocketmq/broker.conf 配置文件中,这个文件就是linux环境下的原生文件,用于挂载映射到容器内部,没有这个文件的话需要创建一下,下面的namesrvAddr和brokerIP1里面的ip,需要替换成自己服务器的ip
- brokerClusterName=DefaultCluster
- brokerName=broker-a
- brokerId=0
- deleteWhen=04
- fileReservedTime=48
- brokerRole=ASYNC_MASTER
- flushDiskType=ASYNC_FLUSH
- listenPort=10911
- namesrvAddr=159.75.102.237:9876
- brokerIP1=159.75.102.237
复制代码 启动broker,这里是4.x的版本启动方式,同时调整了初始的堆栈大小,因为默认的是4g内存,像我总共只有2核4g的服务器,刚启动就直接把内存给撑爆了,因此需要适当的去调整一下
- //运行broker
- docker run -d --name rmqbroker -p 10911:10911 -p 10909:10909 -e "NAMESRV_ADDR=159.75.102.237:9876" -e "JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -Duser.home=/home/rocketmq" -v /root/rocketmq/broker.conf:/opt/rocketmq/conf/broker.conf apache/rocketmq:4.9.4 sh mqbroker -c /opt/rocketmq/conf/broker.conf -n 159.75.102.237:9876
复制代码 执行完这两条下令之后,执行在运行容器的下令即可,broker和nameserver全部启动起来了
然后去对应的服务器把防火墙端口打开,分别是nameServer的9876端口和broker对应的10911、10909端口
2. rocketMq根本使用
2.1,创建topic主题
创建topic主题,可以直接在容器内部手动的去创建主题,进入broker容器内部,然后先创建一个测试的主题,如我这边新建一个 zhsTopic 的主题
- # 进入broker容器内部
- docker exec -it rmqbroker bash
- cd /home/rocketmq/rocketmq-4.9.4/bin
- export NAMESRV_ADDR=159.75.102.237:9876
- # 创建 topic,绑定到默认集群和 broker 上
- sh mqadmin updateTopic -n 159.75.102.237:9876 -c DefaultCluster -t zhsTopic
复制代码 后续出现 create topic to 172.17.0.3:10911 success. 表现主题创建乐成
2.2,java底子代码测试
引入项目依靠,我调试的源码版本是4.9.4,所以后续都通过这个
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.4</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.83</version>
- </dependency>
复制代码 接下来通过java程序模拟两个客户端,消费者的代码如下,setNamesrvAddr需要设置成对应的ip和开放的端口号,主题设置成自定义的主题
- /**
- *
- * @Author zhenghuisheng
- * @Date:2025/4/15 19:24
- */
- public class SimpleConsumer {
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo-consumer-group");
- consumer.setNamesrvAddr("159.75.102.237:9876");
- consumer.subscribe("zhsTopic", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt msg : msgs) {
- System.out.printf("收到消息:%s%n", new String(msg.getBody()));
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("消费者已启动");
- }
- }
复制代码 生产者的代码如下,同样也是设置对应的ip和端口以及主题,启动乐成后手动销毁竣事一下
- /**
- *
- * @Author zhenghuisheng
- * @Date:2025/4/15 19:24
- */
- public class SimpleProducer {
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("demo-producer-group");
- producer.setNamesrvAddr("159.75.102.237:9876");
- producer.start();
- Message msg = new Message("zhsTopic", "TagA", "Hello RocketMQ!".getBytes());
- SendResult sendResult = producer.send(msg);
- System.out.printf("发送结果:%s%n", sendResult);
- producer.shutdown();
- }
- }
复制代码 最后先启动消费者,然后再启动生产者,其对应的日记如下
生产者日记:发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013EF818B4AAC26B8EE9230000, offsetMsgId=9F4B66ED00002A9F0000000000000178, messageQueue=MessageQueue [topic=zhsTopic, brokerName=broker-a, queueId=3], queueOffset=0]
21:15:27.970 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[159.75.102.237:9876] result: true
21:15:27.973 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[159.75.102.237:10911] result: true
消费者日记:
消费者已启动
收到消息:Hello RocketMQ!
2.3,结合springboot使用
一样平常整合springboot时,都要引入对应的starer让spring主动注入到容器中,因此这里也引入对应的starer,这道理版本选择2.2.3即可
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.3</version>
- </dependency>
复制代码 然后消费者代码如下,直接@Service大概@Component都可以,使用注解 RocketMQMessageListener 即可监听消息而且消费
- /**
- *
- * @Author zhenghuisheng
- * @Date:2025/4/21 21:22
- */
- @Service
- @RocketMQMessageListener(topic = IndexConstant.CUSTOM_TOPIC, consumerGroup = "demo-consumer-group")
- public class MQConsumerService implements RocketMQListener<String> {
- public void onMessage(String message) {
- System.out.println("收到消息: " + message);
- }
- }
复制代码 常量如下,定义上面要取的topic主题
- @Data
- public class IndexConstant implements Serializable {
- public static final String CUSTOM_TOPIC = "zhsTopic";
- }
复制代码 生产者就比较简朴,直接使用这个 RocketMQTemplate 模板即可,内部是使用刚刚引入的starer中的注解
- /**
- *
- * @Author zhenghuisheng
- * @Date:2025/4/21 21:21
- */
- @Slf4j
- @Service
- public class MQProducerService {
- @Resource
- private RocketMQTemplate rocketMQTemplate;
- public void sendMessage(String message) {
- log.info("发送的主题为,{} --- 消息为,{}", IndexConstant.CUSTOM_TOPIC, message);
- rocketMQTemplate.convertAndSend(IndexConstant.CUSTOM_TOPIC, message);
- }
- }
复制代码 最后定义一个controller发送消息即可,消息从接口处传入
- /**
- *
- * @Author zhenghuisheng
- * @Date:2025/4/21 21:28
- */
- @RestController
- @RequestMapping("/mq")
- @Slf4j
- public class MqController {
- @Resource
- private MQProducerService mqProducerService;
- @GetMapping("/sendMessage")
- public R<String> sendMessage(String message) {
- mqProducerService.sendMessage(message);
- return R.ok(message);
- }
- }
复制代码 3. 安装可视化界面
在使用rocketmq时,可以用官方保举的可视化界面进行查看消息的消费环境以及集群等问题,对相干问题排查也比较友好,因此这里选择使用官方保举的 https://github.com/apache/rocketmq-dashboard/tags ,这里目前有两个版本,2.0版本是为了rocketMq5.0版本之后使用的,会涉及到一些grpc等,因此先使用下面的1.0版本
可以将代码拉到本地,解压后本地先辈行一些配置修改,如yml中 namesrvAddr 地址,以及修改一些默认的端口号等
- rocketmq.config.namesrvAddr=
- server.port=8888
复制代码 随后本地通过maven打一个jar包,将这个jar包发到服务器上面,随后可以看到此时已经build success
随后将本地打出来的包发布到服务器上,也可以在本地执行一下,是可以运行的
我把它放在/opt/rocketmq目录下面,由于java的可移植性,windows打出的包linux也可以运行
随后执行服务启动下令,java -jar rocketmq-dashboard-1.0.0.jar 即可,端口号是改成了8888
随后打开ip+端口号,像8888端口号的话,需要开启防火墙,开放端口号,直接进入这个可视化界面如许就完成可视化几面的安装
可以看到刚刚新建的主题zhsTopic, 可以在这里查看状态,手动的发消息等
以及消费者状态,看每个topic主题消费的环境等,以及生产者等等
这个可视化界面在后续用到时也可以详细的描述他的功能
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |