1、从源码角度理解RocketMQ运行架构
1.1、环境准备
假如还没安装RocketMQ的同学,可参考《RocketMQ服务快速搭建和体验消息收发》。
先运行NameServer,Broker两个服务。
- # 启动NameServer
- nohup mqnamesrv &
- # 启动Broker
- nohup mqbroker &
- # 查看服务是否启动成功
- jps
- # 当看到以下两个服务时表示启动成功
- 1939 NamesrvStartup
- 2037 BrokerStartup
复制代码 1.2、源码体验快速发送和消费消息
源码可以从官网下载,或在文末获取。
生产者和消费者代码快速定位到example下的quickstart。假如没有设置环境变量NAMESRV_ADDR指定NameServer的服务地址,就须要在代码指定NamesrvAddr值,也就须要修改参数DEFAULT_NAMESRVADDR值,指向服务摆设的IP和端口。
为方便本身理解,我选择的是在代码设置的方式。
代码所在的位置:
生产者Producer须要修改的地方:
为方便查看消息,把测试消息个数修改为10条。
同理,消费者Consumer也一样修改。
先启动生产者,消息发送完,程序就停止了。
再启动消费者,可以看到控制台打印的消费消息记录,消费完后,消费者进程并没有停止,而是挂起状态,只要有消息再过来就可以继续消费。这个可以自行搭建服务和下载源码体验测试下就清楚了。
1.3、单机版ReocketMQ运行架构
上面从源码提供的示例代码体验了消息的发送和消费,我这边整理出单机版的RocketMQ运行架构图,集群版本也类似。由于集群服务还没搭建暂不提供。
2、Java项目代码角度理解
搭建一个Spring Boot项目,步调相信大家都会了,就不细说截图了,如需Demo文末有获取方式。
2.1、在pom.xml引入依赖
由于创建的是Spring Boot项目,引入的是rocketmq-spring-boot-starter。
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.3.1</version>
- </dependency>
复制代码 start最新版本是2.3.1,对应rocketmq版本是5.3.0。
假如使用5.3.1版本的客户端清除starter的rocketmq-client,再引入5.3.1版本的。但我个建议不消,除非须要使用里面最新的功能。
2.2、设置NameServer地址、生产者和消费者分组
- rocketmq:
- # NameServer地址 ★★修改为自己部署的IP和端口★★
- name-server: 192.168.242.50:9876
- # 配置发送者,才会初始化 RocketMQTemplate
- producer:
- group: SpringBootGroup
- consumer:
- group: TestConsumerGroup
- # 消费者订阅的Topic
- topic: TestTopic
复制代码 2.3、生产者demo代码
- @Component
- public class SpringProducer {
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
- /**
- * 发送消息
- * @param topic 主题
- * @param msg 消息体
- */
- public void sendMsg(String topic, String msg) {
- rocketMQTemplate.convertAndSend(topic, msg);
- }
- }
复制代码 2.4、消费者demo代码
- @Component
- // 消费监听的消费组和主题可以抽取做为配置项
- @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}")
- public class SpringConsumer implements RocketMQListener<String> {
- @Override
- public void onMessage(String msg) {
- System.out.println("Recived msg : " + msg);
- }
- }
复制代码 2.5、测试生产消息的入口demo代码
- @RestController
- @RequestMapping("/mqTest")
- public class TestController {
- private final String topic = "TestTopic";
- @Autowired
- private SpringProducer producer;
- @GetMapping("/sendMsg")
- public String sendMsg(String msg) {
- producer.sendMsg(topic, msg);
- return "success";
- }
- }
复制代码 2.6、启动项目
启动完项目,看到打印日志有:生产者在NameServer初始化,消费者监听,项目本身启动乐成等信息,就表示启动乐成了,可以举行接口测试了。
2.7、在浏览器访问sendMsg测试
在浏览器输入http://127.0.0.1:8090/mqTest/sendMsg?msg=test1 ,在参数msg 后拼接恣意测试消息,得到响应效果success
在控制台看到输出吸收到消息。感兴趣的同学可以以调试模式跟踪代码逻辑。
3、我的公众号&资料获取
敬请关注我的公众号:大象只为你,持续更新技能知识…
干系资料获取:
如需RocketMQ5.3.1资源包,请后台复兴:RocketMQ。
如需RocketMQ Java项目demo,请后台复兴:rmqd。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |