前言
最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是假如需要不修改代码扩展呢,由于kafka本身不处置惩罚额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。
准备test
kafka本身非常容易上手,假如我们需要单位测试,引入jar依赖,JDK利用1.8,当然也可以利用JDK17
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- <version>2.7.17</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <version>2.7.17</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>2.9.13</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <version>2.9.13</version>
- <scope>test</scope>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.testcontainers/kafka -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>1.20.1</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
复制代码 修改发送者和接收者
- @Component
- public class KafkaProducer {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- public void send(String topic, String payload) {
- LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
- kafkaTemplate.send(topic, payload);
- }
- }
- @Component
- public class KafkaConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
- private String payload;
- @KafkaListener(topics = "${test.topic}")
- public void receive(ConsumerRecord<?, ?> consumerRecord) {
- LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
- payload = consumerRecord.toString();
- }
- public String getPayload() {
- return payload;
- }
- public void setPayload(String payload) {
- this.payload = payload;
- }
- }
复制代码 然后写main方法,随意写一个即可,配置入戏
- spring:
- kafka:
- consumer:
- auto-offset-reset: earliest
- group-id: mytest
- test:
- topic: embedded-test-topic
复制代码 写一个单位测试
- @SpringBootTest
- @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
- class DemoMainTest {
- @Autowired
- private KafkaConsumer consumer;
- @Autowired
- private KafkaProducer producer;
- @Value("${test.topic}")
- private String topic;
- @Test
- void embedKafka() throws InterruptedException {
- String data = "Sending with our own simple KafkaProducer";
- producer.send(topic, data);
- Thread.sleep(3000);
- assertThat(consumer.getPayload(), containsString(data));
- Thread.sleep(10000);
- }
- }
复制代码 通过
- @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
复制代码 直接模拟一个kafka,内里有一些注解参数,可以设置broker的 数目端口,zk的端口,topic和partition数目等
实际上是通过embed zk和kafka来mock了一个kafka server
单位测试运行成功
思绪
有了kafka单位测试后,根据springboot map可以接收多套配置的方式不就实现了kafka的多数据源的能力,貌似非常简朴;但是假如需要不消修改代码,消费端怎么办,发送者可以手动创建,消费端是注解方式,topic等信息在注解参数中,注解参数值却是常量,代码写死的,那么我们就需要:
- 不让Springboot主动扫描,根据配置手动扫描注册bean
- 字节码天生bean,就可以根据参数
这里没思量把消费端和发送者的额外处置惩罚逻辑写在这里的做法,同一处置惩罚kafka,雷同kafka网关,由于kafka一样平常不会仅一套,且不会仅有一个topic,需要分发处置惩罚,好比slb,feign等。
kafka消费者的原理
其实kafka发送者和消费者也是雷同逻辑,但是spring-kafka通过注解方式实现消费者,假如我们利用原生kafka的kafkaconsumer,那么只需要通过Map接收参数,然后本身实现消费逻辑就行,但是spring-kafka毕竟做了许多公共没须要的逻辑,拉取消费的一系列参数,线程池管理等处置惩罚措施。看看Spring-kafka的消费者初始化原理,
- BeanPostProcessor的kafka实现
复制代码 org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor
看前置处置惩罚

