ToB企服应用市场:ToB评测及商务社交产业平台

标题: springboot kafka多数据源,通过配置动态加载发送者和消费者 [打印本页]

作者: 莫张周刘王    时间: 2024-10-31 11:35
标题: springboot kafka多数据源,通过配置动态加载发送者和消费者
前言

最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是假如需要不修改代码扩展呢,由于kafka本身不处置惩罚额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。
准备test

kafka本身非常容易上手,假如我们需要单位测试,引入jar依赖,JDK利用1.8,当然也可以利用JDK17
  1.     <dependencies>
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-web</artifactId>
  5.             <version>2.7.17</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>org.springframework.boot</groupId>
  9.             <artifactId>spring-boot-starter-test</artifactId>
  10.             <version>2.7.17</version>
  11.             <scope>test</scope>
  12.         </dependency>
  13.         <dependency>
  14.             <groupId>org.springframework.kafka</groupId>
  15.             <artifactId>spring-kafka</artifactId>
  16.             <version>2.9.13</version>
  17.         </dependency>
  18.         <dependency>
  19.             <groupId>org.springframework.kafka</groupId>
  20.             <artifactId>spring-kafka-test</artifactId>
  21.             <version>2.9.13</version>
  22.             <scope>test</scope>
  23.         </dependency>
  24.         <!-- https://mvnrepository.com/artifact/org.testcontainers/kafka -->
  25.         <dependency>
  26.             <groupId>org.testcontainers</groupId>
  27.             <artifactId>kafka</artifactId>
  28.             <version>1.20.1</version>
  29.             <scope>test</scope>
  30.         </dependency>
  31.     </dependencies>
复制代码
修改发送者和接收者
  1. @Component
  2. public class KafkaProducer {
  3.     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
  4.     @Autowired
  5.     private KafkaTemplate<String, String> kafkaTemplate;
  6.     public void send(String topic, String payload) {
  7.         LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
  8.         kafkaTemplate.send(topic, payload);
  9.     }
  10. }
  11. @Component
  12. public class KafkaConsumer {
  13.     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
  14.     private String payload;
  15.     @KafkaListener(topics = "${test.topic}")
  16.     public void receive(ConsumerRecord<?, ?> consumerRecord) {
  17.         LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
  18.         payload = consumerRecord.toString();
  19.     }
  20.     public String getPayload() {
  21.         return payload;
  22.     }
  23.     public void setPayload(String payload) {
  24.         this.payload = payload;
  25.     }
  26. }
复制代码
然后写main方法,随意写一个即可,配置入戏
  1. spring:
  2.   kafka:
  3.     consumer:
  4.       auto-offset-reset: earliest
  5.       group-id: mytest
  6. test:
  7.   topic: embedded-test-topic
复制代码
写一个单位测试
  1. @SpringBootTest
  2. @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
  3. class DemoMainTest {
  4.     @Autowired
  5.     private KafkaConsumer consumer;
  6.     @Autowired
  7.     private KafkaProducer producer;
  8.     @Value("${test.topic}")
  9.     private String topic;
  10.     @Test
  11.     void embedKafka() throws InterruptedException {
  12.         String data = "Sending with our own simple KafkaProducer";
  13.         producer.send(topic, data);
  14.         Thread.sleep(3000);
  15.         assertThat(consumer.getPayload(), containsString(data));
  16.         Thread.sleep(10000);
  17.     }
  18. }
复制代码
通过
  1. @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
复制代码
直接模拟一个kafka,内里有一些注解参数,可以设置broker的 数目端口,zk的端口,topic和partition数目等

实际上是通过embed zk和kafka来mock了一个kafka server
单位测试运行成功


思绪

有了kafka单位测试后,根据springboot map可以接收多套配置的方式不就实现了kafka的多数据源的能力,貌似非常简朴;但是假如需要不消修改代码,消费端怎么办,发送者可以手动创建,消费端是注解方式,topic等信息在注解参数中,注解参数值却是常量,代码写死的,那么我们就需要:
这里没思量把消费端和发送者的额外处置惩罚逻辑写在这里的做法,同一处置惩罚kafka,雷同kafka网关,由于kafka一样平常不会仅一套,且不会仅有一个topic,需要分发处置惩罚,好比slb,feign等。
kafka消费者的原理 

其实kafka发送者和消费者也是雷同逻辑,但是spring-kafka通过注解方式实现消费者,假如我们利用原生kafka的kafkaconsumer,那么只需要通过Map接收参数,然后本身实现消费逻辑就行,但是spring-kafka毕竟做了许多公共没须要的逻辑,拉取消费的一系列参数,线程池管理等处置惩罚措施。看看Spring-kafka的消费者初始化原理,
  1. BeanPostProcessor的kafka实现
复制代码
org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor
看前置处置惩罚

什么都没做,所以,所有逻辑都在后置处置惩罚
  1. public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  2.                 if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
  3.                         Class<?> targetClass = AopUtils.getTargetClass(bean);
  4.             //找到注解,消费注解KafkaListener打在类上,一般不用这种方式
  5.                         Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
  6.             //类上KafkaListener注解的标志
  7.                         final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
  8.                         final List<Method> multiMethods = new ArrayList<>();
  9.             //找到消费方法,去每个方法上找KafkaListener注解
  10.                         Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  11.                                         (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
  12.                                                 Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
  13.                                                 return (!listenerMethods.isEmpty() ? listenerMethods : null);
  14.                                         });
  15.                         if (hasClassLevelListeners) {
  16.                 //类上KafkaListener注解的时候,通过另外的注解KafkaHandler的方式,找到消费方法
  17.                                 Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
  18.                                                 (ReflectionUtils.MethodFilter) method ->
  19.                                                                 AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
  20.                                 multiMethods.addAll(methodsWithHandler);
  21.                         }
  22.             //实际上大部分类是没有kafka消费注解的,效率并不高,但是因为日志是trace,所以日志一般默认看不见
  23.             //注解KafkaListener打在方法上的时候
  24.                         if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
  25.                                 this.nonAnnotatedClasses.add(bean.getClass());
  26.                                 this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
  27.                         }
  28.                         else {
  29.                                 // Non-empty set of methods
  30.                                 for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
  31.                                         Method method = entry.getKey();
  32.                                         for (KafkaListener listener : entry.getValue()) {
  33.                         //核心逻辑
  34.                                                 processKafkaListener(listener, method, bean, beanName);
  35.                                         }
  36.                                 }
  37.                                 this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
  38.                                                         + beanName + "': " + annotatedMethods);
  39.                         }
  40.             //注解KafkaListener打在类上,实际上处理逻辑跟KafkaListener打在方法上差不多
  41.                         if (hasClassLevelListeners) {
  42.                                 processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
  43.                         }
  44.                 }
  45.                 return bean;
  46.         }
复制代码
假如是注解打在类上,如下

本文中的示例的@KafkaListener打在方法上,所以分析
  1. processKafkaListener 
复制代码
其实原理都一样,spring-kafka不会写2份一样逻辑,只是读取处置惩罚的参数略有差别
  1. protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
  2.                         String beanName) {
  3.         //检查代理
  4.                 Method methodToUse = checkProxy(method, bean);
  5.         //终端设计思想,Spring很多地方都这样设计,尤其是swagger
  6.                 MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
  7.                 endpoint.setMethod(methodToUse);
  8.         //bean的名称,这里需要定制全局唯一,否则多个listener会冲突
  9.                 String beanRef = kafkaListener.beanRef();
  10.                 this.listenerScope.addListener(beanRef, bean);
  11.                 String[] topics = resolveTopics(kafkaListener);
  12.                 TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
  13.                 if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
  14.             //核心逻辑
  15.                         processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
  16.                 }
  17.                 this.listenerScope.removeListener(beanRef);
  18.         }
复制代码
继承
  1. processListener
