ToB企服应用市场 (ToB Enterprise Services App Market): ToB reviews and business social networking platform

Title: keycloak~A class-loading problem triggered by initializing Kafka in an EventListenerProvider

Author: 种地    Time: 2023-7-19 17:37
EventListenerProvider initialization

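Keycloak's event mechanism lets you add custom event-handling logic by implementing the EventListenerProvider interface. At startup, Keycloak discovers every available implementation through the ServiceLoader mechanism (strictly speaking, it loads the factory described next, which in turn creates the provider) and registers it with its event-handling mechanism.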
EventListenerProviderFactory

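EventListenerProviderFactory is the factory for event handlers: it creates the EventListenerProvider instances. At startup, Keycloak loads every EventListenerProviderFactory implementation through the ServiceLoader mechanism and registers it with its event-handling mechanism.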
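
The ServiceLoader discovery described above can be tried out with the JDK alone. The following is a minimal sketch, not Keycloak code: ListenerFactory and KafkaListenerFactory are hypothetical stand-ins for EventListenerProviderFactory and a custom implementation, and the registration file that an extension JAR would normally ship under META-INF/services is written to a temporary directory instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class ServiceLoaderSketch {

    /** Hypothetical stand-in for a factory SPI such as EventListenerProviderFactory. */
    public interface ListenerFactory {
        String getId();
    }

    /** Hypothetical implementation, as an extension JAR would ship it. */
    public static final class KafkaListenerFactory implements ListenerFactory {
        // ServiceLoader needs a public no-argument constructor.
        public KafkaListenerFactory() { }

        @Override
        public String getId() {
            return "kafka-listener";
        }
    }

    /** Writes a registration file, then lets ServiceLoader discover the factory through it. */
    static List<String> discoverFactoryIds() {
        try {
            // META-INF/services/<interface name> lists one implementation class per line.
            Path root = Files.createTempDirectory("spi-sketch");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            Path registration = services.resolve(ListenerFactory.class.getName());
            Files.write(registration,
                    KafkaListenerFactory.class.getName().getBytes(StandardCharsets.UTF_8));

            List<String> ids = new ArrayList<>();
            // The class loader passed to ServiceLoader decides which registration files
            // and which implementation classes are visible.
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()},
                    ServiceLoaderSketch.class.getClassLoader())) {
                for (ListenerFactory factory : ServiceLoader.load(ListenerFactory.class, loader)) {
                    ids.add(factory.getId());
                }
            }
            return ids;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(discoverFactoryIds());
    }
}
```

The point to take away is that discovery is tied to a class loader: only registration files and classes visible to the loader handed to ServiceLoader are found, which is the same kind of visibility question that causes the problem below.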
Problem

The factory's postInit method starts two Kafka consumers. The deserializer class is loaded explicitly through the factory's own class loader, because otherwise StringDeserializer cannot be loaded:

  @Override
  public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
    try {
      this.executorService = Executors.newFixedThreadPool(2);
      Properties kafkaProperties = new Properties();
      kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          ConfigFactory.getInstance().getStrPropertyValue("kafka.host"));
      kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "kc-ListenerProviderFactory");
      kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
      kafkaProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
      kafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
      kafkaProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
      kafkaProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
      // Load the deserializer with this class's own class loader and pass the Class object;
      // otherwise StringDeserializer cannot be loaded.
      Class<?> stringDeserializerClass =
          getClass().getClassLoader().loadClass("org.apache.kafka.common.serialization.StringDeserializer");
      kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
      kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
      // One consumer per topic, each running on its own pool thread.
      this.kafkaConsumerAdd = new KcKafkaConsumer(keycloakSessionFactory, kafkaProperties, "black_list_add");
      executorService.submit(kafkaConsumerAdd);
      this.kafkaConsumerRemove = new KcKafkaConsumer(keycloakSessionFactory, kafkaProperties, "black_list_remove");
      executorService.submit(kafkaConsumerRemove);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
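
Why passing a Class object helps can be shown without Kafka or Keycloak. Kafka appears to resolve class names given as strings through the thread context class loader, and inside Keycloak that loader does not necessarily see the classes bundled with the extension. The sketch below is only a model of that situation under those assumptions: resolvableByContextLoader imitates such a lookup, the "foreign" loader imitates a context loader that cannot see the extension, and withContextLoader is a hypothetical helper showing the alternative workaround of switching the context loader temporarily.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

final class ContextLoaderSketch {

    /** Mimics a library that resolves class names through the thread context class loader. */
    static boolean resolvableByContextLoader(String className) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        try {
            Class.forName(className, false, contextLoader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Runs the action with the given context class loader, then restores the previous one. */
    static <T> T withContextLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    /** Returns {resolvable via a foreign context loader, resolvable via our own loader}. */
    static boolean[] demo() {
        String ownClassName = ContextLoaderSketch.class.getName();
        ClassLoader ownLoader = ContextLoaderSketch.class.getClassLoader();
        // No URLs and no application parent: this loader sees JDK classes only, much like
        // a container thread whose context loader does not include the extension's classes.
        try (URLClassLoader foreign = new URLClassLoader(new URL[0], null)) {
            boolean viaForeign =
                    withContextLoader(foreign, () -> resolvableByContextLoader(ownClassName));
            boolean viaOwn =
                    withContextLoader(ownLoader, () -> resolvableByContextLoader(ownClassName));
            return new boolean[] {viaForeign, viaOwn};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("foreign context loader: " + result[0] + ", own loader: " + result[1]);
    }
}
```

Loading the class yourself and putting the Class object into the properties, as the postInit code does, sidesteps the name lookup entirely. Wrapping the consumer construction in something like withContextLoader(getClass().getClassLoader(), ...) would be another option, at the cost of touching thread state.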
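The consumer itself is a Runnable that polls a single topic until it is shut down:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.jboss.logging.Logger; // assumption: JBoss Logging, which Keycloak itself uses
import org.keycloak.models.KeycloakSessionFactory;

public class KcKafkaConsumer implements Runnable {
  private static final Logger logger = Logger.getLogger(KcKafkaConsumer.class);
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final KeycloakSessionFactory keycloakSessionFactory;
  private final KafkaConsumer<String, String> kafkaConsumer;

  public KcKafkaConsumer(KeycloakSessionFactory keycloakSessionFactory, Properties properties, String topic) {
    this.keycloakSessionFactory = keycloakSessionFactory;
    this.kafkaConsumer = new KafkaConsumer<>(properties);
    this.kafkaConsumer.subscribe(Collections.singleton(topic));
  }

  @Override
  public void run() {
    try {
      while (!closed.get()) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        // Handle the Kafka messages
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Topic:" + record.topic() + ",Received message: " + record.value());
          // TODO: actual message-handling logic
        }
      }
    } catch (WakeupException e) {
      // Expected when shutdown() interrupts a blocking poll; rethrow if we were not closing.
      if (!closed.get()) {
        throw e;
      }
    } finally {
      kafkaConsumer.close();
    }
  }

  public void shutdown() {
    // KafkaConsumer is not thread-safe: wakeup() is the only method that may be called
    // from another thread. The consumer is then closed by run() on its own thread.
    closed.set(true);
    kafkaConsumer.wakeup();
  }
}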
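
The post does not show how the two consumers are stopped again, for example from the factory's close() method. The following is a minimal sketch of the flag-based shutdown used by KcKafkaConsumer, under the assumption that a BlockingQueue stands in for the Kafka topic; PollLoopSketch and its methods are hypothetical names, not part of Kafka or Keycloak.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class PollLoopSketch implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger handled = new AtomicInteger();
    private final BlockingQueue<String> source; // hypothetical stand-in for a Kafka topic

    PollLoopSketch(BlockingQueue<String> source) {
        this.source = source;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                // A short timeout ensures the closed flag is re-checked regularly.
                String message = source.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    handled.incrementAndGet();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Only sets the flag; the loop stops by itself on its own thread. */
    void shutdown() {
        closed.set(true);
    }

    int handledCount() {
        return handled.get();
    }

    /** Returns the number of handled messages, or -1 if the worker did not stop in time. */
    static int demo() {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        PollLoopSketch worker = new PollLoopSketch(topic);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(worker);
        try {
            // Wait (bounded) until the three queued messages have been handled.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (worker.handledCount() < 3 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            // What a factory close() might do: signal the worker, then stop the pool.
            worker.shutdown();
            executor.shutdown();
            return executor.awaitTermination(5, TimeUnit.SECONDS) ? worker.handledCount() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("handled: " + demo());
    }
}
```

With a real KafkaConsumer the same structure applies, except that a poll blocked for a long time would be interrupted with wakeup() rather than waiting for the timeout, and the consumer should be closed only by the thread that polls it.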