什么都没做,所以,所有逻辑都在后置处置惩罚
- public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
- if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
- Class<?> targetClass = AopUtils.getTargetClass(bean);
- //找到注解,消费注解KafkaListener打在类上,一般不用这种方式
- Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
- //类上KafkaListener注解的标志
- final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
- final List<Method> multiMethods = new ArrayList<>();
- //找到消费方法,去每个方法上找KafkaListener注解
- Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
- (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
- Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
- return (!listenerMethods.isEmpty() ? listenerMethods : null);
- });
- if (hasClassLevelListeners) {
- //类上KafkaListener注解的时候,通过另外的注解KafkaHandler的方式,找到消费方法
- Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
- (ReflectionUtils.MethodFilter) method ->
- AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
- multiMethods.addAll(methodsWithHandler);
- }
- //实际上大部分类是没有kafka消费注解的,效率并不高,但是因为日志是trace,所以日志一般默认看不见
- //注解KafkaListener打在方法上的时候
- if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
- this.nonAnnotatedClasses.add(bean.getClass());
- this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
- }
- else {
- // Non-empty set of methods
- for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
- Method method = entry.getKey();
- for (KafkaListener listener : entry.getValue()) {
- //核心逻辑
- processKafkaListener(listener, method, bean, beanName);
- }
- }
- this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
- + beanName + "': " + annotatedMethods);
- }
- //注解KafkaListener打在类上,实际上处理逻辑跟KafkaListener打在方法上差不多
- if (hasClassLevelListeners) {
- processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
- }
- }
- return bean;
- }
复制代码 假如是注解打在类上,如下

本文中的示例的@KafkaListener打在方法上,所以分析
其实原理都一样,spring-kafka不会写2份一样逻辑,只是读取处置惩罚的参数略有差别
- protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
- String beanName) {
- //检查代理
- Method methodToUse = checkProxy(method, bean);
- //终端设计思想,Spring很多地方都这样设计,尤其是swagger
- MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
- endpoint.setMethod(methodToUse);
- //bean的名称,这里需要定制全局唯一,否则多个listener会冲突
- String beanRef = kafkaListener.beanRef();
- this.listenerScope.addListener(beanRef, bean);
- String[] topics = resolveTopics(kafkaListener);
- TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
- if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
- //核心逻辑
- processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
- }
- this.listenerScope.removeListener(beanRef);
- }
复制代码 继承
- protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
- Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
- //MethodKafkaListenerEndpoint赋值了,这个很关键
- processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
- //容器工厂
- String containerFactory = resolve(kafkaListener.containerFactory());
- KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
- containerFactory, beanName);
- //注册终端,最终生效
- this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
- }
复制代码 processKafkaListenerAnnotation
- private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint,
- KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
- endpoint.setBean(bean);
- endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
- endpoint.setId(getEndpointId(kafkaListener));
- endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
- endpoint.setTopicPartitions(tps);
- endpoint.setTopics(topics);
- endpoint.setTopicPattern(resolvePattern(kafkaListener));
- endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
- endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
- String group = kafkaListener.containerGroup();
- if (StringUtils.hasText(group)) {
- Object resolvedGroup = resolveExpression(group);
- if (resolvedGroup instanceof String) {
- endpoint.setGroup((String) resolvedGroup);
- }
- }
- String concurrency = kafkaListener.concurrency();
- if (StringUtils.hasText(concurrency)) {
- endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
- }
- String autoStartup = kafkaListener.autoStartup();
- if (StringUtils.hasText(autoStartup)) {
- endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
- }
- resolveKafkaProperties(endpoint, kafkaListener.properties());
- endpoint.setSplitIterables(kafkaListener.splitIterables());
- if (StringUtils.hasText(kafkaListener.batch())) {
- endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
- }
- endpoint.setBeanFactory(this.beanFactory);
- resolveErrorHandler(endpoint, kafkaListener);
- resolveContentTypeConverter(endpoint, kafkaListener);
- resolveFilter(endpoint, kafkaListener);
- }
复制代码 各种参数注册,尤其是此中的ID和handler是必须的,不注册不行;笔者试着本身设置endpoint,发现此中的各种handler注册。
解决方式
先写一个工具类,用于创建一些关键类的bean,定义了发送者创建,消费者工厂类,消费者的创建由注解扫描实现,引用工具类的消费者容器工厂bean。
- public class KafkaConfigUtil {
- private DefaultKafkaProducerFactory<String, String> initProducerFactory(KafkaProperties kafkaProperties) {
- return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
- }
- public KafkaTemplate<String, String> initKafkaTemplate(KafkaProperties kafkaProperties) {
- return new KafkaTemplate<>(initProducerFactory(kafkaProperties));
- }
- private ConsumerFactory<? super Integer, ? super String> initConsumerFactory(KafkaProperties kafkaProperties) {
- return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
- }
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
- initKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
- ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
- new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(initConsumerFactory(kafkaProperties));
- return factory;
- }
- }
复制代码 1、通过Map接收多数据源
定义一个配置接收器,仿造zuul的模式
- @ConfigurationProperties(prefix = "spring.kafka")
- public class KafkaMultiProperties {
- private Map<String, KafkaProperties> routes;
- public Map<String, KafkaProperties> getRoutes() {
- return routes;
- }
- public void setRoutes(Map<String, KafkaProperties> routes) {
- this.routes = routes;
- }
- }
复制代码 每一个route其实就说一套kafka,再写一个Configuration,注入配置文件
- @Configuration
- @EnableConfigurationProperties(KafkaMultiProperties.class)
- public class KafkaConfiguration {
-
- }
复制代码 如许就可以注入配置了,从此可以根据配置的差别初始化差别的kafka集群逻辑。 如许就可以把自定义的Properties注入Springboot的placeholder中。
2、通过自定义扫描支持消费者
假如消费者大概发送者逻辑需要写在当前kafka网关应用,那么只能通过自定义扫描方式支持配置差别,所有配置的天生者和消费者必须代码实现逻辑,通过配置加载方式,自定义扫描注入bean即可。以消费者为例,生产者不涉及注解发送方式相对简朴。
- public class KafkaConfigInit {
- private KafkaMultiProperties kafkaMultiProperties;
- private ConfigurableApplicationContext applicationContext;
- public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties,
- ConfigurableApplicationContext applicationContext) {
- this.kafkaMultiProperties = kafkaMultiProperties;
- this.applicationContext = applicationContext;
- }
- @PostConstruct
- public void initConfig() {
- if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
- kafkaMultiProperties.getRoutes().forEach((k, v) -> {
- //register producer by config
- ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
- beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
- //register consumer container factory
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
- beanFactory.registerSingleton(k + "_consumerFactory", kafkaListenerContainerFactory);
- });
- }
- }
复制代码 写了一个初始化的bean,用于通过配置加载bean。但是有2个问题:
- 消费者是注解方式扫描,bean需要根据配置加载,不能写在代码内里
- 这里仅仅是注册bean,并不会被beanpostprocessor处置惩罚
关于第1点
由于需要按照配置加载,不能代码写bean的加载逻辑,只能本身扫描按照配置加载,那么需要自定义扫描注解和扫描包名(淘汰扫描范围,提高服从)
关于第2点
需要手动执行beanpostprocessor的逻辑即可
show me the code
完满刚刚写的部门代码:
写一个注解
- @Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface KafkaConfigConsumer {
- String beanId() default "";
- }
复制代码 通过beanId区分,配置文件的key+"_consumer"可以作为唯一标识,定义一种标准
可以利用Spring的
- PathMatchingResourcePatternResolver
复制代码 本身剖析resources信息,来拿到写的自定义注解的类,然后天生对象,注入Spring
- public class KafkaConfigInit {
- private KafkaMultiProperties kafkaMultiProperties;
- private ConfigurableApplicationContext applicationContext;
- private KafkaListenerAnnotationBeanPostProcessor<?,?> kafkaListenerAnnotationBeanPostProcessor;
- private static final Map<String, Object> consumerMap = new ConcurrentHashMap<>();
- public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties, ConfigurableApplicationContext applicationContext, KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor) {
- this.kafkaMultiProperties = kafkaMultiProperties;
- this.applicationContext = applicationContext;
- this.kafkaListenerAnnotationBeanPostProcessor = kafkaListenerAnnotationBeanPostProcessor;
- }
- @PostConstruct
- public void initConfig() throws IOException {
- scanConsumer();
- if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
- kafkaMultiProperties.getRoutes().forEach((k, v) -> {
- //register producer by config
- ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
- beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
- //register consumer container factory
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
- beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);
- beanFactory.registerSingleton(k+"_consumer", consumerMap.get(k+"_consumer"));
- kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(consumerMap.get(k+"_consumer"), k+"_consumer");
- });
- }
- private void scanConsumer() throws IOException {
- SimpleMetadataReaderFactory register = new SimpleMetadataReaderFactory();
- PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
- Resource[] resources = resolver.getResources(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + "com/feng/kafka/demo/init/*");
- Arrays.stream(resources).forEach((resource)->{
- try {
- MetadataReader metadataReader = register.getMetadataReader(resource);
- if (metadataReader.getAnnotationMetadata().hasAnnotatedMethods("org.springframework.kafka.annotation.KafkaListener")){
- String className = metadataReader.getClassMetadata().getClassName();
- Class<?> clazz = Class.forName(className);
- KafkaConfigConsumer kafkaConfigConsumer = clazz.getDeclaredAnnotation(KafkaConfigConsumer.class);
- Object obj = clazz.newInstance();
- consumerMap.put(kafkaConfigConsumer.beanId(), obj);
- }
- } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- });
- }
- }
复制代码 同时,需要手动执行
- kafkaListenerAnnotationBeanPostProcessor
复制代码 的逻辑,上面有源码分析,而且由于要支持多数据源,所以需要修改消费者的注解参数
- //@KafkaListener(topics = "${test.topic}")
- //@Component
- @KafkaConfigConsumer(beanId = "xxx_consumer")
- public class KafkaConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
- private String payload;
- // @KafkaHandler
- @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
- public void receive(ConsumerRecord<?, ?> consumerRecord) {
- LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
- payload = consumerRecord.toString();
- }
- // other getters
- public String getPayload() {
- return payload;
- }
- public void setPayload(String payload) {
- this.payload = payload;
- }
- }
复制代码 增长beanRef属性外加我们本身写的注解,然后通过@Configuration注入
- @Configuration
- @EnableConfigurationProperties(KafkaMultiProperties.class)
- public class KafkaConfiguration {
- @Bean
- public KafkaConfigInit initKafka(KafkaMultiProperties kafkaMultiProperties,
- ConfigurableApplicationContext applicationContext,
- KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor){
- return new KafkaConfigInit(kafkaMultiProperties, applicationContext, kafkaListenerAnnotationBeanPostProcessor);
- }
- }
复制代码 然后修改配置文件和单位测试类
- spring:
- kafka:
- routes:
- xxx:
- producer:
- batchSize: 1
- consumer:
- auto-offset-reset: earliest
- group-id: xxx
复制代码 然后修改单位测试代码
- @SpringBootTest
- @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
- class DemoMainTest {
- @Lazy
- @Autowired
- private KafkaConsumer consumer;
- @Autowired
- private ApplicationContext applicationContext;
- @Value("${test.topic}")
- private String topic;
- @Test
- void embedKafka() throws InterruptedException {
- String data = "Sending with our own simple KafkaProducer";
- applicationContext.getBean("xxx_producer", KafkaTemplate.class).send(topic, data);
- Thread.sleep(3000);
- assertThat(consumer.getPayload(), containsString(data));
- Thread.sleep(10000);
- }
- }
复制代码 执行单位测试成功
数据正确发送消费,断言正常
3、通过字节码天生支持消费者
上面的方式觉得还是不方便,一样平常而言处置惩罚消息和消费消息是异步的,纵然是同步也不会在消费线程直接处置惩罚,一样平常是发送到其他地方接口处置惩罚,所以为啥还要写消费者代码呢,默认一个不就好了,但是注解参数确是常量,那么字节码天生一个唯一的类即可。
假如天生者和消费者处置惩罚逻辑不消网关应用处置惩罚,那么仅仅是无脑转发,雷同zuul,可以通过字节码天生方式实现同一逻辑,主要是消费者,毕竟有注解,生产者不存在注解可以直接new出来注入bean。
以javassist为例,简朴些,当然asm也可以
show me the code
其实就说把扫描的消费者类,酿成固定某个类消费
- //@KafkaListener(topics = "${test.topic}")
- //@Component
- //@KafkaConfigConsumer(beanId = "xxx_consumer")
- public class KafkaConsumer {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
- private String payload;
- // @KafkaHandler
- // @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
- public void receive(ConsumerRecord<?, ?> consumerRecord) {
- LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
- payload = consumerRecord.toString();
- }
复制代码 去掉注解,由于注解需要我们动态加上去,下一步修改bean创建流程
- @PostConstruct
- public void initConfig() throws IOException {
- // scanConsumer();
- if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
- kafkaMultiProperties.getRoutes().forEach((k, v) -> {
- //register producer by config
- ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
- beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
- //register consumer container factory
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
- beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);
- // beanFactory.registerSingleton(k + "_consumer", consumerMap.get(k + "_consumer"));
- Object obj = initConsumerBean(k);
- beanFactory.registerSingleton(k + "_consumer", obj);
- kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(obj, k + "_consumer");
- });
- }
- private Object initConsumerBean(String key) {
- try {
- ClassPool pool = ClassPool.getDefault();
- CtClass ct = pool.getCtClass("com.feng.kafka.demo.init.KafkaConsumer");
- //修改类名,避免重复
- ct.setName("com.feng.kafka.demo.init.KafkaConsumer"+key);
- //获取类中的方法
- CtMethod ctMethod = ct.getDeclaredMethod("receive");
- MethodInfo methodInfo = ctMethod.getMethodInfo();
- ConstPool cp = methodInfo.getConstPool();
- //获取注解属性
- AnnotationsAttribute attribute = new AnnotationsAttribute(cp, AnnotationsAttribute.visibleTag);
- Annotation annotation = new Annotation("org.springframework.kafka.annotation.KafkaListener", cp);
- ArrayMemberValue arrayMemberValue = new ArrayMemberValue(cp);
- arrayMemberValue.setValue(new MemberValue[]{new StringMemberValue("embedded-test-topic", cp)});
- annotation.addMemberValue("topics", arrayMemberValue);
- annotation.addMemberValue("beanRef", new StringMemberValue(key+"_listener", cp));
- annotation.addMemberValue("containerFactory", new StringMemberValue(key+"_containerFactory", cp));
- attribute.addAnnotation(annotation);
- methodInfo.addAttribute(attribute);
- byte[] bytes = ct.toBytecode();
- Class<?> clazz = ReflectUtils.defineClass("com.feng.kafka.demo.init.KafkaConsumer" + key, bytes, Thread.currentThread().getContextClassLoader());
- return clazz.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
复制代码 通过字节码天生和动态加载class方式,天生唯一的对象,实现通过配置方式支持多数据源,不需要写一句消费代码。
单位测试

去掉了断言,由于类是动态变化的了。
总结
实际上spring-kafka已经非常完满了,spring-kafka插件的支持也很完满,不需要关注kafka的消费过程,只需要配置即可,但是也为机动性埋下了隐患,当然一样平常而言我们根本上用不到多kafka的环境,也不会做一个kafka网关应用,不外当业务需要的时间,可以设计一套kafka网关应用,分发kafka的消息,起到一个流量网关的能力,解耦业务的应用,实现架构的松耦合。
在Springboot打成fatjar后,由于Springboot的启动时自定义了classloader,会导致找不到类的环境,需要把线程的classloader参加javassist的查找范围
- ClassPool pool = ClassPool.getDefault();
- pool.appendClassPath(new LoaderClassPath(Thread.currentThread().getContextClassLoader()));
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |