马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
1.Flink的特点
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
Flink主要特点如下:
- 高吞吐和低延迟。每秒处理数百万个变乱,毫秒级延迟。
- 效果的正确性。Flink提供了变乱时间(event--time)和处理时间(processing-time)语义。
对于乱序变乱流,变乱时间语义仍然能提供划一且正确的效果。
- 正确一次(exactly-once)的状态划一性包管。
- 可以连接到最常用的外部体系,如Kafka、Hive、JDBC、HDFS、Redis等。
- 高可用。本身高可用的设置,加上与K8s,YARN和Mesos的细密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7×24全天候运行。
2.分层API
有状态流处理:通过底层API〔处理函数),对最原始数据加工处理。底层API与DataStream API相集成,可以处理复杂的计算。
DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map、flatmap等),连接(joins),聚合(aggregations),窗口(windows)利用等。注意:Flink1.l2以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过期。
Table API是以表为中心的声明式编程,此中表可能会动态变化。Table API遵照关系模子:表有二维数据结构,雷同于关系数据库中的表:同时API提供可比较的利用,比方select,project、join,group-by,aggregate等。我们可以在表与DataStream/DataSet之间无缝切换,以答应程序将Table API与DataStream以及DataSet混淆使用。
SQL这一层在语法与表达能力上与Table API雷同,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
3.流式数据,批量数据
批处理的特点是有界、大量,非常得当必要访问全套记录才气完成的计算工作,一般用于离线统计。
流处理的特点是无界、及时, 无需针对整个数据集执行利用,而是对通过体系传输的每个数据项执行利用,一般用于及时统计。
在spark的天下观中,一切都是由批次构成的,离线数据是一个大批次,而及时数据是由一个一个无穷的小批次构成的。
而在flink的天下观中,一切都是由流构成的,离线数据是有界限的流,及时数据是一个没有界限的流,这就是所谓的有界流和无界流。
无界数据流:
无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须一连处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等候全部数据都到达,由于输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(比方变乱发生的顺序)获取event,以便可以或许推断效果完整性。
有界数据流:
有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取全部数据来处理有界流,处理有界流不必要有序获取,由于可以始终对有界数据集进行排序,有界流的处理也称为批处理。
4.单作业模式
会话模式由于资源共享会导致许多标题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。
单作业模式,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发TaskManager 执行。作业作业完成后,集群就会关闭,全部资源也会释放。这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。
这些特性使得单作业模式在生产情况运行更加稳定,所以是现实应用的首选模式。
必要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般必要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。
5.并行度优先级
• 每一个算子( operator )可以包罗一个或多个子任务( operator subtask ),这些子任务在差别的线程、差别的物理机或差别的容器中完全独立地执行。 • 一个特定算子的 子任务(subtask)的个数 被称为其并行度( parallelism )。 一般情况下,一个流程序的并行度,可以认为就是其全部算子中最大的并行度。一个程序中,差别的算子可能具有差别的并行度。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,详细是哪一种形式,取决于算子的种类。
stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。雷同于spark中的窄依赖
stream(map()跟keyBy/window之间大概keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到差别的目标任务。比方,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就雷同于Spark中的shuffle过程。雷同于spark中的宽依赖
并行度可以有如下几种指定方式
- Operator Level(算子级别)(可以使用)
一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法来指定
- Execution Environment Level(Env级别)(可以使用)
执行情况(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行全部的算子、数据源和data sink, 可以通过如下的方式设置执行情况的并行度:执行情况的并行度可以通过显式设置算子的并行度而被重写
- Client Level(客户端级别,保举使用)(可以使用)
并行度可以在客户端将job提交到Flink时设定。
对于CLI客户端,可以通过-p参数指定并行度 ./bin/flink run -p 10 WordCount-java.jar
- System Level(体系默认级别,只管不使用)
在体系级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定全部执行情况的默认并行度
并行度的优先级:算子级别 > env级别 > Client级别 > 体系默认级别 (越靠前详细的代码并行度的优先级越高)
6.任务调度执行图
逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。
1)逻辑流图(StreamGraph)
这是根据用户通过 DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构。这一步一般在客户端完成。
2)作业图(JobGraph)
StreamGraph经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中全部任务的划分。主要的优化为:将多个符合条件的节点链接在一起归并成一个任务节点,形成算子链,这样可以淘汰数据交换的斲丧。JobGraph一般也是在客户端生成的,在作业提交时传递给JobMaster。
我们提交作业之后,打开Flink自带的Web UI,点击作业就能看到对应的作业图。
3)执行图(ExecutionGraph)
JobMaster收到JobGraph后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。
4)物理图(Physical Graph)
JobMaster生成执行图后,会将它分发给TaskManager;各个TaskManager会根据执行图摆设任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是详细执行层面的图,并不是一个详细的数据结构。
物理图主要就是在执行图的根本上,进一步确定命据存放的位置和收发的详细方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算了。
7.转换算子
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
根本转换算子
1. 映射(map)
map 是各人非常熟悉的大数据利用算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一 一映射”,消费一个元素就产出一个元素
我们只必要基于 DataStream 调用 map()方法就可以进行转换处理。方法必要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream。
- public class TransMapTest {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource<Event> stream = env.fromElements(
- new Event("Mary", "./home", 1000L),
- new Event("Bob", "./cart", 2000L)
- );
- //1.自定义类,实现mapfunction接口
- SingleOutputStreamOperator<String> result1 = stream.map(new UserExtractor());
- // 2.传入匿名类,实现MapFunction
- SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
- @Override
- public String map(Event value) throws Exception {
- return value.user;
- }
- });
- //3.传入lambda表达式
- SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);
- result1.print();
- //result2.print();
- //result3.print();
- env.execute();
- }
- public static class UserExtractor implements MapFunction<Event, String> {
- @Override
- public String map(Event value) throws Exception {
- return value.user;
- }
- }
- }
复制代码MapFunction 实现类的泛型类型,与输入数据类型和输出数据的类型有关。
在实现 MapFunction 接口的时候,必要指定两个泛型,分别是输入变乱和输出变乱的类型,还
必要重写一个 map()方法,定义从一个输入变乱转换为另一个输出变乱的详细逻辑。
2. 过滤(filter)
filter转换利用,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换必要传入的参数必要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相称于一个返回布尔类型的条件表达式。
- public class TransFilterTest {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource<Event> stream = env.fromElements(
- new Event("Mary", "./home", 1000L),
- new Event("Bob", "./cart", 2000L)
- );
- //1.传入实现filterfunction的自定义类
- SingleOutputStreamOperator<Event> result1 = stream.filter(new UserFilter());
- // 2.传入匿名类实现FilterFunction
- SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
- @Override
- public boolean filter(Event e) throws Exception {
- return e.user.equals("Bob");
- }
- });
- //3.传入lambda表达式
- SingleOutputStreamOperator<Event> result3 = stream.filter(data -> data.user.equals("Bob"));
- result1.print();
- // result2.print();
- // result3.print();
- env.execute();
- }
- public static class UserFilter implements FilterFunction<Event> {
- @Override
- public boolean filter(Event e) throws Exception {
- return e.user.equals("Mary");
- }
- }
- }
复制代码3. 扁平映射(flatMap))
flatMap 利用又称为扁平映射,主要是将数据流中的团体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步利用的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理.
同 map 一样,flatMap 也可以使用 Lambda 表达式大概 FlatMapFunction 接口实现类的方式
来进行传参,返回值类型取决于所传参数的详细逻辑,可以与原数据流相同,也可以差别。
- public class TransFlatmapTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource<Event> stream = env.fromElements(
- new Event("Mary", "./home", 1000L),
- new Event("Bob", "./cart", 2000L)
- );
- //1.实现自定义的flatmapfunction
- stream.flatMap(new MyFlatMap()).print("1");
- //2.传入lambda表达式
- stream.flatMap((Event value,Collector<String> out)-> {
- if (value.user.equals("Marry"))
- out.collect(value.url);
- else if(value.user.equals("Bob"))
- out.collect(value.user);
- out.collect(value.url);
- out.collect(value.timestamp.toString());
- }).returns(new TypeHint<String>() {
- })
- .print("2");
- env.execute();
- }
- public static class MyFlatMap implements FlatMapFunction<Event, String> {
- @Override
- public void flatMap(Event value, Collector<String> out) throws Exception {
- out.collect(value.user);
- out.collect(value.url);
- out.collect(value.timestamp.toString());
- }
- }
- }
复制代码flatMap 利用会应用在每一个输入变乱上面,FlatMapFunction 接口中定义了 flatMap 方法,
用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个
效果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来
指定输出。渴望输出效果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调
用,也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回效果
是 0 个的时候,就相称于对数据进行了过滤,当返回效果是 1 个的时候,相称于对数据进行了
简单的转换利用。
聚合算子(Aggregation)
1. 按键分区(keyBy)
对于Flink而言,DataStream是没有直接进行聚合的API的。由于我们对海量数据做聚合肯定要进行分区并行处理,这样才气进步效率。所以在Flink中,要做聚合,必要先进行分区;这个利用就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成差别的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于差别的key,流中的数据将被分配到差别的分区中去;这样一来,全部具有相同的key的数据,都将被发往同一个分区。
2. 简单聚合
有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合利用了。Flink为我们内置实现了一些最根本、最简单的聚合API,主要有以下几种:
- sum():在输入流上,对指定的字段做叠加求和的利用。
- min():在输入流上,对指定的字段求最小值。
- max():在输入流上,对指定的字段求最大值。
- minBy():与min()雷同,在输入流上针对指定字段求最小值。差别的是,min()只计算指定字段的最小值,其他字段会保存最初第一个数据的值;而minBy()则会返回包罗字段最小值的整条数据。
- maxBy():与max()雷同,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全划一。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也必要传入参数;但并不像根本转换算子那样必要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
- public class TransformSimpleAggTest {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource<Event> stream = env.fromElements(
- new Event("Mary", "./home", 1000L),
- new Event("Bob", "./cart", 2000L),
- new Event("Alice","./prod?id=100", 3000L),
- new Event("Bob","./prod?id=1", 3300L),
- new Event("Bob", "./home", 3500L),
- new Event("Alice","./prod?id=200", 3200L),
- new Event("Bob","./prod?id=2", 3800L),
- new Event("Bob","./prod?id=3", 4200L)
- );
- // 按键分组之后进行聚合,提取当前用户最后一次访问数据
- stream.keyBy(new KeySelector<Event, String>() {
- @Override
- public String getKey(Event value) throws Exception {
- return value.user;
- }
- }).max("timestamp")
- .print("max:");
- stream.keyBy(data -> data.user)
- .maxBy("timestamp")
- .print("maxBy:");
- env.execute();
- }
- }
复制代码输出效果为:
- max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
- maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
- max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
- maxBy:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
- max:> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
- maxBy:> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
- max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:03.3}
- maxBy:> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
- max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:03.5}
- maxBy:> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
- max:> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.2}
- maxBy:> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.2}
- maxBy:> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}
- maxBy:> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
- max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:03.8}
- max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:04.2}
复制代码3. 归约聚合(reduce)
与简单聚合雷同,reduce 利用也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用 KeyedStream 的 reduce 方法时,必要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:
- public interface ReduceFunction<T> extends Function, Serializable {
- T reduce(T value1, T value2) throws Exception;
- }
复制代码ReduceFunction 接口里必要实现 reduce()方法,这个方法接收两个输入变乱,经过转换处理之后输出一个相同类型的变乱;所以,对于一组数据,我们可以先取两个进行归并,然后再将归并的效果看作一个数据、再跟后面的数据归并,最终会将它“简化”成唯一的一个数据,这也就是 reduce“归约”的寄义。在流处理的底层实现过程中,现实上是将中心“归并的效果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
- public class TransformReduceTest {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- DataStreamSource<Event> stream = env.fromElements(
- new Event("Mary", "./home", 1000L),
- new Event("Bob", "./cart", 2000L),
- new Event("Alice","./prod?id=100", 3000L),
- new Event("Bob","./prod?id=1", 3300L),
- new Event("Alice","./prod?id=200", 3200L),
- new Event("Bob", "./home", 3500L),
- new Event("Bob","./prod?id=2", 3800L),
- new Event("Bob","./prod?id=3", 4200L)
- );
- //1. 统计每个用户的访问频次
- SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
- @Override
- public Tuple2<String, Long> map(Event value) throws Exception {
- return Tuple2.of(value.user, 1L);
- }
- }).keyBy(data -> data.f0)
- .reduce(new ReduceFunction<Tuple2<String, Long>>() {
- @Override
- public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
- return Tuple2.of(value1.f0, value1.f1 + value2.f1);
- }
- });
- //2. 选取当前最活跃的用户
- SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
- .reduce(new ReduceFunction<Tuple2<String, Long>>() {
- @Override
- public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
- return value1.f1 > value2.f1 ? value1 : value2;
- }
- });
- result.print();
- env.execute();
- }
- }
复制代码 输出效果为:
- (Mary,1)
- (Bob,1)
- (Alice,1)
- (Bob,2)
- (Alice,2)
- (Bob,3)
- (Bob,4)
- (Bob,5)
复制代码 8.输出算子sink
连接到外部体系
Flink作为数据处理框架,最终还是要把计算处理的效果写入外部存储,为外部应用提供支持。
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource雷同,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部体系连接、并将数据提交写入的;Flink程序中全部对外的输出利用,一般都是使用Sink算子完成的。
Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。岂论怎样理解,Sink 在 Flink 中代表了将效果数据收集起来、输出到外部的意思,所以我们这里同一把它直观地叫作“输出算子”。
与 Source 算子非常雷同,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSource 的参数必要实现一个 SourceFunction 接口;雷同地,addSink 方法同样必要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只必要重写一个方法 invoke(),用来将指定的值写入到外部体系中。这个方法在每条数据记录到来时都会调用:
default void invoke(IN value, Context context) throws Exception
SinkFuntion 多数情况下同样并不必要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。
列出了 Flink 官方如今支持的第三方体系连接器:
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方体系与 Flink 的连接器:
除此以外,就必要用户自定义实现 sink 连接器了。
输出到文件
Flink 为此专门提供了一个流式文件体系的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来包管正确一次(exactly once)的划一性语义。
StreamingFileSink 为批处理和流处理提供了一个同一的 Sink,它可以将分区文件写入 Flink支持的文件体系。它可以包管正确一次的状态划一性,大大改进了之前流式文件 Sink 的方式。它的主要利用是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种设置来控制“分桶”的利用;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。
StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种差别的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:
• 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。 • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。 - public class SinkToFileTest {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
- new Event("Bob", "./cart", 2000L),
- new Event("Alice", "./prod?id=100", 3000L),
- new Event("Bob", "./prod?id=1", 3300L),
- new Event("Alice", "./prod?id=300", 3200L),
- new Event("Bob", "./home", 3500L),
- new Event("Bob", "./prod?id=2", 3800L),
- new Event("Bob", "./prod?id=3", 4200L));
- StreamingFileSink<String> fileSink = StreamingFileSink
- .<String>forRowFormat(new Path("./output"),
- new SimpleStringEncoder<>("UTF-8"))
- .withRollingPolicy(
- DefaultRollingPolicy.builder()
- .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
- .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
- .withMaxPartSize(1024 * 1024 * 1024)
- .build())
- .build();
- // 将Event转换成String写入文件
- stream.map(data -> data.toString()).addSink(fileSink);
- env.execute();
- }
- }
复制代码 输出到Kafka
我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:
(1)添加 Kafka 连接器依赖
由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复
先容了。
(2)启动 Kafka 集群
(3)编写输出到 Kafka 的示例代码
1.启动zookeeper
命令:bin/zkServer.sh start
2.启动kafka
命令:bin/kafka-server-start.sh -daemon config/server.properties
3.创建生产者
命令:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,由于必要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的变乱性包管,可以或许真正做到正确一次(exactly once)的状态划一性。
4. 运行代码,另开启一个master01窗口,启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
- public class SinkToKafkaTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "master01:9092");
- DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
- "clicks",
- new SimpleStringSchema(),
- properties
- ));
- SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
- @Override
- public String map(String value) throws Exception {
- String[] fields = value.split(",");
- return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
- }
- });
- result.addSink(new FlinkKafkaProducer<String>("master01:9092", "events", new SimpleStringSchema()));
- env.execute();
- }
- }
复制代码 效果:
2025年的第一天我居然还在复习【枯死】
新的一年,梦虽遥,追则能达。愿虽艰,持则可圆。祝各人所愿皆所成,多喜乐,长安宁。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |