标签:RocketMq5.Dashboard;
一、简介
RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;
二、环境部署
1、编译打包
- 1、下载5.0版本源码包
- rocketmq-all-5.0.0-source-release.zip
- 2、解压后进入目录,编译打包
- mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
复制代码
2、修改配置
- 在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
复制代码 - distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
复制代码
3、服务启动
- 1、该目录下
- distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/
- 2、启动NameServer
- sh mqnamesrv
- 输出日志
- The Name Server boot success. serializeType=JSON
- 3、启动Broker+Proxy
- sh mqbroker -n localhost:9876 --enable-proxy
- 输出日志
- rocketmq-proxy startup successfully
- 4、关闭服务
- sh mqshutdown namesrv
- Send shutdown request to mqnamesrv(18636) OK
- sh mqshutdown broker
- Send shutdown request to mqbroker with proxy enable OK(18647)
复制代码 4、控制台安装
- 1、下载master源码包
- rocketmq-dashboard-master
- 2、解压后进入目录,编译打包
- mvn clean package -Dmaven.test.skip=true
- 3、启动服务
- java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
- 4、输出日志
- INFO main - Tomcat started on port(s): 8080 (http) with context path ''
- 5、访问服务:localhost:8080
复制代码
三、工程搭建
1、工程结构

2、依赖管理
在rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>${rocketmq-starter.version}</version>
- </dependency>
复制代码 3、配置文件
配置RocketMq服务地址,消息生产者和消费者;- rocketmq:
- name-server: 127.0.0.1:9876
- # 生产者
- producer:
- group: boot_group_1
- # 消息发送超时时间
- send-message-timeout: 3000
- # 消息最大长度4M
- max-message-size: 4096
- # 消息发送失败重试次数
- retry-times-when-send-failed: 3
- # 异步消息发送失败重试次数
- retry-times-when-send-async-failed: 2
- # 消费者
- consumer:
- group: boot_group_1
- # 每次提取的最大消息数
- pull-batch-size: 5
复制代码 4、配置类
在配置类中主要定义两个Bean的加载,即RocketMQTemplate和DefaultMQProducer,主要是提供消息发送的能力,即生产消息;- @Configuration
- public class RocketMqConfig {
- @Value("${rocketmq.name-server}")
- private String nameServer;
- @Value("${rocketmq.producer.group}")
- private String producerGroup;
- @Value("${rocketmq.producer.send-message-timeout}")
- private Integer sendMsgTimeout;
- @Value("${rocketmq.producer.max-message-size}")
- private Integer maxMessageSize;
- @Value("${rocketmq.producer.retry-times-when-send-failed}")
- private Integer retryTimesWhenSendFailed ;
- @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
- private Integer retryTimesWhenSendAsyncFailed ;
- @Bean
- public RocketMQTemplate rocketMqTemplate(){
- RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
- rocketMqTemplate.setProducer(defaultMqProducer());
- return rocketMqTemplate;
- }
- @Bean
- public DefaultMQProducer defaultMqProducer() {
- DefaultMQProducer producer = new DefaultMQProducer();
- producer.setNamesrvAddr(this.nameServer);
- producer.setProducerGroup(this.producerGroup);
- producer.setSendMsgTimeout(this.sendMsgTimeout);
- producer.setMaxMessageSize(this.maxMessageSize);
- producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
- producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
- return producer;
- }
- }
复制代码 四、基础用法
1、消息生产
编写一个生产者接口类,分别使用RocketMQTemplate和DefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;- @RestController
- public class ProducerWeb {
- private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);
- @Autowired
- private RocketMQTemplate rocketMqTemplate;
- @GetMapping("/send/msg1")
- public String sendMsg1 (){
- try {
- // 构建消息主体
- JsonMapper jsonMapper = new JsonMapper();
- String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
- // 发送消息
- rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return "OK" ;
- }
- @Autowired
- private DefaultMQProducer defaultMqProducer ;
- @GetMapping("/send/msg2")
- public String sendMsg2 (){
- try {
- // 构建消息主体
- JsonMapper jsonMapper = new JsonMapper();
- String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
- // 构建消息对象
- Message message = new Message();
- message.setTopic("boot-mq-topic");
- message.setTags("boot-mq-tag");
- message.setKeys("boot-mq-key");
- message.setBody(msgBody.getBytes());
- // 发送消息,打印日志
- SendResult sendResult = defaultMqProducer.send(message);
- log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
- } catch (Exception e) {
- e.printStackTrace();
- }
- return "OK" ;
- }
- }
复制代码 2、消息消费
编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;- @Service
- @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
- public class ConsumerListener implements RocketMQListener<String> {
- private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
- @Override
- public void onMessage(String message) {
- log.info("\n=====\n message:{} \n=====\n",message);
- }
- }
复制代码
五、参考源码
- 文档仓库:
- https://gitee.com/cicadasmile/butte-java-note
- 源码仓库:
- https://gitee.com/cicadasmile/butte-spring-parent
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |