EventListenerProvider初始
keycloak提供的事件处理机制,可以通过实现EventListenerProvider接口来实现自定义的事件处理逻辑。在keycloak启动时,会通过ServiceLoader机制加载所有的EventListenerProvider实现类,并将其注册到keycloak的事件处理机制中。
- 构造方法,在每个keycloak后台操作时,它都会重新构建实例
- OnEvent方法,在事件发生时执行,不会出现类加载问题,因为这样类已经被加载了
EventListenerProviderFactory
EventListenerProviderFactory是进行事件处理器的生产工厂,用于创建EventListenerProvider实例。在keycloak启动时,会通过ServiceLoader机制加载所有的EventListenerProviderFactory实现类,并将其注册到keycloak的事件处理机制中。
- init方法:keycloak启动时会执行,用于初始化EventListenerProviderFactory实例,可以在此方法中进行一些初始化操作。
- postInit方法:keycloak启动时会执行,在init方法之后,会执行这个方法
- create方法:在kc后台开启这个EventListenerProviderFactory之后,每次请求都会执行这个create方法,对于它生产的provider对象,可能考虑使用单例的方式, 避免每次请求都创建一个新的对象
- close方法:在keycloak程序关闭后或者当前事件被注册时,这个方法才会执行
问题
- 问题描述:在EventListenerProviderFactory的init方法中,通过kafka发送消息,会出现类加载问题,因为在keycloak启动时,kafka的类的加载器还没有被加载,所以会出现类加载问题。
- 解决:需要将类加载器这块,修改成当前类加载器去加载对应的文件,如下代码解决了类无法加载的问题
- @Override
- public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
- try {
- this.executorService = Executors.newFixedThreadPool(2);
- Properties kafkaProperties = new Properties();
- kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- ConfigFactory.getInstance().getStrPropertyValue("kafka.host"));
- kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "kc-ListenerProviderFactory");
- kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- kafkaProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
- kafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
- kafkaProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
- kafkaProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
- // 需要使用当前类加载器,否则会出现无法加载StringDeserializer的情况
- Class<?> stringDeserializerClass =
- getClass().getClassLoader().loadClass("org.apache.kafka.common.serialization.StringDeserializer");
- kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
- kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
- this.kafkaConsumerAdd = new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_add");
- executorService.submit(kafkaConsumerAdd);
- this.kafkaConsumerRemove=new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_remove");
- executorService.submit(kafkaConsumerRemove);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
复制代码
- 对于kafka-clients实现消费者的话,代码还是比较简单的
- public class KcKafkaConsumer implements Runnable {
- private static final Logger logger = Logger.getLogger(ConfigFactory.class);
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final KeycloakSessionFactory keycloakSessionFactory;
- private KafkaConsumer<String, String> kafkaConsumer;
- public KcKafkaConsumer(KeycloakSessionFactory keycloakSessionFactory, Properties properties, String topic)
- throws ClassNotFoundException {
- this.keycloakSessionFactory = keycloakSessionFactory;
- this.kafkaConsumer = new KafkaConsumer<>(properties);
- this.kafkaConsumer.subscribe(Collections.singleton(topic));
- }
- @Override
- public void run() {
- try {
- while (!closed.get()) {
- ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
- // 处理Kafka消息
- for (ConsumerRecord<String, String> record : records) {
- System.out.println("Topic:" + record.topic() + ",Received message: " + record.value());
- //TODO: 处理Kafka消息的具体逻辑
- }
- }
- } finally {
- kafkaConsumer.close();
- }
- }
- public void shutdown() {
- closed.set(true);
- kafkaConsumer.close();
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |