业务原因,必要在一个项目中与多方MQ举行业务通信;
步调一,复制一份RocketMQProperties配置文件,制止与原来的冲突- package com.heit.road.web.config;
- import org.apache.rocketmq.common.topic.TopicValidator;
- import java.util.HashMap;
- import java.util.Map;
- public class MultipleRocketMQProperties {
- /**
- * The name server for rocketMQ, formats: `host:port;host:port`.
- */
- private String nameServer;
- /**
- * Enum type for accessChannel, values: LOCAL, CLOUD
- */
- private String accessChannel;
- private org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer producer;
- /**
- * Configure enable listener or not.
- * In some particular cases, if you don't want the the listener is enabled when container startup,
- * the configuration pattern is like this :
- * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
- * <p>
- * the listener is enabled by default.
- */
- private org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer consumer = new org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer();
- public String getNameServer() {
- return nameServer;
- }
- public void setNameServer(String nameServer) {
- this.nameServer = nameServer;
- }
- public String getAccessChannel() {
- return accessChannel;
- }
- public void setAccessChannel(String accessChannel) {
- this.accessChannel = accessChannel;
- }
- public org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer getProducer() {
- return producer;
- }
- public void setProducer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer producer) {
- this.producer = producer;
- }
- public static class Producer {
- /**
- * Group name of producer.
- */
- private String group;
- /**
- * Millis of send message timeout.
- */
- private int sendMessageTimeout = 3000;
- /**
- * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
- */
- private int compressMessageBodyThreshold = 1024 * 4;
- /**
- * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
- * This may potentially cause message duplication which is up to application developers to resolve.
- */
- private int retryTimesWhenSendFailed = 2;
- /**
- * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
- * This may potentially cause message duplication which is up to application developers to resolve.
- */
- private int retryTimesWhenSendAsyncFailed = 2;
- /**
- * Indicate whether to retry another broker on sending failure internally.
- */
- private boolean retryNextServer = false;
- /**
- * Maximum allowed message size in bytes.
- */
- private int maxMessageSize = 1024 * 1024 * 4;
- /**
- * The property of "access-key".
- */
- private String accessKey;
- /**
- * The property of "secret-key".
- */
- private String secretKey;
- /**
- * Switch flag instance for message trace.
- */
- private boolean enableMsgTrace = true;
- /**
- * The name value of message trace topic.If you don't config,you can use the default trace topic name.
- */
- private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
- public String getGroup() {
- return group;
- }
- public void setGroup(String group) {
- this.group = group;
- }
- public int getSendMessageTimeout() {
- return sendMessageTimeout;
- }
- public void setSendMessageTimeout(int sendMessageTimeout) {
- this.sendMessageTimeout = sendMessageTimeout;
- }
- public int getCompressMessageBodyThreshold() {
- return compressMessageBodyThreshold;
- }
- public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
- this.compressMessageBodyThreshold = compressMessageBodyThreshold;
- }
- public int getRetryTimesWhenSendFailed() {
- return retryTimesWhenSendFailed;
- }
- public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
- this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
- }
- public int getRetryTimesWhenSendAsyncFailed() {
- return retryTimesWhenSendAsyncFailed;
- }
- public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
- this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
- }
- public boolean isRetryNextServer() {
- return retryNextServer;
- }
- public void setRetryNextServer(boolean retryNextServer) {
- this.retryNextServer = retryNextServer;
- }
- public int getMaxMessageSize() {
- return maxMessageSize;
- }
- public void setMaxMessageSize(int maxMessageSize) {
- this.maxMessageSize = maxMessageSize;
- }
- public String getAccessKey() {
- return accessKey;
- }
- public void setAccessKey(String accessKey) {
- this.accessKey = accessKey;
- }
- public String getSecretKey() {
- return secretKey;
- }
- public void setSecretKey(String secretKey) {
- this.secretKey = secretKey;
- }
- public boolean isEnableMsgTrace() {
- return enableMsgTrace;
- }
- public void setEnableMsgTrace(boolean enableMsgTrace) {
- this.enableMsgTrace = enableMsgTrace;
- }
- public String getCustomizedTraceTopic() {
- return customizedTraceTopic;
- }
- public void setCustomizedTraceTopic(String customizedTraceTopic) {
- this.customizedTraceTopic = customizedTraceTopic;
- }
- }
- public org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer getConsumer() {
- return consumer;
- }
- public void setConsumer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer consumer) {
- this.consumer = consumer;
- }
- public static final class Consumer {
- /**
- * listener configuration container
- * the pattern is like this:
- * group1.topic1 = false
- * group2.topic2 = true
- * group3.topic3 = false
- */
- private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
- public Map<String, Map<String, Boolean>> getListeners() {
- return listeners;
- }
- public void setListeners(Map<String, Map<String, Boolean>> listeners) {
- this.listeners = listeners;
- }
- }
- }
复制代码 步调二,复制一份@RocketMQMessageListener,并新增数据源参数soruce
,这里不接纳原来的nameServer参数,大概是版本原因,这个参数现在并不支持多数据源步调三,复制一份RocketMQListenerContainer,替换参数RocketMQMessageListener- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package com.heit.road.web.config;
- import org.apache.rocketmq.client.AccessChannel;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.MessageSelector;
- import org.apache.rocketmq.client.consumer.listener.*;
- import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.client.utils.MessageUtil;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.remoting.RPCHook;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.apache.rocketmq.spring.annotation.ConsumeMode;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.annotation.SelectorType;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
- import org.apache.rocketmq.spring.core.RocketMQReplyListener;
- import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
- import org.apache.rocketmq.spring.support.RocketMQUtil;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.aop.framework.AopProxyUtils;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.SmartLifecycle;
- import org.springframework.core.MethodParameter;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessageHeaders;
- import org.springframework.messaging.converter.MessageConversionException;
- import org.springframework.messaging.converter.MessageConverter;
- import org.springframework.messaging.converter.SmartMessageConverter;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.util.Assert;
- import org.springframework.util.MimeTypeUtils;
- import java.lang.reflect.Method;
- import java.lang.reflect.ParameterizedType;
- import java.lang.reflect.Type;
- import java.nio.charset.Charset;
- import java.util.List;
- import java.util.Objects;
- @SuppressWarnings("WeakerAccess")
- public class MultipleRocketMQListenerContainer implements InitializingBean,
- RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
- private final static Logger log = LoggerFactory.getLogger(MultipleRocketMQListenerContainer.class);
- private ApplicationContext applicationContext;
- /**
- * The name of the DefaultRocketMQListenerContainer instance
- */
- private String name;
- private long suspendCurrentQueueTimeMillis = 1000;
- /**
- * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
- * >0,client control retry frequency.
- */
- private int delayLevelWhenNextConsume = 0;
- private String nameServer;
- private AccessChannel accessChannel = AccessChannel.LOCAL;
- private String consumerGroup;
- private String topic;
- private int consumeThreadMax = 64;
- private String charset = "UTF-8";
- private MessageConverter messageConverter;
- private RocketMQListener rocketMQListener;
- private RocketMQReplyListener rocketMQReplyListener;
- private MultipleRocketMQMessageListener rocketMQMessageListener;
- private DefaultMQPushConsumer consumer;
- private Type messageType;
- private MethodParameter methodParameter;
- private boolean running;
- // The following properties came from @RocketMQMessageListener.
- private ConsumeMode consumeMode;
- private SelectorType selectorType;
- private String selectorExpression;
- private MessageModel messageModel;
- private long consumeTimeout;
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
- }
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
- }
- public String getNameServer() {
- return nameServer;
- }
- public void setNameServer(String nameServer) {
- this.nameServer = nameServer;
- }
- public AccessChannel getAccessChannel() {
- return accessChannel;
- }
- public void setAccessChannel(AccessChannel accessChannel) {
- this.accessChannel = accessChannel;
- }
- public String getConsumerGroup() {
- return consumerGroup;
- }
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
- public String getTopic() {
- return topic;
- }
- public void setTopic(String topic) {
- this.topic = topic;
- }
- public int getConsumeThreadMax() {
- return consumeThreadMax;
- }
- public String getCharset() {
- return charset;
- }
- public void setCharset(String charset) {
- this.charset = charset;
- }
- public MessageConverter getMessageConverter() {
- return messageConverter;
- }
- public MultipleRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) {
- this.messageConverter = messageConverter;
- return this;
- }
- public RocketMQListener getRocketMQListener() {
- return rocketMQListener;
- }
- public void setRocketMQListener(RocketMQListener rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
- public RocketMQReplyListener getRocketMQReplyListener() {
- return rocketMQReplyListener;
- }
- public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) {
- this.rocketMQReplyListener = rocketMQReplyListener;
- }
- public MultipleRocketMQMessageListener getRocketMQMessageListener() {
- return rocketMQMessageListener;
- }
- public void setRocketMQMessageListener(MultipleRocketMQMessageListener anno) {
- this.rocketMQMessageListener = anno;
- this.consumeMode = anno.consumeMode();
- this.consumeThreadMax = anno.consumeThreadMax();
- this.messageModel = anno.messageModel();
- this.selectorType = anno.selectorType();
- this.selectorExpression = anno.selectorExpression();
- this.consumeTimeout = anno.consumeTimeout();
- }
- public ConsumeMode getConsumeMode() {
- return consumeMode;
- }
- public SelectorType getSelectorType() {
- return selectorType;
- }
- public void setSelectorExpression(String selectorExpression) {
- this.selectorExpression = selectorExpression;
- }
- public String getSelectorExpression() {
- return selectorExpression;
- }
- public MessageModel getMessageModel() {
- return messageModel;
- }
- public DefaultMQPushConsumer getConsumer() {
- return consumer;
- }
- public void setConsumer(DefaultMQPushConsumer consumer) {
- this.consumer = consumer;
- }
- @Override
- public void destroy() {
- this.setRunning(false);
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- log.info("container destroyed, {}", this.toString());
- }
- @Override
- public boolean isAutoStartup() {
- return true;
- }
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
- @Override
- public void start() {
- if (this.isRunning()) {
- throw new IllegalStateException("container already running. " + this.toString());
- }
- try {
- consumer.start();
- } catch (MQClientException e) {
- throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
- }
- this.setRunning(true);
- log.info("running container: {}", this.toString());
- }
- @Override
- public void stop() {
- if (this.isRunning()) {
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- setRunning(false);
- }
- }
- @Override
- public boolean isRunning() {
- return running;
- }
- private void setRunning(boolean running) {
- this.running = running;
- }
- @Override
- public int getPhase() {
- // Returning Integer.MAX_VALUE only suggests that
- // we will be the first bean to shutdown and last bean to start
- return Integer.MAX_VALUE;
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- initRocketMQPushConsumer();
- this.messageType = getMessageType();
- this.methodParameter = getMethodParameter();
- log.debug("RocketMQ messageType: {}", messageType);
- }
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
- @Override
- public String toString() {
- return "DefaultRocketMQListenerContainer{" +
- "consumerGroup='" + consumerGroup + '\'' +
- ", nameServer='" + nameServer + '\'' +
- ", topic='" + topic + '\'' +
- ", consumeMode=" + consumeMode +
- ", selectorType=" + selectorType +
- ", selectorExpression='" + selectorExpression + '\'' +
- ", messageModel=" + messageModel +
- '}';
- }
- public void setName(String name) {
- this.name = name;
- }
- public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- handleMessage(messageExt);
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- handleMessage(messageExt);
- long costTime = System.currentTimeMillis() - now;
- log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
- return ConsumeOrderlyStatus.SUCCESS;
- }
- }
- private void handleMessage(
- MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
- if (rocketMQListener != null) {
- rocketMQListener.onMessage(doConvertMessage(messageExt));
- } else if (rocketMQReplyListener != null) {
- Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
- Message<?> message = MessageBuilder.withPayload(replyContent).build();
- org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
- consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
- @Override public void onSuccess(SendResult sendResult) {
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
- } else {
- log.info("Consumer replies message success.");
- }
- }
- @Override public void onException(Throwable e) {
- log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
- }
- });
- }
- }
- private byte[] convertToBytes(Message<?> message) {
- Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
- Object payloadObj = messageWithSerializedPayload.getPayload();
- byte[] payloads;
- try {
- if (null == payloadObj) {
- throw new RuntimeException("the message cannot be empty");
- }
- if (payloadObj instanceof String) {
- payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
- } else if (payloadObj instanceof byte[]) {
- payloads = (byte[]) messageWithSerializedPayload.getPayload();
- } else {
- String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
- if (null == jsonObj) {
- throw new RuntimeException(String.format(
- "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
- this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
- }
- payloads = jsonObj.getBytes(Charset.forName(charset));
- }
- } catch (Exception e) {
- throw new RuntimeException("convert to bytes failed.", e);
- }
- return payloads;
- }
- private Message<?> doConvert(Object payload, MessageHeaders headers) {
- Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
- ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
- this.messageConverter.toMessage(payload, headers);
- if (message == null) {
- String payloadType = payload.getClass().getName();
- Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
- throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
- "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
- }
- MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
- builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
- return builder.build();
- }
- @SuppressWarnings("unchecked")
- private Object doConvertMessage(MessageExt messageExt) {
- if (Objects.equals(messageType, MessageExt.class)) {
- return messageExt;
- } else {
- String str = new String(messageExt.getBody(), Charset.forName(charset));
- if (Objects.equals(messageType, String.class)) {
- return str;
- } else {
- // If msgType not string, use objectMapper change it.
- try {
- if (messageType instanceof Class) {
- //if the messageType has not Generic Parameter
- return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
- } else {
- //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
- //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
- return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
- }
- } catch (Exception e) {
- log.info("convert failed. str:{}, msgType:{}", str, messageType);
- throw new RuntimeException("cannot convert message to " + messageType, e);
- }
- }
- }
- }
- private MethodParameter getMethodParameter() {
- Class<?> targetClass;
- if (rocketMQListener != null) {
- targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
- } else {
- targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
- }
- Type messageType = this.getMessageType();
- Class clazz = null;
- if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
- clazz = (Class) ((ParameterizedType) messageType).getRawType();
- } else if (messageType instanceof Class) {
- clazz = (Class) messageType;
- } else {
- throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
- }
- try {
- final Method method = targetClass.getMethod("onMessage", clazz);
- return new MethodParameter(method, 0);
- } catch (NoSuchMethodException e) {
- e.printStackTrace();
- throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
- }
- }
- private Type getMessageType() {
- Class<?> targetClass;
- if (rocketMQListener != null) {
- targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
- } else {
- targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
- }
- Type matchedGenericInterface = null;
- while (Objects.nonNull(targetClass)) {
- Type[] interfaces = targetClass.getGenericInterfaces();
- if (Objects.nonNull(interfaces)) {
- for (Type type : interfaces) {
- if (type instanceof ParameterizedType &&
- (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
- matchedGenericInterface = type;
- break;
- }
- }
- }
- targetClass = targetClass.getSuperclass();
- }
- if (Objects.isNull(matchedGenericInterface)) {
- return Object.class;
- }
- Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
- if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
- return actualTypeArguments[0];
- }
- return Object.class;
- }
- private void initRocketMQPushConsumer() throws MQClientException {
- if (rocketMQListener == null && rocketMQReplyListener == null) {
- throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
- }
- Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
- Assert.notNull(nameServer, "Property 'nameServer' is required");
- Assert.notNull(topic, "Property 'topic' is required");
- RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
- this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
- boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
- if (Objects.nonNull(rpcHook)) {
- consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
- enableMsgTrace, this.applicationContext.getEnvironment().
- resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
- consumer.setVipChannelEnabled(false);
- consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
- } else {
- log.debug("Access-key or secret-key not configure in " + this + ".");
- consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
- this.applicationContext.getEnvironment().
- resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
- }
- String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
- if (customizedNameServer != null) {
- consumer.setNamesrvAddr(customizedNameServer);
- } else {
- consumer.setNamesrvAddr(nameServer);
- }
- if (accessChannel != null) {
- consumer.setAccessChannel(accessChannel);
- }
- consumer.setConsumeThreadMax(consumeThreadMax);
- if (consumeThreadMax < consumer.getConsumeThreadMin()) {
- consumer.setConsumeThreadMin(consumeThreadMax);
- }
- consumer.setConsumeTimeout(consumeTimeout);
- switch (messageModel) {
- case BROADCASTING:
- consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
- break;
- case CLUSTERING:
- consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
- break;
- default:
- throw new IllegalArgumentException("Property 'messageModel' was wrong.");
- }
- switch (selectorType) {
- case TAG:
- consumer.subscribe(topic, selectorExpression);
- break;
- case SQL92:
- consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
- break;
- default:
- throw new IllegalArgumentException("Property 'selectorType' was wrong.");
- }
- switch (consumeMode) {
- case ORDERLY:
- consumer.setMessageListener(new DefaultMessageListenerOrderly());
- break;
- case CONCURRENTLY:
- consumer.setMessageListener(new DefaultMessageListenerConcurrently());
- break;
- default:
- throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
- }
- if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
- } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
- }
- }
- }
复制代码 步调四,配置文件添加参数,与原rocketmq参数区分开- multiplerocketmq:
- xdt:
- name-server: xdtIP:xdtPort
- producer:
- group: xdt_groups
- road:
- name-server: roadIP:roadPort
- producer:
- group: road_groups
复制代码 步调五,通过配置文件加载多数据源- package com.heit.road.web.config;
- import cn.hutool.core.date.DateUtil;
- import org.apache.rocketmq.client.AccessChannel;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.spring.annotation.ConsumeMode;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.apache.rocketmq.spring.core.RocketMQReplyListener;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
- import org.apache.rocketmq.spring.support.RocketMQUtil;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.aop.framework.AopProxyUtils;
- import org.springframework.aop.scope.ScopedProxyUtils;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.SmartInitializingSingleton;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.support.BeanDefinitionValidationException;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.ConfigurableApplicationContext;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.support.GenericApplicationContext;
- import org.springframework.core.env.StandardEnvironment;
- import org.springframework.util.Assert;
- import org.springframework.util.StringUtils;
- import java.util.Collections;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.stream.Collectors;
- @Configuration
- public class MultipleRocketMQConfig implements ApplicationContextAware, SmartInitializingSingleton {
- private final static Logger log = LoggerFactory.getLogger(MultipleRocketMQConfig.class);
- private ConfigurableApplicationContext applicationContext;
- private AtomicLong counter = new AtomicLong(0);
- private StandardEnvironment environment;
- private RocketMQMessageConverter rocketMQMessageConverter;
- public MultipleRocketMQConfig(RocketMQMessageConverter rocketMQMessageConverter,
- StandardEnvironment environment) {
- this.rocketMQMessageConverter = rocketMQMessageConverter;
- this.environment = environment;
- }
- @Bean("road")
- @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server"})
- @ConfigurationProperties(prefix = "multiplerocketmq.road")
- public MultipleRocketMQProperties road() {
- return new MultipleRocketMQProperties();
- }
- @Bean(value = "roadmq", destroyMethod = "destroy")
- @ConditionalOnMissingBean(name = "roadmq")
- @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server", "producer.group"})
- public RocketMQTemplate roadMQProducer(@Qualifier("road") MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) {
- return createRocketMQTemplate(rocketMQProperties, rocketMQMessageConverter);
- }
- @Bean("xdt")
- @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server"})
- @ConfigurationProperties(prefix = "multiplerocketmq.xdt")
- public MultipleRocketMQProperties xdt() {
- return new MultipleRocketMQProperties();
- }
- @Bean(value = "xdtmq", destroyMethod = "destroy")
- @ConditionalOnMissingBean(name = "xdtmq")
- @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server", "producer.group"})
- public RocketMQTemplate xdtMQProducer(@Qualifier("xdt") MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) {
- return createRocketMQTemplate(rocketMQProperties, rocketMQMessageConverter);
- }
- private RocketMQTemplate createRocketMQTemplate(MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) {
- RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
- String nameServer = rocketMQProperties.getNameServer();
- String groupName = producerConfig.getGroup();
- Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
- Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
- String accessChannel = rocketMQProperties.getAccessChannel();
- String ak = rocketMQProperties.getProducer().getAccessKey();
- String sk = rocketMQProperties.getProducer().getSecretKey();
- boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
- String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
- DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
- producer.setNamesrvAddr(nameServer);
- if (!StringUtils.isEmpty(accessChannel)) {
- producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
- }
- producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
- producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
- producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
- producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
- producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
- producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
- producer.setInstanceName(producer.getProducerGroup() + DateUtil.now());
- RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
- rocketMQTemplate.setProducer(producer);
- rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
- return rocketMQTemplate;
- }
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
- }
- @Override
- public void afterSingletonsInstantiated() {
- Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(MultipleRocketMQMessageListener.class)
- .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- beans.forEach(this::registerContainer);
- }
- private void registerContainer(String beanName, Object bean) {
- Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
- if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
- throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
- }
- if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
- throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
- }
- MultipleRocketMQMessageListener annotation = clazz.getAnnotation(MultipleRocketMQMessageListener.class);
- String topic = annotation.topic();
- String consumerGroup = annotation.consumerGroup();
- String soruce = annotation.soruce();
- MultipleRocketMQProperties rocketMQProperties = this.applicationContext.getBean(soruce, MultipleRocketMQProperties.class);
- boolean listenerEnabled =
- (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
- .getOrDefault(topic, true);
- if (!listenerEnabled) {
- log.debug(
- "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
- consumerGroup, topic);
- return;
- }
- validate(annotation);
- String containerBeanName = String.format("%s_%s_%s", topic, consumerGroup,
- counter.incrementAndGet());
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
- genericApplicationContext.registerBean(containerBeanName, MultipleRocketMQListenerContainer.class,
- () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
- MultipleRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
- MultipleRocketMQListenerContainer.class);
- if (!container.isRunning()) {
- try {
- container.getConsumer().setInstanceName(containerBeanName);
- container.getConsumer().setNamesrvAddr(rocketMQProperties.getNameServer());
- container.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- private MultipleRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
- MultipleRocketMQMessageListener annotation) {
- MultipleRocketMQListenerContainer container = new MultipleRocketMQListenerContainer();
- container.setRocketMQMessageListener(annotation);
- String soruce = annotation.soruce();
- if (StringUtils.isEmpty(soruce)) {
- throw new RuntimeException(name + " 未指定数据源");
- }
- MultipleRocketMQProperties rocketMQProperties = this.applicationContext.getBean(soruce, MultipleRocketMQProperties.class);
- container.setAccessChannel(AccessChannel.CLOUD);
- container.setTopic(annotation.topic());
- container.setNameServer(rocketMQProperties.getNameServer());
- String tags = annotation.selectorExpression();
- if (!StringUtils.isEmpty(tags)) {
- container.setSelectorExpression(tags);
- }
- container.setConsumerGroup(annotation.consumerGroup());
- if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
- container.setRocketMQListener((RocketMQListener) bean);
- } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
- container.setRocketMQReplyListener((RocketMQReplyListener) bean);
- }
- container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
- return container;
- }
- private void validate(MultipleRocketMQMessageListener annotation) {
- if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
- annotation.messageModel() == MessageModel.BROADCASTING) {
- throw new BeanDefinitionValidationException(
- "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
- }
- }
- }
复制代码 步调六,准备多数据源listener- package com.heit.road.web.listener;
- import com.heit.road.web.config.MultipleRocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.stereotype.Component;
- @Component
- @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server"})
- @MultipleRocketMQMessageListener(soruce = "road", consumerGroup = "RoadRocketMqlistener6", topic = "road_test")
- public class RoadRocketMqlistener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("RoadRocketMqlistener 收到信息==》 " + message);
- }
- }
复制代码- package com.heit.road.web.listener;
- import com.heit.road.web.config.MultipleRocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.stereotype.Component;
- @Component
- @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server"})
- @MultipleRocketMQMessageListener(soruce = "xdt", consumerGroup = "XdtRocketMqlistener6", topic = "road_test")
- public class XdtRocketMqlistener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("XdtRocketMqlistener 收到信息==》 " + message);
- }
- }
复制代码 步调七,编写测试方法,多数据源发送数据- // @Qualifier("xdtmq")
- @Lazy
- @Resource
- private RocketMQTemplate xdtmq;
- // @Qualifier("roadmq")
- @Lazy
- @Resource
- private RocketMQTemplate roadmq;
- @ApiOperation("测试用表-MQ信息测试")
- @PostMapping("sendTest")
- public BaseBack<?> sendTest() {
- xdtmq.asyncSend("road_test", "xdtmq123", new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("尝试xdt================================================");
- System.out.println("信息发送成功");
- System.out.println("sendResult = " + sendResult);
- }
- @Override
- public void onException(Throwable e) {
- System.out.println("尝试xdt================================================");
- System.out.println("信息发送失败");
- e.printStackTrace();
- }
- });
- roadmq.asyncSend("road_test", "roadmq123", new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("尝试road================================================");
- System.out.println("信息发送成功");
- System.out.println("sendResult = " + sendResult);
- }
- @Override
- public void onException(Throwable e) {
- System.out.println("尝试road================================================");
- System.out.println("信息发送失败");
- e.printStackTrace();
- }
- });
- return null;
- }
复制代码 效果如下:

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |