springboot环境下的rokectMQ多数据源实现

打印 上一主题 下一主题

主题 889|帖子 889|积分 2667

业务原因,必要在一个项目中与多方MQ举行业务通信;
步调一,复制一份RocketMQProperties配置文件,制止与原来的冲突
  1. package com.heit.road.web.config;
  2. import org.apache.rocketmq.common.topic.TopicValidator;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. public class MultipleRocketMQProperties {
  6.     /**
  7.      * The name server for rocketMQ, formats: `host:port;host:port`.
  8.      */
  9.     private String nameServer;
  10.     /**
  11.      * Enum type for accessChannel, values: LOCAL, CLOUD
  12.      */
  13.     private String accessChannel;
  14.     private org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer producer;
  15.     /**
  16.      * Configure enable listener or not.
  17.      * In some particular cases, if you don't want the the listener is enabled when container startup,
  18.      * the configuration pattern is like this :
  19.      * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
  20.      * <p>
  21.      * the listener is enabled by default.
  22.      */
  23.     private org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer consumer = new org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer();
  24.     public String getNameServer() {
  25.         return nameServer;
  26.     }
  27.     public void setNameServer(String nameServer) {
  28.         this.nameServer = nameServer;
  29.     }
  30.     public String getAccessChannel() {
  31.         return accessChannel;
  32.     }
  33.     public void setAccessChannel(String accessChannel) {
  34.         this.accessChannel = accessChannel;
  35.     }
  36.     public org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer getProducer() {
  37.         return producer;
  38.     }
  39.     public void setProducer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer producer) {
  40.         this.producer = producer;
  41.     }
  42.     public static class Producer {
  43.         /**
  44.          * Group name of producer.
  45.          */
  46.         private String group;
  47.         /**
  48.          * Millis of send message timeout.
  49.          */
  50.         private int sendMessageTimeout = 3000;
  51.         /**
  52.          * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
  53.          */
  54.         private int compressMessageBodyThreshold = 1024 * 4;
  55.         /**
  56.          * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
  57.          * This may potentially cause message duplication which is up to application developers to resolve.
  58.          */
  59.         private int retryTimesWhenSendFailed = 2;
  60.         /**
  61.          * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
  62.          * This may potentially cause message duplication which is up to application developers to resolve.
  63.          */
  64.         private int retryTimesWhenSendAsyncFailed = 2;
  65.         /**
  66.          * Indicate whether to retry another broker on sending failure internally.
  67.          */
  68.         private boolean retryNextServer = false;
  69.         /**
  70.          * Maximum allowed message size in bytes.
  71.          */
  72.         private int maxMessageSize = 1024 * 1024 * 4;
  73.         /**
  74.          * The property of "access-key".
  75.          */
  76.         private String accessKey;
  77.         /**
  78.          * The property of "secret-key".
  79.          */
  80.         private String secretKey;
  81.         /**
  82.          * Switch flag instance for message trace.
  83.          */
  84.         private boolean enableMsgTrace = true;
  85.         /**
  86.          * The name value of message trace topic.If you don't config,you can use the default trace topic name.
  87.          */
  88.         private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
  89.         public String getGroup() {
  90.             return group;
  91.         }
  92.         public void setGroup(String group) {
  93.             this.group = group;
  94.         }
  95.         public int getSendMessageTimeout() {
  96.             return sendMessageTimeout;
  97.         }
  98.         public void setSendMessageTimeout(int sendMessageTimeout) {
  99.             this.sendMessageTimeout = sendMessageTimeout;
  100.         }
  101.         public int getCompressMessageBodyThreshold() {
  102.             return compressMessageBodyThreshold;
  103.         }
  104.         public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
  105.             this.compressMessageBodyThreshold = compressMessageBodyThreshold;
  106.         }
  107.         public int getRetryTimesWhenSendFailed() {
  108.             return retryTimesWhenSendFailed;
  109.         }
  110.         public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
  111.             this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
  112.         }
  113.         public int getRetryTimesWhenSendAsyncFailed() {
  114.             return retryTimesWhenSendAsyncFailed;
  115.         }
  116.         public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
  117.             this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
  118.         }
  119.         public boolean isRetryNextServer() {
  120.             return retryNextServer;
  121.         }
  122.         public void setRetryNextServer(boolean retryNextServer) {
  123.             this.retryNextServer = retryNextServer;
  124.         }
  125.         public int getMaxMessageSize() {
  126.             return maxMessageSize;
  127.         }
  128.         public void setMaxMessageSize(int maxMessageSize) {
  129.             this.maxMessageSize = maxMessageSize;
  130.         }
  131.         public String getAccessKey() {
  132.             return accessKey;
  133.         }
  134.         public void setAccessKey(String accessKey) {
  135.             this.accessKey = accessKey;
  136.         }
  137.         public String getSecretKey() {
  138.             return secretKey;
  139.         }
  140.         public void setSecretKey(String secretKey) {
  141.             this.secretKey = secretKey;
  142.         }
  143.         public boolean isEnableMsgTrace() {
  144.             return enableMsgTrace;
  145.         }
  146.         public void setEnableMsgTrace(boolean enableMsgTrace) {
  147.             this.enableMsgTrace = enableMsgTrace;
  148.         }
  149.         public String getCustomizedTraceTopic() {
  150.             return customizedTraceTopic;
  151.         }
  152.         public void setCustomizedTraceTopic(String customizedTraceTopic) {
  153.             this.customizedTraceTopic = customizedTraceTopic;
  154.         }
  155.     }
  156.     public org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer getConsumer() {
  157.         return consumer;
  158.     }
  159.     public void setConsumer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer consumer) {
  160.         this.consumer = consumer;
  161.     }
  162.     public static final class Consumer {
  163.         /**
  164.          * listener configuration container
  165.          * the pattern is like this:
  166.          * group1.topic1 = false
  167.          * group2.topic2 = true
  168.          * group3.topic3 = false
  169.          */
  170.         private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
  171.         public Map<String, Map<String, Boolean>> getListeners() {
  172.             return listeners;
  173.         }
  174.         public void setListeners(Map<String, Map<String, Boolean>> listeners) {
  175.             this.listeners = listeners;
  176.         }
  177.     }
  178. }
复制代码
步调二,复制一份@RocketMQMessageListener,并新增数据源参数soruce
,这里不接纳原来的nameServer参数,大概是版本原因,这个参数现在并不支持多数据源
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements.  See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License.  You may obtain a copy of the License at
  8. *
  9. *     http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package com.heit.road.web.config;
  18. import org.apache.rocketmq.spring.annotation.ConsumeMode;
  19. import org.apache.rocketmq.spring.annotation.MessageModel;
  20. import org.apache.rocketmq.spring.annotation.SelectorType;
  21. import java.lang.annotation.*;
  22. @Target(ElementType.TYPE)
  23. @Retention(RetentionPolicy.RUNTIME)
  24. @Documented
  25. public @interface MultipleRocketMQMessageListener {
  26.     String NAME_SERVER_PLACEHOLDER = "";
  27.     String ACCESS_KEY_PLACEHOLDER = "";
  28.     String SECRET_KEY_PLACEHOLDER = "";
  29.     String TRACE_TOPIC_PLACEHOLDER = "";
  30.     String ACCESS_CHANNEL_PLACEHOLDER = "";
  31.     /**
  32.      * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
  33.      * load balance. It's required and needs to be globally unique.
  34.      * <p>
  35.      * <p>
  36.      * See <a target="_blank" href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
  37.      */
  38.     String soruce();
  39.     String consumerGroup();
  40.     /**
  41.      * Topic name.
  42.      */
  43.     String topic();
  44.     /**
  45.      * Control how to selector message.
  46.      *
  47.      * @see SelectorType
  48.      */
  49.     SelectorType selectorType() default SelectorType.TAG;
  50.     /**
  51.      * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
  52.      */
  53.     String selectorExpression() default "*";
  54.     /**
  55.      * Control consume mode, you can choice receive message concurrently or orderly.
  56.      */
  57.     ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
  58.     /**
  59.      * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
  60.      */
  61.     MessageModel messageModel() default MessageModel.CLUSTERING;
  62.     /**
  63.      * Max consumer thread number.
  64.      */
  65.     int consumeThreadMax() default 64;
  66.     /**
  67.      * Maximum amount of time in minutes a message may block the consuming thread.
  68.      */
  69.     long consumeTimeout() default 15L;
  70.     /**
  71.      * The property of "access-key".
  72.      */
  73.     String accessKey() default ACCESS_KEY_PLACEHOLDER;
  74.     /**
  75.      * The property of "secret-key".
  76.      */
  77.     String secretKey() default SECRET_KEY_PLACEHOLDER;
  78.     /**
  79.      * Switch flag instance for message trace.
  80.      */
  81.     boolean enableMsgTrace() default true;
  82.     /**
  83.      * The name value of message trace topic.If you don't config,you can use the default trace topic name.
  84.      */
  85.     String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
  86.     /**
  87.      * The property of "name-server".
  88.      */
  89.     String nameServer() default NAME_SERVER_PLACEHOLDER;
  90.     /**
  91.      * The property of "access-channel".
  92.      */
  93.     String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
  94. }
复制代码
步调三,复制一份RocketMQListenerContainer,替换参数RocketMQMessageListener
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements.  See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License.  You may obtain a copy of the License at
  8. *
  9. *     http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package com.heit.road.web.config;
  18. import org.apache.rocketmq.client.AccessChannel;
  19. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  20. import org.apache.rocketmq.client.consumer.MessageSelector;
  21. import org.apache.rocketmq.client.consumer.listener.*;
  22. import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
  23. import org.apache.rocketmq.client.exception.MQClientException;
  24. import org.apache.rocketmq.client.producer.SendCallback;
  25. import org.apache.rocketmq.client.producer.SendResult;
  26. import org.apache.rocketmq.client.producer.SendStatus;
  27. import org.apache.rocketmq.client.utils.MessageUtil;
  28. import org.apache.rocketmq.common.message.MessageExt;
  29. import org.apache.rocketmq.remoting.RPCHook;
  30. import org.apache.rocketmq.remoting.exception.RemotingException;
  31. import org.apache.rocketmq.spring.annotation.ConsumeMode;
  32. import org.apache.rocketmq.spring.annotation.MessageModel;
  33. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  34. import org.apache.rocketmq.spring.annotation.SelectorType;
  35. import org.apache.rocketmq.spring.core.RocketMQListener;
  36. import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
  37. import org.apache.rocketmq.spring.core.RocketMQReplyListener;
  38. import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
  39. import org.apache.rocketmq.spring.support.RocketMQUtil;
  40. import org.slf4j.Logger;
  41. import org.slf4j.LoggerFactory;
  42. import org.springframework.aop.framework.AopProxyUtils;
  43. import org.springframework.beans.BeansException;
  44. import org.springframework.beans.factory.InitializingBean;
  45. import org.springframework.context.ApplicationContext;
  46. import org.springframework.context.ApplicationContextAware;
  47. import org.springframework.context.SmartLifecycle;
  48. import org.springframework.core.MethodParameter;
  49. import org.springframework.messaging.Message;
  50. import org.springframework.messaging.MessageHeaders;
  51. import org.springframework.messaging.converter.MessageConversionException;
  52. import org.springframework.messaging.converter.MessageConverter;
  53. import org.springframework.messaging.converter.SmartMessageConverter;
  54. import org.springframework.messaging.support.MessageBuilder;
  55. import org.springframework.util.Assert;
  56. import org.springframework.util.MimeTypeUtils;
  57. import java.lang.reflect.Method;
  58. import java.lang.reflect.ParameterizedType;
  59. import java.lang.reflect.Type;
  60. import java.nio.charset.Charset;
  61. import java.util.List;
  62. import java.util.Objects;
  63. @SuppressWarnings("WeakerAccess")
  64. public class MultipleRocketMQListenerContainer implements InitializingBean,
  65.         RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
  66.     private final static Logger log = LoggerFactory.getLogger(MultipleRocketMQListenerContainer.class);
  67.     private ApplicationContext applicationContext;
  68.     /**
  69.      * The name of the DefaultRocketMQListenerContainer instance
  70.      */
  71.     private String name;
  72.     private long suspendCurrentQueueTimeMillis = 1000;
  73.     /**
  74.      * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
  75.      * >0,client control retry frequency.
  76.      */
  77.     private int delayLevelWhenNextConsume = 0;
  78.     private String nameServer;
  79.     private AccessChannel accessChannel = AccessChannel.LOCAL;
  80.     private String consumerGroup;
  81.     private String topic;
  82.     private int consumeThreadMax = 64;
  83.     private String charset = "UTF-8";
  84.     private MessageConverter messageConverter;
  85.     private RocketMQListener rocketMQListener;
  86.     private RocketMQReplyListener rocketMQReplyListener;
  87.     private MultipleRocketMQMessageListener rocketMQMessageListener;
  88.     private DefaultMQPushConsumer consumer;
  89.     private Type messageType;
  90.     private MethodParameter methodParameter;
  91.     private boolean running;
  92.     // The following properties came from @RocketMQMessageListener.
  93.     private ConsumeMode consumeMode;
  94.     private SelectorType selectorType;
  95.     private String selectorExpression;
  96.     private MessageModel messageModel;
  97.     private long consumeTimeout;
  98.     public long getSuspendCurrentQueueTimeMillis() {
  99.         return suspendCurrentQueueTimeMillis;
  100.     }
  101.     public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
  102.         this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
  103.     }
  104.     public int getDelayLevelWhenNextConsume() {
  105.         return delayLevelWhenNextConsume;
  106.     }
  107.     public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
  108.         this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
  109.     }
  110.     public String getNameServer() {
  111.         return nameServer;
  112.     }
  113.     public void setNameServer(String nameServer) {
  114.         this.nameServer = nameServer;
  115.     }
  116.     public AccessChannel getAccessChannel() {
  117.         return accessChannel;
  118.     }
  119.     public void setAccessChannel(AccessChannel accessChannel) {
  120.         this.accessChannel = accessChannel;
  121.     }
  122.     public String getConsumerGroup() {
  123.         return consumerGroup;
  124.     }
  125.     public void setConsumerGroup(String consumerGroup) {
  126.         this.consumerGroup = consumerGroup;
  127.     }
  128.     public String getTopic() {
  129.         return topic;
  130.     }
  131.     public void setTopic(String topic) {
  132.         this.topic = topic;
  133.     }
  134.     public int getConsumeThreadMax() {
  135.         return consumeThreadMax;
  136.     }
  137.     public String getCharset() {
  138.         return charset;
  139.     }
  140.     public void setCharset(String charset) {
  141.         this.charset = charset;
  142.     }
  143.     public MessageConverter getMessageConverter() {
  144.         return messageConverter;
  145.     }
  146.     public MultipleRocketMQListenerContainer setMessageConverter(MessageConverter messageConverter) {
  147.         this.messageConverter = messageConverter;
  148.         return this;
  149.     }
  150.     public RocketMQListener getRocketMQListener() {
  151.         return rocketMQListener;
  152.     }
  153.     public void setRocketMQListener(RocketMQListener rocketMQListener) {
  154.         this.rocketMQListener = rocketMQListener;
  155.     }
  156.     public RocketMQReplyListener getRocketMQReplyListener() {
  157.         return rocketMQReplyListener;
  158.     }
  159.     public void setRocketMQReplyListener(RocketMQReplyListener rocketMQReplyListener) {
  160.         this.rocketMQReplyListener = rocketMQReplyListener;
  161.     }
  162.     public MultipleRocketMQMessageListener getRocketMQMessageListener() {
  163.         return rocketMQMessageListener;
  164.     }
  165.     public void setRocketMQMessageListener(MultipleRocketMQMessageListener anno) {
  166.         this.rocketMQMessageListener = anno;
  167.         this.consumeMode = anno.consumeMode();
  168.         this.consumeThreadMax = anno.consumeThreadMax();
  169.         this.messageModel = anno.messageModel();
  170.         this.selectorType = anno.selectorType();
  171.         this.selectorExpression = anno.selectorExpression();
  172.         this.consumeTimeout = anno.consumeTimeout();
  173.     }
  174.     public ConsumeMode getConsumeMode() {
  175.         return consumeMode;
  176.     }
  177.     public SelectorType getSelectorType() {
  178.         return selectorType;
  179.     }
  180.     public void setSelectorExpression(String selectorExpression) {
  181.         this.selectorExpression = selectorExpression;
  182.     }
  183.     public String getSelectorExpression() {
  184.         return selectorExpression;
  185.     }
  186.     public MessageModel getMessageModel() {
  187.         return messageModel;
  188.     }
  189.     public DefaultMQPushConsumer getConsumer() {
  190.         return consumer;
  191.     }
  192.     public void setConsumer(DefaultMQPushConsumer consumer) {
  193.         this.consumer = consumer;
  194.     }
  195.     @Override
  196.     public void destroy() {
  197.         this.setRunning(false);
  198.         if (Objects.nonNull(consumer)) {
  199.             consumer.shutdown();
  200.         }
  201.         log.info("container destroyed, {}", this.toString());
  202.     }
  203.     @Override
  204.     public boolean isAutoStartup() {
  205.         return true;
  206.     }
  207.     @Override
  208.     public void stop(Runnable callback) {
  209.         stop();
  210.         callback.run();
  211.     }
  212.     @Override
  213.     public void start() {
  214.         if (this.isRunning()) {
  215.             throw new IllegalStateException("container already running. " + this.toString());
  216.         }
  217.         try {
  218.             consumer.start();
  219.         } catch (MQClientException e) {
  220.             throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
  221.         }
  222.         this.setRunning(true);
  223.         log.info("running container: {}", this.toString());
  224.     }
  225.     @Override
  226.     public void stop() {
  227.         if (this.isRunning()) {
  228.             if (Objects.nonNull(consumer)) {
  229.                 consumer.shutdown();
  230.             }
  231.             setRunning(false);
  232.         }
  233.     }
  234.     @Override
  235.     public boolean isRunning() {
  236.         return running;
  237.     }
  238.     private void setRunning(boolean running) {
  239.         this.running = running;
  240.     }
  241.     @Override
  242.     public int getPhase() {
  243.         // Returning Integer.MAX_VALUE only suggests that
  244.         // we will be the first bean to shutdown and last bean to start
  245.         return Integer.MAX_VALUE;
  246.     }
  247.     @Override
  248.     public void afterPropertiesSet() throws Exception {
  249.         initRocketMQPushConsumer();
  250.         this.messageType = getMessageType();
  251.         this.methodParameter = getMethodParameter();
  252.         log.debug("RocketMQ messageType: {}", messageType);
  253.     }
  254.     @Override
  255.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  256.         this.applicationContext = applicationContext;
  257.     }
  258.     @Override
  259.     public String toString() {
  260.         return "DefaultRocketMQListenerContainer{" +
  261.             "consumerGroup='" + consumerGroup + '\'' +
  262.             ", nameServer='" + nameServer + '\'' +
  263.             ", topic='" + topic + '\'' +
  264.             ", consumeMode=" + consumeMode +
  265.             ", selectorType=" + selectorType +
  266.             ", selectorExpression='" + selectorExpression + '\'' +
  267.             ", messageModel=" + messageModel +
  268.             '}';
  269.     }
  270.     public void setName(String name) {
  271.         this.name = name;
  272.     }
  273.     public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
  274.         @SuppressWarnings("unchecked")
  275.         @Override
  276.         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  277.             for (MessageExt messageExt : msgs) {
  278.                 log.debug("received msg: {}", messageExt);
  279.                 try {
  280.                     long now = System.currentTimeMillis();
  281.                     handleMessage(messageExt);
  282.                     long costTime = System.currentTimeMillis() - now;
  283.                     log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  284.                 } catch (Exception e) {
  285.                     log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
  286.                     context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  287.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  288.                 }
  289.             }
  290.             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  291.         }
  292.     }
  293.     public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
  294.         @SuppressWarnings("unchecked")
  295.         @Override
  296.         public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  297.             for (MessageExt messageExt : msgs) {
  298.                 log.debug("received msg: {}", messageExt);
  299.                 try {
  300.                     long now = System.currentTimeMillis();
  301.                     handleMessage(messageExt);
  302.                     long costTime = System.currentTimeMillis() - now;
  303.                     log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  304.                 } catch (Exception e) {
  305.                     log.warn("consume message failed. messageExt:{}", messageExt, e);
  306.                     context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
  307.                     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  308.                 }
  309.             }
  310.             return ConsumeOrderlyStatus.SUCCESS;
  311.         }
  312.     }
  313.     private void handleMessage(
  314.         MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
  315.         if (rocketMQListener != null) {
  316.             rocketMQListener.onMessage(doConvertMessage(messageExt));
  317.         } else if (rocketMQReplyListener != null) {
  318.             Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
  319.             Message<?> message = MessageBuilder.withPayload(replyContent).build();
  320.             org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
  321.             consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
  322.                 @Override public void onSuccess(SendResult sendResult) {
  323.                     if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  324.                         log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
  325.                     } else {
  326.                         log.info("Consumer replies message success.");
  327.                     }
  328.                 }
  329.                 @Override public void onException(Throwable e) {
  330.                     log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
  331.                 }
  332.             });
  333.         }
  334.     }
  335.     private byte[] convertToBytes(Message<?> message) {
  336.         Message<?> messageWithSerializedPayload = doConvert(message.getPayload(), message.getHeaders());
  337.         Object payloadObj = messageWithSerializedPayload.getPayload();
  338.         byte[] payloads;
  339.         try {
  340.             if (null == payloadObj) {
  341.                 throw new RuntimeException("the message cannot be empty");
  342.             }
  343.             if (payloadObj instanceof String) {
  344.                 payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
  345.             } else if (payloadObj instanceof byte[]) {
  346.                 payloads = (byte[]) messageWithSerializedPayload.getPayload();
  347.             } else {
  348.                 String jsonObj = (String) this.messageConverter.fromMessage(messageWithSerializedPayload, payloadObj.getClass());
  349.                 if (null == jsonObj) {
  350.                     throw new RuntimeException(String.format(
  351.                         "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
  352.                         this.messageConverter.getClass(), payloadObj.getClass(), payloadObj));
  353.                 }
  354.                 payloads = jsonObj.getBytes(Charset.forName(charset));
  355.             }
  356.         } catch (Exception e) {
  357.             throw new RuntimeException("convert to bytes failed.", e);
  358.         }
  359.         return payloads;
  360.     }
  361.     private Message<?> doConvert(Object payload, MessageHeaders headers) {
  362.         Message<?> message = this.messageConverter instanceof SmartMessageConverter ?
  363.             ((SmartMessageConverter) this.messageConverter).toMessage(payload, headers, null) :
  364.             this.messageConverter.toMessage(payload, headers);
  365.         if (message == null) {
  366.             String payloadType = payload.getClass().getName();
  367.             Object contentType = headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null;
  368.             throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
  369.                 "', contentType='" + contentType + "', converter=[" + this.messageConverter + "]");
  370.         }
  371.         MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
  372.         builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
  373.         return builder.build();
  374.     }
  375.     @SuppressWarnings("unchecked")
  376.     private Object doConvertMessage(MessageExt messageExt) {
  377.         if (Objects.equals(messageType, MessageExt.class)) {
  378.             return messageExt;
  379.         } else {
  380.             String str = new String(messageExt.getBody(), Charset.forName(charset));
  381.             if (Objects.equals(messageType, String.class)) {
  382.                 return str;
  383.             } else {
  384.                 // If msgType not string, use objectMapper change it.
  385.                 try {
  386.                     if (messageType instanceof Class) {
  387.                         //if the messageType has not Generic Parameter
  388.                         return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
  389.                     } else {
  390.                         //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
  391.                         //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
  392.                         return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
  393.                     }
  394.                 } catch (Exception e) {
  395.                     log.info("convert failed. str:{}, msgType:{}", str, messageType);
  396.                     throw new RuntimeException("cannot convert message to " + messageType, e);
  397.                 }
  398.             }
  399.         }
  400.     }
  401.     private MethodParameter getMethodParameter() {
  402.         Class<?> targetClass;
  403.         if (rocketMQListener != null) {
  404.             targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
  405.         } else {
  406.             targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
  407.         }
  408.         Type messageType = this.getMessageType();
  409.         Class clazz = null;
  410.         if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
  411.             clazz = (Class) ((ParameterizedType) messageType).getRawType();
  412.         } else if (messageType instanceof Class) {
  413.             clazz = (Class) messageType;
  414.         } else {
  415.             throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
  416.         }
  417.         try {
  418.             final Method method = targetClass.getMethod("onMessage", clazz);
  419.             return new MethodParameter(method, 0);
  420.         } catch (NoSuchMethodException e) {
  421.             e.printStackTrace();
  422.             throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
  423.         }
  424.     }
  425.     private Type getMessageType() {
  426.         Class<?> targetClass;
  427.         if (rocketMQListener != null) {
  428.             targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
  429.         } else {
  430.             targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
  431.         }
  432.         Type matchedGenericInterface = null;
  433.         while (Objects.nonNull(targetClass)) {
  434.             Type[] interfaces = targetClass.getGenericInterfaces();
  435.             if (Objects.nonNull(interfaces)) {
  436.                 for (Type type : interfaces) {
  437.                     if (type instanceof ParameterizedType &&
  438.                         (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
  439.                         matchedGenericInterface = type;
  440.                         break;
  441.                     }
  442.                 }
  443.             }
  444.             targetClass = targetClass.getSuperclass();
  445.         }
  446.         if (Objects.isNull(matchedGenericInterface)) {
  447.             return Object.class;
  448.         }
  449.         Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
  450.         if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
  451.             return actualTypeArguments[0];
  452.         }
  453.         return Object.class;
  454.     }
  455.     private void initRocketMQPushConsumer() throws MQClientException {
  456.         if (rocketMQListener == null && rocketMQReplyListener == null) {
  457.             throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
  458.         }
  459.         Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
  460.         Assert.notNull(nameServer, "Property 'nameServer' is required");
  461.         Assert.notNull(topic, "Property 'topic' is required");
  462.         RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
  463.             this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
  464.         boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
  465.         if (Objects.nonNull(rpcHook)) {
  466.             consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
  467.                 enableMsgTrace, this.applicationContext.getEnvironment().
  468.                 resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
  469.             consumer.setVipChannelEnabled(false);
  470.             consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
  471.         } else {
  472.             log.debug("Access-key or secret-key not configure in " + this + ".");
  473.             consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
  474.                 this.applicationContext.getEnvironment().
  475.                     resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
  476.         }
  477.         String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
  478.         if (customizedNameServer != null) {
  479.             consumer.setNamesrvAddr(customizedNameServer);
  480.         } else {
  481.             consumer.setNamesrvAddr(nameServer);
  482.         }
  483.         if (accessChannel != null) {
  484.             consumer.setAccessChannel(accessChannel);
  485.         }
  486.         consumer.setConsumeThreadMax(consumeThreadMax);
  487.         if (consumeThreadMax < consumer.getConsumeThreadMin()) {
  488.             consumer.setConsumeThreadMin(consumeThreadMax);
  489.         }
  490.         consumer.setConsumeTimeout(consumeTimeout);
  491.         switch (messageModel) {
  492.             case BROADCASTING:
  493.                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
  494.                 break;
  495.             case CLUSTERING:
  496.                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
  497.                 break;
  498.             default:
  499.                 throw new IllegalArgumentException("Property 'messageModel' was wrong.");
  500.         }
  501.         switch (selectorType) {
  502.             case TAG:
  503.                 consumer.subscribe(topic, selectorExpression);
  504.                 break;
  505.             case SQL92:
  506.                 consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
  507.                 break;
  508.             default:
  509.                 throw new IllegalArgumentException("Property 'selectorType' was wrong.");
  510.         }
  511.         switch (consumeMode) {
  512.             case ORDERLY:
  513.                 consumer.setMessageListener(new DefaultMessageListenerOrderly());
  514.                 break;
  515.             case CONCURRENTLY:
  516.                 consumer.setMessageListener(new DefaultMessageListenerConcurrently());
  517.                 break;
  518.             default:
  519.                 throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
  520.         }
  521.         if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
  522.             ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
  523.         } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
  524.             ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
  525.         }
  526.     }
  527. }
