Flink流应用程序处置惩罚的是以数据对象表示的事件流。所以在Flink内部,我么需要能够处置惩罚这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和生存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处置惩罚的数据类型。并为每个数据类型生成特定的序列化器、反序列化器和比较器。Flink支持非常完善的数据类型,数据类型描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等。TypeInformation主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式盘算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的Java或Scala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加轻易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处置惩罚之前将数据类型推断出来,而不是真正在触发盘算后才辨认出,这样能够实时有效地避免用户在使用Flink编写应用的过程中的数据类型题目。
原生数据类型
Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生根本类型(装箱)或String类型,例如Integer、String、Double等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。
- //创建 Int 类型的数据集
- DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5);
- //创建 String 的类型的数据集
- DataStreamSource<String> stringDataStreamSource = env.fromElements("Java", "Scala");
复制代码 Flink实现另外一种TypeInfomation是BasicArrayTypeInfo,对应的是Java根本类型数组(装箱)或String对象的数组,如下代码通过使用 Array数组和List集合创建DataStream数据集。
- List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
- //通过 List 集合创建数据集
- DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |