SpringBoot3集成RocketMq

打印 上一主题 下一主题

主题 559|帖子 559|积分 1677

标签:RocketMq5.Dashboard;
一、简介

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;
二、环境部署

1、编译打包
  1. 1、下载5.0版本源码包
  2. rocketmq-all-5.0.0-source-release.zip
  3. 2、解压后进入目录,编译打包
  4. mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
复制代码

2、修改配置
  1. 在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
复制代码
  1. distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
复制代码

3、服务启动
  1. 1、该目录下
  2. distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/
  3. 2、启动NameServer
  4. sh mqnamesrv
  5. 输出日志
  6. The Name Server boot success. serializeType=JSON
  7. 3、启动Broker+Proxy
  8. sh mqbroker -n localhost:9876 --enable-proxy
  9. 输出日志
  10. rocketmq-proxy startup successfully
  11. 4、关闭服务
  12. sh mqshutdown namesrv
  13. Send shutdown request to mqnamesrv(18636) OK
  14. sh mqshutdown broker
  15. Send shutdown request to mqbroker with proxy enable OK(18647)
复制代码
4、控制台安装
  1. 1、下载master源码包
  2. rocketmq-dashboard-master
  3. 2、解压后进入目录,编译打包
  4. mvn clean package -Dmaven.test.skip=true
  5. 3、启动服务
  6. java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
  7. 4、输出日志
  8. INFO main - Tomcat started on port(s): 8080 (http) with context path ''
  9. 5、访问服务:localhost:8080
复制代码

三、工程搭建

1、工程结构


2、依赖管理

在rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;
  1. <dependency>
  2.     <groupId>org.apache.rocketmq</groupId>
  3.     <artifactId>rocketmq-spring-boot-starter</artifactId>
  4.     <version>${rocketmq-starter.version}</version>
  5. </dependency>
复制代码
3、配置文件

配置RocketMq服务地址,消息生产者和消费者;
  1. rocketmq:
  2.   name-server: 127.0.0.1:9876
  3.   # 生产者
  4.   producer:
  5.     group: boot_group_1
  6.     # 消息发送超时时间
  7.     send-message-timeout: 3000
  8.     # 消息最大长度4M
  9.     max-message-size: 4096
  10.     # 消息发送失败重试次数
  11.     retry-times-when-send-failed: 3
  12.     # 异步消息发送失败重试次数
  13.     retry-times-when-send-async-failed: 2
  14.   # 消费者
  15.   consumer:
  16.     group: boot_group_1
  17.     # 每次提取的最大消息数
  18.     pull-batch-size: 5
复制代码
4、配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplate和DefaultMQProducer,主要是提供消息发送的能力,即生产消息;
  1. @Configuration
  2. public class RocketMqConfig {
  3.     @Value("${rocketmq.name-server}")
  4.     private String nameServer;
  5.     @Value("${rocketmq.producer.group}")
  6.     private String producerGroup;
  7.     @Value("${rocketmq.producer.send-message-timeout}")
  8.     private Integer sendMsgTimeout;
  9.     @Value("${rocketmq.producer.max-message-size}")
  10.     private Integer maxMessageSize;
  11.     @Value("${rocketmq.producer.retry-times-when-send-failed}")
  12.     private Integer retryTimesWhenSendFailed ;
  13.     @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
  14.     private Integer retryTimesWhenSendAsyncFailed ;
  15.     @Bean
  16.     public RocketMQTemplate rocketMqTemplate(){
  17.         RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
  18.         rocketMqTemplate.setProducer(defaultMqProducer());
  19.         return rocketMqTemplate;
  20.     }
  21.     @Bean
  22.     public DefaultMQProducer defaultMqProducer() {
  23.         DefaultMQProducer producer = new DefaultMQProducer();
  24.         producer.setNamesrvAddr(this.nameServer);
  25.         producer.setProducerGroup(this.producerGroup);
  26.         producer.setSendMsgTimeout(this.sendMsgTimeout);
  27.         producer.setMaxMessageSize(this.maxMessageSize);
  28.         producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
  29.         producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
  30.         return producer;
  31.     }
  32. }
复制代码
四、基础用法

1、消息生产

编写一个生产者接口类,分别使用RocketMQTemplate和DefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;
  1. @RestController
  2. public class ProducerWeb {
  3.     private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);
  4.     @Autowired
  5.     private RocketMQTemplate rocketMqTemplate;
  6.     @GetMapping("/send/msg1")
  7.     public String sendMsg1 (){
  8.         try {
  9.             // 构建消息主体
  10.             JsonMapper jsonMapper = new JsonMapper();
  11.             String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
  12.             // 发送消息
  13.             rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
  14.         } catch (Exception e) {
  15.             e.printStackTrace();
  16.         }
  17.         return "OK" ;
  18.     }
  19.     @Autowired
  20.     private DefaultMQProducer defaultMqProducer ;
  21.     @GetMapping("/send/msg2")
  22.     public String sendMsg2 (){
  23.         try {
  24.             // 构建消息主体
  25.             JsonMapper jsonMapper = new JsonMapper();
  26.             String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
  27.             // 构建消息对象
  28.             Message message = new Message();
  29.             message.setTopic("boot-mq-topic");
  30.             message.setTags("boot-mq-tag");
  31.             message.setKeys("boot-mq-key");
  32.             message.setBody(msgBody.getBytes());
  33.             // 发送消息,打印日志
  34.             SendResult sendResult = defaultMqProducer.send(message);
  35.             log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
  36.         } catch (Exception e) {
  37.             e.printStackTrace();
  38.         }
  39.         return "OK" ;
  40.     }
  41. }
复制代码
2、消息消费

编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;
  1. @Service
  2. @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
  3. public class ConsumerListener implements RocketMQListener<String> {
  4.     private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
  5.     @Override
  6.     public void onMessage(String message) {
  7.         log.info("\n=====\n message:{} \n=====\n",message);
  8.     }
  9. }
复制代码

五、参考源码
  1. 文档仓库:
  2. https://gitee.com/cicadasmile/butte-java-note
  3. 源码仓库:
  4. https://gitee.com/cicadasmile/butte-spring-parent
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表