复制代码
步调四,配置文件添加参数,与原rocketmq参数区分开
  1. multiplerocketmq:
  2.   xdt:
  3.     name-server: xdtIP:xdtPort
  4.     producer:
  5.       group: xdt_groups
  6.   road:
  7.     name-server: roadIP:roadPort
  8.     producer:
  9.       group: road_groups
复制代码
步调五,通过配置文件加载多数据源
  1. package com.heit.road.web.config;
  2. import cn.hutool.core.date.DateUtil;
  3. import org.apache.rocketmq.client.AccessChannel;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.spring.annotation.ConsumeMode;
  6. import org.apache.rocketmq.spring.annotation.MessageModel;
  7. import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
  8. import org.apache.rocketmq.spring.core.RocketMQListener;
  9. import org.apache.rocketmq.spring.core.RocketMQReplyListener;
  10. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  11. import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
  12. import org.apache.rocketmq.spring.support.RocketMQUtil;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.aop.framework.AopProxyUtils;
  16. import org.springframework.aop.scope.ScopedProxyUtils;
  17. import org.springframework.beans.BeansException;
  18. import org.springframework.beans.factory.SmartInitializingSingleton;
  19. import org.springframework.beans.factory.annotation.Qualifier;
  20. import org.springframework.beans.factory.support.BeanDefinitionValidationException;
  21. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  22. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  23. import org.springframework.boot.context.properties.ConfigurationProperties;
  24. import org.springframework.context.ApplicationContext;
  25. import org.springframework.context.ApplicationContextAware;
  26. import org.springframework.context.ConfigurableApplicationContext;
  27. import org.springframework.context.annotation.Bean;
  28. import org.springframework.context.annotation.Configuration;
  29. import org.springframework.context.support.GenericApplicationContext;
  30. import org.springframework.core.env.StandardEnvironment;
  31. import org.springframework.util.Assert;
  32. import org.springframework.util.StringUtils;
  33. import java.util.Collections;
  34. import java.util.Map;
  35. import java.util.concurrent.atomic.AtomicLong;
  36. import java.util.stream.Collectors;
  37. @Configuration
  38. public class MultipleRocketMQConfig implements ApplicationContextAware, SmartInitializingSingleton {
  39.     private final static Logger log = LoggerFactory.getLogger(MultipleRocketMQConfig.class);
  40.     private ConfigurableApplicationContext applicationContext;
  41.     private AtomicLong counter = new AtomicLong(0);
  42.     private StandardEnvironment environment;
  43.     private RocketMQMessageConverter rocketMQMessageConverter;
  44.     public MultipleRocketMQConfig(RocketMQMessageConverter rocketMQMessageConverter,
  45.                                   StandardEnvironment environment) {
  46.         this.rocketMQMessageConverter = rocketMQMessageConverter;
  47.         this.environment = environment;
  48.     }
  49.     @Bean("road")
  50.     @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server"})
  51.     @ConfigurationProperties(prefix = "multiplerocketmq.road")
  52.     public MultipleRocketMQProperties road() {
  53.         return new MultipleRocketMQProperties();
  54.     }
  55.     @Bean(value = "roadmq", destroyMethod = "destroy")
  56.     @ConditionalOnMissingBean(name = "roadmq")
  57.     @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server", "producer.group"})
  58.     public RocketMQTemplate roadMQProducer(@Qualifier("road") MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) {
  59.         return createRocketMQTemplate(rocketMQProperties, rocketMQMessageConverter);
  60.     }
  61.     @Bean("xdt")
  62.     @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server"})
  63.     @ConfigurationProperties(prefix = "multiplerocketmq.xdt")
  64.     public MultipleRocketMQProperties xdt() {
  65.         return new MultipleRocketMQProperties();
  66.     }
  67.     @Bean(value = "xdtmq", destroyMethod = "destroy")
  68.     @ConditionalOnMissingBean(name = "xdtmq")
  69.     @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server", "producer.group"})
  70.     public RocketMQTemplate xdtMQProducer(@Qualifier("xdt") MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) {
  71.         return createRocketMQTemplate(rocketMQProperties, rocketMQMessageConverter);
  72.     }
  73.     private RocketMQTemplate createRocketMQTemplate(MultipleRocketMQProperties rocketMQProperties, RocketMQMessageConverter rocketMQMessageConverter) {
  74.         RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
  75.         String nameServer = rocketMQProperties.getNameServer();
  76.         String groupName = producerConfig.getGroup();
  77.         Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
  78.         Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
  79.         String accessChannel = rocketMQProperties.getAccessChannel();
  80.         String ak = rocketMQProperties.getProducer().getAccessKey();
  81.         String sk = rocketMQProperties.getProducer().getSecretKey();
  82.         boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
  83.         String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
  84.         DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
  85.         producer.setNamesrvAddr(nameServer);
  86.         if (!StringUtils.isEmpty(accessChannel)) {
  87.             producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
  88.         }
  89.         producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
  90.         producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
  91.         producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
  92.         producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
  93.         producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
  94.         producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
  95.         producer.setInstanceName(producer.getProducerGroup() + DateUtil.now());
  96.         RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
  97.         rocketMQTemplate.setProducer(producer);
  98.         rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
  99.         return rocketMQTemplate;
  100.     }
  101.     @Override
  102.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  103.         this.applicationContext = (ConfigurableApplicationContext) applicationContext;
  104.     }
  105.     @Override
  106.     public void afterSingletonsInstantiated() {
  107.         Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(MultipleRocketMQMessageListener.class)
  108.                 .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
  109.                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  110.         beans.forEach(this::registerContainer);
  111.     }
  112.     private void registerContainer(String beanName, Object bean) {
  113.         Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
  114.         if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
  115.             throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
  116.         }
  117.         if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
  118.             throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
  119.         }
  120.         MultipleRocketMQMessageListener annotation = clazz.getAnnotation(MultipleRocketMQMessageListener.class);
  121.         String topic = annotation.topic();
  122.         String consumerGroup = annotation.consumerGroup();
  123.         String soruce = annotation.soruce();
  124.         MultipleRocketMQProperties rocketMQProperties = this.applicationContext.getBean(soruce, MultipleRocketMQProperties.class);
  125.         boolean listenerEnabled =
  126.                 (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
  127.                         .getOrDefault(topic, true);
  128.         if (!listenerEnabled) {
  129.             log.debug(
  130.                     "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
  131.                     consumerGroup, topic);
  132.             return;
  133.         }
  134.         validate(annotation);
  135.         String containerBeanName = String.format("%s_%s_%s", topic, consumerGroup,
  136.                 counter.incrementAndGet());
  137.         GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
  138.         genericApplicationContext.registerBean(containerBeanName, MultipleRocketMQListenerContainer.class,
  139.                 () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
  140.         MultipleRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
  141.                 MultipleRocketMQListenerContainer.class);
  142.         if (!container.isRunning()) {
  143.             try {
  144.                 container.getConsumer().setInstanceName(containerBeanName);
  145.                 container.getConsumer().setNamesrvAddr(rocketMQProperties.getNameServer());
  146.                 container.start();
  147.             } catch (Exception e) {
  148.                 throw new RuntimeException(e);
  149.             }
  150.         }
  151.     }
  152.     private MultipleRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
  153.                                                                               MultipleRocketMQMessageListener annotation) {
  154.         MultipleRocketMQListenerContainer container = new MultipleRocketMQListenerContainer();
  155.         container.setRocketMQMessageListener(annotation);
  156.         String soruce = annotation.soruce();
  157.         if (StringUtils.isEmpty(soruce)) {
  158.             throw new RuntimeException(name + " 未指定数据源");
  159.         }
  160.         MultipleRocketMQProperties rocketMQProperties = this.applicationContext.getBean(soruce, MultipleRocketMQProperties.class);
  161.         container.setAccessChannel(AccessChannel.CLOUD);
  162.         container.setTopic(annotation.topic());
  163.         container.setNameServer(rocketMQProperties.getNameServer());
  164.         String tags = annotation.selectorExpression();
  165.         if (!StringUtils.isEmpty(tags)) {
  166.             container.setSelectorExpression(tags);
  167.         }
  168.         container.setConsumerGroup(annotation.consumerGroup());
  169.         if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
  170.             container.setRocketMQListener((RocketMQListener) bean);
  171.         } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
  172.             container.setRocketMQReplyListener((RocketMQReplyListener) bean);
  173.         }
  174.         container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
  175.         return container;
  176.     }
  177.     private void validate(MultipleRocketMQMessageListener annotation) {
  178.         if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
  179.                 annotation.messageModel() == MessageModel.BROADCASTING) {
  180.             throw new BeanDefinitionValidationException(
  181.                     "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
  182.         }
  183.     }
  184. }
