KafkaStream Local Store和Global Store区别和用法

打印 上一主题 下一主题

主题 564|帖子 564|积分 1692

前言

使用kafkaStream举行流式计算时,如果必要对数据举行状态处置惩罚,那么常用的会碰到kafkaStream的store,而store也有Local Store以及Global Store,固然也可以使用其他方案的来举行状态保存,文本主要理清晰kafkaStream中的Local Store以及Global Store之间的区别和用法,以及什么时间选择何种store和当store无法满足我们需求时,应该怎样使用其他方案来举行数据的状态保存
本文所有方法和代码皆只针对kafka-streams的3.7.0版本,pom如下:
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-streams</artifactId>
  4.     <version>3.7.0</version>
  5. </dependency>
复制代码
由于不同版本的KafkaStream在使用上有较大区别,也由于KafkaStream不同版本API改动较大,以是如果版本不划一,使用方法甚至是一些核心概念都会跟本文报告有所出入,并且KafkaStream由于相对小众,文档也很少,官网的文档也只是一些简朴介绍,以是必要注意避坑
Local Store和Global Store的共同点和区别点

共同点:

1、都是用于流式计算中举行状态存储的
2、详细结构类似,使用的都是如:KeyValueStore,SessionStore等类
3、实际机制类似,会通过内存、本地目次和kafka Topic的变更记录等方式来举行缓存数据更新和恢复
不同点

1、实用场景不同
Local Store 适合用于单个实例的状态管理,适合处置惩罚单个分区的数据,并且缓存数据不会多个实例共享
Global Store 实用于跨实例共享数据状态,多个实例通过Topic中的更新记录来跟新进程中的数据
2、使用方法不同
Local Store 可以直接在代码中调用对应类型存储(如:KeyValueStore)的put方法举行更新数据,不必要思量数据划一性(由于可见性只有单个实例)
Global Store 不能直接调用对应的put和delete方法,所有更新和删除缓存都必要通过发送数据到Global 配置的topic中,然后自行实现Topic数据消费者(实现:org.apache.kafka.streams.processor.api.Processor类),在消费者类中举行数据更新等操作,同时由于必要本身实现更新实例中的数据逻辑,数据划一性也必要开发者自行处置惩罚,虽然正常来说使用Kafka本身的特性很少出现数据划一性问题,但是如果多实例之间性能差异和网络情况等差异,容易将数据不划一的时长延长,如果要求Store划一性强且容忍数据不划一时限短,则必要注意思量Store更新数据消费者的处置惩罚能力
3、扩展性
Local Store:可以通过增加输入主题的分区数来扩展处置惩罚能力,但每个实例仍然独立运行。
Global Store:必要在多个实例之间共享状态,因此在设计时必要思量怎样高效地管理和同步状态。
常见的Store 类型

  1. org.apache.kafka.streams.state.KeyValueStore
  2. org.apache.kafka.streams.state.SessionStore
  3. org.apache.kafka.streams.state.TimestampedKeyValueStore
  4. org.apache.kafka.streams.state.VersionedKeyValueStore
  5. org.apache.kafka.streams.state.WindowStore
复制代码
必要根据实际使用场景选择符合的状态存储类
用法

Local Store

第一步,天赋生对应类型的StoreBuilder对象,如我必要用KeyValueStore,然后状态存储的名字是:testLocalStore(这个名字不能重复,由于会根据消费者id加储存名称创建对应的Topic,固然如果是不同的KafkaStream步调,消费者id不划一,那么重复就没有关系了),由于是KeyValue类型的储存,以是必要设定对应的Key和Value数据的序列化对象,详细代码如下:
  1. StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());
复制代码
此中Stores.persistentKeyValueStore代表的我得存储是持久化的,正常都是会用持久化,固然也有存储一些不紧张或者步调重启丢失也无所谓的状态数据,可以使用Stores.inMemoryKeyValueStore以及基于LRU镌汰机制的储存Stores.lruMap,第二个参数Serdes.String()代表存储数据的key是字符串,第三个参数同理,如果是要存储一些对象,也可以使用自定义的序列化类,实现
  1. org.apache.kafka.common.serialization.Serializer
复制代码
序列化类,以及反序列化类
  1. org.apache.kafka.common.serialization.Deserializer
复制代码
然后定义好即可,如:
  1. new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),
  2.                 new KryoDeserializer<>(TestStoreBean.class)
复制代码
此中KryoSerializer和KryoDeserializer是我自定义的使用Kryo序列化Java对象的类,TestStoreBean是我保存的状态的数据封装bean
KryoSerializer代码如下:
  1. import com.esotericsoftware.kryo.Kryo;import com.esotericsoftware.kryo.io.Output;import org.apache.kafka.common.serialization.Serializer
  2. ;import java.io.ByteArrayOutputStream;/** * kryo序列化类 * @author Raye * @since 2024-6-4 */public class KryoSerializer<T> implements Serializer<T> {    private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {        @Override        protected Kryo initialValue() {            Kryo kryo = new Kryo();            /**             * 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变革,             * 上线的同时就必须清除 Redis 里的所有缓存,             * 否则那些缓存再返来反序列化的时间,就会报错             */            //支持对象循环引用(否则会栈溢出)            kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置            //不强制要求注册类(注册举动无法保证多个 JVM 内同一个类的注册编号相同;而且业务体系中大量的 Class 也难以一一注册)            kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置            return kryo;        }    };    /**     * 获适当前线程的 Kryo 实例     *     * @return 当前线程的 Kryo 实例     */    public static Kryo getInstance() {        return KRYO_LOCAL.get();    }    private Class<T> clz;    public KryoSerializer(Class<T> clz) {        this.clz = clz;    }    @Override    public byte[] serialize(String s, T t) {        if(t == null){            return null;        }        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();        Output output = new Output(byteArrayOutputStream);        Kryo kryo = getInstance();        kryo.writeObjectOrNull(output, t,clz);        output.flush();        return byteArrayOutputStream.toByteArray();    }}
复制代码
KryoDeserializer代码如下:
  1. import com.esotericsoftware.kryo.Kryo;import com.esotericsoftware.kryo.io.Input;import org.apache.kafka.common.serialization.Deserializer
  2. ;import java.io.ByteArrayInputStream;/** * kryo反序列化类 * @author Raye * @since 2024-6-4 */public class KryoDeserializer<T> implements Deserializer<T> {    private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {        @Override        protected Kryo initialValue() {            Kryo kryo = new Kryo();            /**             * 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变革,             * 上线的同时就必须清除 Redis 里的所有缓存,             * 否则那些缓存再返来反序列化的时间,就会报错             */            //支持对象循环引用(否则会栈溢出)            kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置            //不强制要求注册类(注册举动无法保证多个 JVM 内同一个类的注册编号相同;而且业务体系中大量的 Class 也难以一一注册)            kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置            return kryo;        }    };    /**     * 获适当前线程的 Kryo 实例     *     * @return 当前线程的 Kryo 实例     */    public static Kryo getInstance() {        return KRYO_LOCAL.get();    }    private Class<T> clz;    public KryoDeserializer(Class<T> clz) {        this.clz = clz;    }    @Override    public T deserialize(String s, byte[] bytes) {        if(bytes == null){            return null;        }        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);        Input input = new Input(byteArrayInputStream);        Kryo kryo = getInstance();        try {            return kryo.readObjectOrNull(input, clz);        }catch (Exception e){            e.printStackTrace();        }        return null;    }}
复制代码
同理,使用LocalStore时,可以将代码替换成以下内容:
  1. StoreBuilder<KeyValueStore<String, TestStoreBean>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),
  2.                 new KryoDeserializer<>(TestStoreBean.class)
  3. );
复制代码
有了StoreBuilder对象之后,直接在StreamsBuilder对象中添加即可
  1. streamsBuilder.addStateStore(kvBuilder);
复制代码
必要使用时,先在处置惩罚数据的Processor类中的init方法获取对应的状态存储对象
  1. this.testLocalStore = context.getStateStore("testLocalStore");
复制代码
然后就可以在process方法中调用testLocalStore的get、put、delete等方法操作状态存储数据了,详细代码如下
  1.         @Slf4j    public static class StreamProcessor implements Processor<String,String,String,String> {        private KeyValueStore<String,String> testLocalStore;        private ProcessorContext context;                private String toTopic;        @Override        public void init(ProcessorContext context) {            this.context = context;            this.testLocalStore = context.getStateStore("testLocalStore");
  2.         }        public StreamProcessor(String toTopic) {            this.toTopic = toTopic;        }        @Override        public void process(Record<String, String> record) {            testLocalStore.put("key1","testValue1");            log.info("testLocalStore key1 : {}",testLocalStore.get("key1"));            testLocalStore.delete("key1");            context.forward(record,toTopic);        }    }
复制代码
此中实现的Processor类全称是:org.apache.kafka.streams.processor.api.Processor,上面代码只是在数据处置惩罚流程中简朴保存了数据,然后获取出来以及删除,没有对流数据做任何处置惩罚,就直接发送到输出的topic了
完整代码如下:
  1.         @Bean    public KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){        log.info("init kStreamTestStore");        StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());
  2.         streamsBuilder.addStateStore(kvBuilder);
  3.         KStream<String, String> stream = streamsBuilder.stream(fromTopic);        stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic),"testLocalStore");        streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);        return stream;    }
复制代码
注意:由于使用Store必要通过ProcessorContext对象来获取Store对象,以是在KafkaStream常用的一些map,mapValue,flatMapValues这些流式计算方法中是没办法使用的,只能在一些更底层的Api中去使用,如process
Global Store

同Local Store一样,必要天赋生对应类型的StoreBuilder对象,代码跟Local Store一样
  1. StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());
复制代码
然后定义处置惩罚状态更新日志的Processor类,在这个类中,可以对缓存数据举行更新和删除操作(其他地方都是不能直接修改Global Store的)
  1. public class GlobalStoreHandleProcessor<K, V> implements Processor<K,V,Void,Void> {
  2.     private KeyValueStore<K, V> store;
  3.     private String storeName;
  4.     public GlobalStoreHandleProcessor(String storeName) {
  5.         this.storeName = storeName;
  6.     }
  7.    
  8.     @Override
  9.     public void process(Record<K,V> record) {
  10.         if(record == null || record.value() == null) {
  11.             return;
  12.         }
  13.         store.put(record.key(), record.value());
  14.     }
  15.     @Override
  16.     public void init(ProcessorContext context) {
  17.         this.store = context.getStateStore(storeName);
  18.     }
  19. }
复制代码
跟KafkaStream的process是一样的,只必要在process方法中对缓存举行更新或者删除操作即可,我这里只是简朴put操作,详细逻辑可以根据本身情况举行处置惩罚
在StreamsBuilder对象中添加StoreBuilder对象
  1. streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),
  2.                 ()->new GlobalStoreHandleProcessor<>("testGlobalStore"));
复制代码
此中第二个参数testGlobalStore是Global Store绑定的数据变更记录的Topic,如果要更新,则必要通过向这个topic发送数据来举行更新Global Store中的数据
处置惩罚数据的Processor类实例代码
  1. public static class StreamProcessor implements Processor<String,String,String,String> {
  2.         private KeyValueStore<String,String> testGlobalStore;
  3.         private ProcessorContext context;
  4.         private String toTopic;
  5.         @Override
  6.         public void init(ProcessorContext context) {
  7.             this.context = context;
  8.             this.testGlobalStore = context.getStateStore("testGlobalStore");
  9.         }
  10.         public StreamProcessor(String toTopic) {
  11.             this.toTopic = toTopic;
  12.         }
  13.         @Override
  14.         public void process(Record<String, String> record) {
  15.            testLocalStore.put(jsonObject.getString("key"),jsonObject.getString("value"));
  16.             log.info("testLocalStore key1 : {}",testGlobalStore.get("key1"));
  17.             //发送更新Global Store的数据
  18.             context.forward(new Record("testGlobalKey","global value",record.timestamp()),"testGlobalStore");
  19.             context.forward(record,toTopic);
  20.         }
  21.     }
复制代码
与Local Store不同的是,不能在处置惩罚数据流的时间,对缓存举行put操作,只能通过将数据发送到Global Store关联的topic中,在GlobalStoreHandleProcessor中去做更新
完整代码如下:
  1.         @Bean    public KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){        log.info("init kStreamTestStore");        StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());
  2.         streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),
  3.                 ()->new GlobalStoreHandleProcessor<>("testGlobalStore"));
  4.         KStream<String, String> stream = streamsBuilder.stream(fromTopic);        stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic));        streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);        streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);
  5.         return stream;    }
复制代码
与Local Store不同点在于,不必要在process方法中添加store的名字,但是由于要从process方法中直接将更新Store的数据发送到topic,以是必要添加一个Global Store绑定的Topic的输出扩展,也就是下面这行代码
  1. streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);
复制代码
不适合的场景

由于KafkaStream Store 没有自动过期数据和过期数据自动删除的概率(可能是有,但是我没有找到对应文档),以是如果我们存储的key聚集特别大,并且必要自动过期和自动删除,那么就不适合使用Store来处置惩罚了,由于必要我们自行处置惩罚删除逻辑,尤其是有些场景中,并不会对过期的key举行访问,以是采取惰性删除基本上不现实,但是定时删除,由于Store会存储到磁盘,如果存储的key许多,删除对应数据的时间耗时很长,尤其是单次删除大量key的时间,可能会直接超时,并且还必须要本身处置惩罚定时删除的逻辑,想要更好的去删除,就必要大量时间去开发和优化。
虽然使用内存的Store能轻微好点,但是毕竟单个进程内存有限,并且正常流处置惩罚中,如果必要保存状态,那么肯定是希望进程重启之后,能恢复数据,避免计算堕落的,以是如果是有大量不重复key,并且数据必要到期自动删除的话,可以直接使用Redis做状态存储,并且进过我得实际测试,使用Redis并不比Store慢,并且在key量越来越大的情况下,Redis的性能是完全优于Store的(只针对持久化的Store),固然使用Redis,还是会更使用Global Store一样,必要思量数据划一性的问题,不外这个问题可以通过将相同key的数据从Kafka Topic就分配到同一个Topic分区中来避免

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

吴旭华

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表