            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);
                System.out.println(new Date());
                // Business logic
                try {
                    handleDb();
                } catch (Exception e) {
                    // Retry
                    int reconsumeTimes = messageExt.getReconsumeTimes();
                    if (reconsumeTimes >= 3) {
                        // Stop retrying
                        System.out.println("Record to a dedicated place (file, MySQL) and notify someone to handle it manually");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                // If the business logic fails, returning null or RECONSUME_LATER both trigger a retry
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        // Block so the JVM stays alive while the consumer runs
        System.in.read();
    }

    private void handleDb() {
        // Deliberately throws ArithmeticException to simulate a failed business operation
        int i = 10 / 0;
    }
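The retry cap in the listener above can be checked without a broker. The following is a minimal sketch, not RocketMQ code: `ConsumeStatus` and `RetryCapSketch` are hypothetical stand-ins that only mirror the decision made in the `catch` block, assuming `getReconsumeTimes()` is 0 on the first delivery and grows by one on each redelivery.

```java
// Hypothetical stand-in for ConsumeConcurrentlyStatus; not the RocketMQ enum.
enum ConsumeStatus { CONSUME_SUCCESS, RECONSUME_LATER }

final class RetryCapSketch {
    // Same threshold as the listener above.
    static final int MAX_RECONSUME_TIMES = 3;

    // Mirrors the catch block: acknowledge (give up) once the message
    // has already been redelivered MAX_RECONSUME_TIMES times.
    static ConsumeStatus onFailure(int reconsumeTimes) {
        return reconsumeTimes >= MAX_RECONSUME_TIMES
                ? ConsumeStatus.CONSUME_SUCCESS
                : ConsumeStatus.RECONSUME_LATER;
    }

    // Counts deliveries of a message whose processing always fails,
    // up to and including the delivery on which the listener gives up.
    static int deliveriesUntilGiveUp() {
        int reconsumeTimes = 0; // assumed to be 0 on the first delivery
        int deliveries = 0;
        while (true) {
            deliveries++;
            if (onFailure(reconsumeTimes) == ConsumeStatus.CONSUME_SUCCESS) {
                return deliveries;
            }
            reconsumeTimes++; // models the counter growing on each redelivery
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveriesUntilGiveUp()); // prints 4
    }
}
```

Under these assumptions a message that always fails is processed four times in total: the initial delivery plus three retries. The fourth failure is the one that gets logged for manual handling.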
3. Spring Boot integration
Add the dependency
Add the RocketMQ Spring Boot starter (org.apache.rocketmq:rocketmq-spring-boot-starter) to the build.
Write the configuration
rocketmq:
  name-server: 47.96.254.46:9876
  producer:
    group: boot-producer-group
    enable-msg-trace: true # enable message tracing
    access-key: rocketmq2
    secret-key: 12345678
Error: No qualifying bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' available: expected at least 1 bean which qualifies as autowire candidate.
Solution
https://blog.csdn.net/zhenweiyi/article/details/130722046
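The linked post is the author's reference for the fix. As a hedged sketch of one commonly reported cause (an assumption here, not something stated in this text): Spring Boot 3.x no longer loads auto-configurations from `META-INF/spring.factories`, so an older `rocketmq-spring-boot-starter` never registers `RocketMQTemplate`. Under that assumption, a workaround is to create the file `src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports` in the project with this single line:

```text
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
```

If the project runs on Spring Boot 2.x, this cause does not apply. In that case, check that the starter is on the classpath and that both `rocketmq.name-server` and `rocketmq.producer.group` are set, since the template is typically only created when a producer is configured.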
3.1 Writing common messages