复制代码
步调六,准备多数据源listener
  1. package com.heit.road.web.listener;
  2. import com.heit.road.web.config.MultipleRocketMQMessageListener;
  3. import org.apache.rocketmq.spring.core.RocketMQListener;
  4. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @ConditionalOnProperty(prefix = "multiplerocketmq.road", value = {"name-server"})
  8. @MultipleRocketMQMessageListener(soruce = "road", consumerGroup = "RoadRocketMqlistener6", topic = "road_test")
  9. public class RoadRocketMqlistener implements RocketMQListener<String> {
  10.     @Override
  11.     public void onMessage(String message) {
  12.         System.out.println("RoadRocketMqlistener 收到信息==》 " + message);
  13.     }
  14. }
复制代码
  1. package com.heit.road.web.listener;
  2. import com.heit.road.web.config.MultipleRocketMQMessageListener;
  3. import org.apache.rocketmq.spring.core.RocketMQListener;
  4. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @ConditionalOnProperty(prefix = "multiplerocketmq.xdt", value = {"name-server"})
  8. @MultipleRocketMQMessageListener(soruce = "xdt", consumerGroup = "XdtRocketMqlistener6", topic = "road_test")
  9. public class XdtRocketMqlistener implements RocketMQListener<String> {
  10.     @Override
  11.     public void onMessage(String message) {
  12.         System.out.println("XdtRocketMqlistener 收到信息==》 " + message);
  13.     }
  14. }
复制代码
步调七,编写测试方法,多数据源发送数据
  1. //    @Qualifier("xdtmq")
  2.     @Lazy
  3.     @Resource
  4.     private RocketMQTemplate xdtmq;
  5. //    @Qualifier("roadmq")
  6.     @Lazy
  7.     @Resource
  8.     private RocketMQTemplate roadmq;
  9.     @ApiOperation("测试用表-MQ信息测试")
  10.     @PostMapping("sendTest")
  11.     public BaseBack<?> sendTest() {
  12.         xdtmq.asyncSend("road_test", "xdtmq123", new SendCallback() {
  13.             @Override
  14.             public void onSuccess(SendResult sendResult) {
  15.                 System.out.println("尝试xdt================================================");
  16.                 System.out.println("信息发送成功");
  17.                 System.out.println("sendResult = " + sendResult);
  18.             }
  19.             @Override
  20.             public void onException(Throwable e) {
  21.                 System.out.println("尝试xdt================================================");
  22.                 System.out.println("信息发送失败");
  23.                 e.printStackTrace();
  24.             }
  25.         });
  26.         roadmq.asyncSend("road_test", "roadmq123", new SendCallback() {
  27.             @Override
  28.             public void onSuccess(SendResult sendResult) {
  29.                 System.out.println("尝试road================================================");
  30.                 System.out.println("信息发送成功");
  31.                 System.out.println("sendResult = " + sendResult);
  32.             }
  33.             @Override
  34.             public void onException(Throwable e) {
  35.                 System.out.println("尝试road================================================");
  36.                 System.out.println("信息发送失败");
  37.                 e.printStackTrace();
  38.             }
  39.         });
  40.         return null;
  41.     }
复制代码
效果如下:

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

吴旭华

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表