复制代码
  1. protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
  2.                                                                 Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
  3.         //MethodKafkaListenerEndpoint赋值了,这个很关键
  4.                 processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
  5.         //容器工厂
  6.                 String containerFactory = resolve(kafkaListener.containerFactory());
  7.                 KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
  8.                                 containerFactory, beanName);
  9.         //注册终端,最终生效
  10.                 this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
  11.         }
复制代码
processKafkaListenerAnnotation
  1. private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint,
  2.                         KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
  3.                 endpoint.setBean(bean);
  4.                 endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  5.                 endpoint.setId(getEndpointId(kafkaListener));
  6.                 endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
  7.                 endpoint.setTopicPartitions(tps);
  8.                 endpoint.setTopics(topics);
  9.                 endpoint.setTopicPattern(resolvePattern(kafkaListener));
  10.                 endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
  11.                 endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
  12.                 String group = kafkaListener.containerGroup();
  13.                 if (StringUtils.hasText(group)) {
  14.                         Object resolvedGroup = resolveExpression(group);
  15.                         if (resolvedGroup instanceof String) {
  16.                                 endpoint.setGroup((String) resolvedGroup);
  17.                         }
  18.                 }
  19.                 String concurrency = kafkaListener.concurrency();
  20.                 if (StringUtils.hasText(concurrency)) {
  21.                         endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
  22.                 }
  23.                 String autoStartup = kafkaListener.autoStartup();
  24.                 if (StringUtils.hasText(autoStartup)) {
  25.                         endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
  26.                 }
  27.                 resolveKafkaProperties(endpoint, kafkaListener.properties());
  28.                 endpoint.setSplitIterables(kafkaListener.splitIterables());
  29.                 if (StringUtils.hasText(kafkaListener.batch())) {
  30.                         endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
  31.                 }
  32.                 endpoint.setBeanFactory(this.beanFactory);
  33.                 resolveErrorHandler(endpoint, kafkaListener);
  34.                 resolveContentTypeConverter(endpoint, kafkaListener);
  35.                 resolveFilter(endpoint, kafkaListener);
  36.         }
复制代码
各种参数注册,尤其是此中的ID和handler是必须的,不注册不行;笔者试着本身设置endpoint,发现此中的各种handler注册。 
解决方式

先写一个工具类,用于创建一些关键类的bean,定义了发送者创建,消费者工厂类,消费者的创建由注解扫描实现,引用工具类的消费者容器工厂bean。
  1. public class KafkaConfigUtil {
  2.     private DefaultKafkaProducerFactory<String, String> initProducerFactory(KafkaProperties kafkaProperties) {
  3.         return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
  4.     }
  5.     public KafkaTemplate<String, String> initKafkaTemplate(KafkaProperties kafkaProperties) {
  6.         return new KafkaTemplate<>(initProducerFactory(kafkaProperties));
  7.     }
  8.     private ConsumerFactory<? super Integer, ? super String> initConsumerFactory(KafkaProperties kafkaProperties) {
  9.         return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
  10.     }
  11.     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
  12.     initKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
  13.         ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
  14.                 new ConcurrentKafkaListenerContainerFactory<>();
  15.         factory.setConsumerFactory(initConsumerFactory(kafkaProperties));
  16.         return factory;
  17.     }
  18. }
复制代码
1、通过Map接收多数据源

定义一个配置接收器,仿造zuul的模式 
  1. @ConfigurationProperties(prefix = "spring.kafka")
  2. public class KafkaMultiProperties {
  3.     private Map<String, KafkaProperties> routes;
  4.     public Map<String, KafkaProperties> getRoutes() {
  5.         return routes;
  6.     }
  7.     public void setRoutes(Map<String, KafkaProperties> routes) {
  8.         this.routes = routes;
  9.     }
  10. }
复制代码
每一个route其实就说一套kafka,再写一个Configuration,注入配置文件
  1. @Configuration
  2. @EnableConfigurationProperties(KafkaMultiProperties.class)
  3. public class KafkaConfiguration {
  4.    
  5. }
复制代码
如许就可以注入配置了,从此可以根据配置的差别初始化差别的kafka集群逻辑。 如许就可以把自定义的Properties注入Springboot的placeholder中。
2、通过自定义扫描支持消费者

假如消费者大概发送者逻辑需要写在当前kafka网关应用,那么只能通过自定义扫描方式支持配置差别,所有配置的天生者和消费者必须代码实现逻辑,通过配置加载方式,自定义扫描注入bean即可。以消费者为例,生产者不涉及注解发送方式相对简朴。
  1. public class KafkaConfigInit {
  2.     private KafkaMultiProperties kafkaMultiProperties;
  3.     private ConfigurableApplicationContext applicationContext;
  4.     public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties,
  5.                            ConfigurableApplicationContext applicationContext) {
  6.         this.kafkaMultiProperties = kafkaMultiProperties;
  7.         this.applicationContext = applicationContext;
  8.     }
  9.     @PostConstruct
  10.     public void initConfig() {
  11.         if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
  12.         kafkaMultiProperties.getRoutes().forEach((k, v) -> {
  13.             //register producer by config
  14.             ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
  15.             beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
  16.             //register consumer container factory
  17.             KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
  18.             beanFactory.registerSingleton(k + "_consumerFactory", kafkaListenerContainerFactory);
  19.         });
  20.     }
  21. }
复制代码
写了一个初始化的bean,用于通过配置加载bean。但是有2个问题:
关于第1点
由于需要按照配置加载,不能代码写bean的加载逻辑,只能本身扫描按照配置加载,那么需要自定义扫描注解和扫描包名(淘汰扫描范围,提高服从)
关于第2点
需要手动执行beanpostprocessor的逻辑即可
show me the code

完满刚刚写的部门代码:
写一个注解
  1. @Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface KafkaConfigConsumer {
  5.     String beanId() default "";
  6. }
复制代码
通过beanId区分,配置文件的key+"_consumer"可以作为唯一标识,定义一种标准
可以利用Spring的
  1. PathMatchingResourcePatternResolver
复制代码
本身剖析resources信息,来拿到写的自定义注解的类,然后天生对象,注入Spring
  1. public class KafkaConfigInit {
  2.     private KafkaMultiProperties kafkaMultiProperties;
  3.     private ConfigurableApplicationContext applicationContext;
  4.     private KafkaListenerAnnotationBeanPostProcessor<?,?> kafkaListenerAnnotationBeanPostProcessor;
  5.     private static final Map<String, Object> consumerMap = new ConcurrentHashMap<>();
  6.     public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties, ConfigurableApplicationContext applicationContext, KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor) {
  7.         this.kafkaMultiProperties = kafkaMultiProperties;
  8.         this.applicationContext = applicationContext;
  9.         this.kafkaListenerAnnotationBeanPostProcessor = kafkaListenerAnnotationBeanPostProcessor;
  10.     }
  11.     @PostConstruct
  12.     public void initConfig() throws IOException {
  13.         scanConsumer();
  14.         if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
  15.         kafkaMultiProperties.getRoutes().forEach((k, v) -> {
  16.             //register producer by config
  17.             ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
  18.             beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
  19.             //register consumer container factory
  20.             KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
  21.             beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);
  22.             beanFactory.registerSingleton(k+"_consumer", consumerMap.get(k+"_consumer"));
  23.             kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(consumerMap.get(k+"_consumer"), k+"_consumer");
  24.         });
  25.     }
  26.     private void scanConsumer() throws IOException {
  27.         SimpleMetadataReaderFactory register = new SimpleMetadataReaderFactory();
  28.         PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  29.         Resource[] resources = resolver.getResources(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + "com/feng/kafka/demo/init/*");
  30.         Arrays.stream(resources).forEach((resource)->{
  31.             try {
  32.                 MetadataReader metadataReader = register.getMetadataReader(resource);
  33.                 if (metadataReader.getAnnotationMetadata().hasAnnotatedMethods("org.springframework.kafka.annotation.KafkaListener")){
  34.                     String className = metadataReader.getClassMetadata().getClassName();
  35.                     Class<?> clazz = Class.forName(className);
  36.                     KafkaConfigConsumer kafkaConfigConsumer = clazz.getDeclaredAnnotation(KafkaConfigConsumer.class);
  37.                     Object obj = clazz.newInstance();
  38.                     consumerMap.put(kafkaConfigConsumer.beanId(), obj);
  39.                 }
  40.             } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
  41.                 throw new RuntimeException(e);
  42.             }
  43.         });
  44.     }
  45. }
复制代码
同时,需要手动执行 
  1. kafkaListenerAnnotationBeanPostProcessor
复制代码
的逻辑,上面有源码分析,而且由于要支持多数据源,所以需要修改消费者的注解参数
  1. //@KafkaListener(topics = "${test.topic}")
  2. //@Component
  3. @KafkaConfigConsumer(beanId = "xxx_consumer")
  4. public class KafkaConsumer {
  5.     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
  6.     private String payload;
  7. //    @KafkaHandler
  8.     @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
  9.     public void receive(ConsumerRecord<?, ?> consumerRecord) {
  10.         LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
  11.         payload = consumerRecord.toString();
  12.     }
  13.     // other getters
  14.     public String getPayload() {
  15.         return payload;
  16.     }
  17.     public void setPayload(String payload) {
  18.         this.payload = payload;
  19.     }
  20. }
复制代码
增长beanRef属性外加我们本身写的注解,然后通过@Configuration注入
  1. @Configuration
  2. @EnableConfigurationProperties(KafkaMultiProperties.class)
  3. public class KafkaConfiguration {
  4.     @Bean
  5.     public KafkaConfigInit initKafka(KafkaMultiProperties kafkaMultiProperties,
  6.                                      ConfigurableApplicationContext applicationContext,
  7.                                      KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor){
  8.         return new KafkaConfigInit(kafkaMultiProperties, applicationContext, kafkaListenerAnnotationBeanPostProcessor);
  9.     }
  10. }
复制代码
然后修改配置文件和单位测试类
  1. spring:
  2.   kafka:
  3.     routes:
  4.       xxx:
  5.         producer:
  6.           batchSize: 1
  7.         consumer:
  8.           auto-offset-reset: earliest
  9.           group-id: xxx
复制代码
然后修改单位测试代码
  1. @SpringBootTest
  2. @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
  3. class DemoMainTest {
  4.     @Lazy
  5.     @Autowired
  6.     private KafkaConsumer consumer;
  7.     @Autowired
  8.     private ApplicationContext applicationContext;
  9.     @Value("${test.topic}")
  10.     private String topic;
  11.     @Test
  12.     void embedKafka() throws InterruptedException {
  13.         String data = "Sending with our own simple KafkaProducer";
  14.         applicationContext.getBean("xxx_producer", KafkaTemplate.class).send(topic, data);
  15.         Thread.sleep(3000);
  16.         assertThat(consumer.getPayload(), containsString(data));
  17.         Thread.sleep(10000);
  18.     }
  19. }
复制代码
执行单位测试成功
 

数据正确发送消费,断言正常 
3、通过字节码天生支持消费者

上面的方式觉得还是不方便,一样平常而言处置惩罚消息和消费消息是异步的,纵然是同步也不会在消费线程直接处置惩罚,一样平常是发送到其他地方接口处置惩罚,所以为啥还要写消费者代码呢,默认一个不就好了,但是注解参数确是常量,那么字节码天生一个唯一的类即可。
假如天生者和消费者处置惩罚逻辑不消网关应用处置惩罚,那么仅仅是无脑转发,雷同zuul,可以通过字节码天生方式实现同一逻辑,主要是消费者,毕竟有注解,生产者不存在注解可以直接new出来注入bean。
以javassist为例,简朴些,当然asm也可以
show me the code

其实就说把扫描的消费者类,酿成固定某个类消费
  1. //@KafkaListener(topics = "${test.topic}")
  2. //@Component
  3. //@KafkaConfigConsumer(beanId = "xxx_consumer")
  4. public class KafkaConsumer {
  5.     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
  6.     private String payload;
  7. //    @KafkaHandler
  8. //    @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
  9.     public void receive(ConsumerRecord<?, ?> consumerRecord) {
  10.         LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
  11.         payload = consumerRecord.toString();
  12.     }
复制代码
去掉注解,由于注解需要我们动态加上去,下一步修改bean创建流程
  1.     @PostConstruct
  2.     public void initConfig() throws IOException {
  3. //        scanConsumer();
  4.         if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
  5.         kafkaMultiProperties.getRoutes().forEach((k, v) -> {
  6.             //register producer by config
  7.             ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
  8.             beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
  9.             //register consumer container factory
  10.             KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
  11.             beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);
  12. //            beanFactory.registerSingleton(k + "_consumer", consumerMap.get(k + "_consumer"));
  13.             Object obj = initConsumerBean(k);
  14.             beanFactory.registerSingleton(k + "_consumer", obj);
  15.             kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(obj, k + "_consumer");
  16.         });
  17.     }
  18.     private Object initConsumerBean(String key) {
  19.         try {
  20.             ClassPool pool = ClassPool.getDefault();
  21.             CtClass ct = pool.getCtClass("com.feng.kafka.demo.init.KafkaConsumer");
  22.             //修改类名,避免重复
  23.             ct.setName("com.feng.kafka.demo.init.KafkaConsumer"+key);
  24.             //获取类中的方法
  25.             CtMethod ctMethod = ct.getDeclaredMethod("receive");
  26.             MethodInfo methodInfo = ctMethod.getMethodInfo();
  27.             ConstPool cp = methodInfo.getConstPool();
  28.             //获取注解属性
  29.             AnnotationsAttribute attribute = new AnnotationsAttribute(cp, AnnotationsAttribute.visibleTag);
  30.             Annotation annotation = new Annotation("org.springframework.kafka.annotation.KafkaListener", cp);
  31.             ArrayMemberValue arrayMemberValue = new ArrayMemberValue(cp);
  32.             arrayMemberValue.setValue(new MemberValue[]{new StringMemberValue("embedded-test-topic", cp)});
  33.             annotation.addMemberValue("topics", arrayMemberValue);
  34.             annotation.addMemberValue("beanRef", new StringMemberValue(key+"_listener", cp));
  35.             annotation.addMemberValue("containerFactory", new StringMemberValue(key+"_containerFactory", cp));
  36.             attribute.addAnnotation(annotation);
  37.             methodInfo.addAttribute(attribute);
  38.             byte[] bytes = ct.toBytecode();
  39.             Class<?> clazz = ReflectUtils.defineClass("com.feng.kafka.demo.init.KafkaConsumer" + key, bytes, Thread.currentThread().getContextClassLoader());
  40.             return clazz.newInstance();
  41.         } catch (Exception e) {
  42.             throw new RuntimeException(e);
  43.         }
  44.     }
复制代码
通过字节码天生和动态加载class方式,天生唯一的对象,实现通过配置方式支持多数据源,不需要写一句消费代码。
单位测试

去掉了断言,由于类是动态变化的了。 
总结

实际上spring-kafka已经非常完满了,spring-kafka插件的支持也很完满,不需要关注kafka的消费过程,只需要配置即可,但是也为机动性埋下了隐患,当然一样平常而言我们根本上用不到多kafka的环境,也不会做一个kafka网关应用,不外当业务需要的时间,可以设计一套kafka网关应用,分发kafka的消息,起到一个流量网关的能力,解耦业务的应用,实现架构的松耦合。
在Springboot打成fatjar后,由于Springboot的启动时自定义了classloader,会导致找不到类的环境,需要把线程的classloader参加javassist的查找范围
  1. ClassPool pool = ClassPool.getDefault();
  2. pool.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4