Flink使命如何跑起来之 1.DataStream和Transformation

打印 上一主题 下一主题

主题 639|帖子 639|积分 1917

Flink使命如何跑起来之 1.DataStream和Transformation

1. 滥觞

在利用Flink完成业务功能之余,有须要了解下我们的使命是如何跑起来的。知其然,知其所以然。
既然重点是学习应用程序如何跑起来,那么应用程序的内容不重要,越简朴越好。
WordCount示例作为学习数据引擎时hello word程序,再合适不过。接下来便以使命实行顺序为线索开启对源码徐徐学习。
  1. public class WordCount {
  2.     public static void main(String[] args) throws Exception {
  3.         // 初始化执行环境
  4.         Configuration configuration = new Configuration();
  5.         configuration.setString("rest.port", "9091");
  6.         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
  7.         env.setParallelism(1);
  8.         // 业务逻辑转换
  9.         DataStream<String> text = env.fromCollection(Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan")).name("zl-source");
  10.         DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))
  11.                 .returns(Types.TUPLE(Types.STRING, Types.INT))
  12.                 .keyBy(value -> value.f0)
  13.                 .sum(1)
  14.                 .name("counter");
  15.         counts.print().name("print-sink");
  16.         // 执行应用程序
  17.         env.execute("WordCount");
  18.     }
  19. }
复制代码
为了使示例代码足够纯粹(直接复制粘贴后即可跑起来的那种),因此在示例中直接利用List数据作为Source。
最后,计划将自己学习的过程以系列文档的形式作为记录。同时作为自己学习过程的记录,大概存在错误或片面理解,欢迎一起讨论。
2. 头疼的“脚色”

在学习源码或查阅资料的同时,以下单词(但不限于)一定会频仍出现,它们或者直接对应flink源码中的接口、类名,或者是一些概念名称。初次看到难免让人抓狂。如今先对这些单词混个脸熟。
Client
JobManager/JobMaster
TaskManager/TaskExecutor
Transformation
StreamOperator
StreamGraph
JobGraph
ExecutionGraph
Task
StreamTask
……
3. 宏观视角

当使命开始实行后,便可以在WebUI上查看其对应的物理实行拓扑,即Task DAG。从我们编写的应用程序代码到Task DAG势必履历了复杂的解析转换操作,这个过程大要如下所示。

我们编写的应用程序代码首先会转化为Transformation,该实例将作为Flink世界中的起点,开启了之后一系列“旅程”。
4. env.execute()方法做了什么?

在利用DataStream API编写应用程序时,无论业务逻辑如何如何的复杂,但整体结构大致由三部门构成,即
  1. // 1.初始化执行环境
  2. StreamExecutionEnvironment env = ;
  3. // 2.业务逻辑转换,即一系列的DataStream转化
  4. DataStream source = ;
  5. // 3.env.execute()
  6. env.execute();
复制代码
既然最后必须实行 env.execute()方法,那么首先了解下execute都实行了那些操作。
基于1.16版本的源码,并只保存了源码中的关键逻辑。
  1. // 方法1
  2. public JobExecutionResult execute(String jobName) throws Exception {
  3.     final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
  4.     // 生成StreamGraph,最终调用方法4,通过StreamGraphGenerator生成StreamGraph
  5.     StreamGraph streamGraph = getStreamGraph();
  6.     // ...
  7.     try {
  8.         // 调用方法2
  9.         return execute(streamGraph);
  10.     } catch (Throwable t) {
  11.         // ...
  12.     }
  13. }
  14. // 方法2
  15. public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
  16.     // 调用方法3,通过StreamGraph最终得到JobClient
  17.     final JobClient jobClient = executeAsync(streamGraph);
  18.     try {
  19.         final JobExecutionResult jobExecutionResult;
  20.         // ...
  21.         jobListeners.forEach(
  22.                 jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
  23.         return jobExecutionResult;
  24.     } catch (Throwable t) {
  25.         // ...
  26.     }
  27. }
  28. // 方法3
  29. public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
  30.     // 根据启动环境,得到对应环境的Executor实现
  31.     // 如miniCluster环境则对应LocalExecutor
  32.     final PipelineExecutor executor = getPipelineExecutor();
  33.     // 在具体的executor.execute方法中,将StreamGraph先转化成JobGraph,在将JobGraph提交到JobManager中
  34.     CompletableFuture<JobClient> jobClientFuture =
  35.             executor.execute(streamGraph, configuration, userClassloader);
  36.     try {
  37.         JobClient jobClient = jobClientFuture.get();
  38.         jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
  39.         collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
  40.         collectIterators.clear();
  41.         return jobClient;
  42.     } catch (ExecutionException executionException) {
  43.         // ...
  44.     }
  45. }
  46. // 方法4
  47. private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
  48.     synchronizeClusterDatasetStatus();
  49.     // 根据Transformation生成StreamGraph
  50.     return getStreamGraphGenerator(transformations).generate();
  51. }
复制代码
通过上述源码调用链可以,完成从DataStream API->Transformation->StreamGraph->JobGraph的转化。最后将JobGraph提交到了JobManager中并,实行后续操作。
从上述方法4getStreamGraph(List<Transformation<?>> transformations)可知,StreamGraph由Transformation演变而来,此处不禁会产生一个新的疑问,Transformation又从何而来?
WordCount示例代码中并没有与Transformation直接相关的代码。通过查看getSreamGraph方法的完成调用链可知其入参直接来自是StreamExecutionEnvironment类中的transformations成员属性值。在应用程序第一步便生成了StreamExecutionEnvironment的实例,接下来通过env得到DataStream并进行了一系列的转化操作,而在最后的execute方法中便已直接利用transformations属性值了,那么该属性中一定是前面2个过程中实际赋值的。
  1. protected final List<Transformation<?>> transformations = new ArrayList<>();
复制代码
5. Transformation何时生成?

从StreamExecutionEnvironment的源码中可知,transformations属性只有addOperator方法会实行聚集的add操作,其余地方均为聚集的get操作。
然而addOperator方法有诸多调用方,且均为其他类中的调用,继续往上查看调用方有些困难,因此这里临时记下addOperator方法唯一对transformations聚集中实行add操作的结论。
  1. // 该方法不适合用户使用。创建operator的api方法必须调用此方法
  2. @Internal
  3. public void addOperator(Transformation<?> transformation) {
  4.     Preconditions.checkNotNull(transformation, "transformation must not be null.");
  5.     this.transformations.add(transformation);
  6. }
复制代码
通过查看StreamExecutionEnvironment实例的创建过程,可以发如今创建过程中并无transformations的add操作,因此是在DataStream转换操作中对transformations实行了add操作。
5.1. DataStream

在Flink中利用DataStream表现数据流。其仅用于表达业务转化逻辑,实际上并没有真正的存储数据。
DataSteam是顶层封装类,其子类如下

DataStream类中只有两个成员属性,分别是StreamExecutionEnvironment和Transformation,并在构造方法中对其进行初始化。因此实例化DataStream的同时除实行环境外,还必须传入Transformation的实例。
  1. public class DataStream<T> {
  2.     protected final StreamExecutionEnvironment environment;
  3.     protected final Transformation<T> transformation;
  4.     public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
  5.         this.environment =
  6.                 Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
  7.         this.transformation =
  8.                 Preconditions.checkNotNull(
  9.                         transformation, "Stream Transformation must not be null.");
  10.     }
  11.     // ...
  12. }
复制代码
回到WordCount示例代码中,从聚集到DataStream的过程,封装示意如下。

注意,Transformation中并不是直接持有了AbstractUdfStreamOperator的引用,而是对应的工厂。
源码中关键步调如下
  1. // 步骤1,从List到Function
  2. public <OUT> DataStreamSource<OUT> fromCollection(
  3.         Collection<OUT> data, TypeInformation<OUT> typeInfo) {
  4.     // ...
  5.     // 创建SourceFunction实例,SourceFunction是Function的实现
  6.     SourceFunction<OUT> function = new FromElementsFunction<>(data);
  7.     return addSource(function, "Collection Source", typeInfo, Boundedness.BOUNDED)
  8.             .setParallelism(1);
  9. }
  10. // 步骤2,从Function到StreamOperator
  11. private <OUT> DataStreamSource<OUT> addSource(
  12.         final SourceFunction<OUT> function,
  13.         final String sourceName,
  14.         @Nullable final TypeInformation<OUT> typeInfo,
  15.         final Boundedness boundedness) {
  16.     // ...
  17.     // 创建StreamSource实例,StreamSource是AbstractUdfStreamOperator的子类,Flink中算子的表示
  18.     final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
  19.     return new DataStreamSource<>(
  20.             this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
  21. }
  22. // 步骤3,从StreamOperator到Transformation,再到DataStream
  23. public DataStreamSource(
  24.         StreamExecutionEnvironment environment,
  25.         TypeInformation<T> outTypeInfo,
  26.         StreamSource<T, ?> operator,
  27.         boolean isParallel,
  28.         String sourceName,
  29.         Boundedness boundedness) {
  30.     super(
  31.             environment,
  32.             // 创建Transformation实例,Transformation是PhysicalTransformation的子类
  33.             new LegacySourceTransformation<>(
  34.                     sourceName,
  35.                     // 将StreamSource封装到Transformation中
  36.                     operator,
  37.                     outTypeInfo,
  38.                     environment.getParallelism(),
  39.                     boundedness));
  40.     // ...
  41. }
复制代码
继续查看DataStream的map操作可以可以发现,焦点流程和上述由聚集创建DataStream的过程基本一致:


  • 首先创建Function实例
  • 其次由Function实例创建AbstractUdfStreamOperator实例
  • 然后将AbstractUdfStreamOperator实例封装到Transformation实例中
  • 最后由Transformation和StreamExecutionEnvironment实例创建DataStream实例
不同之处在于,map操作最后将得到的PhysicalTransformation实例添加到StreamExecutionEnvironment实例中的transformations聚集中去了。这点差别其实和Transformation实例表现的寄义有关,放在文章末尾解释。
  1. protected <R> SingleOutputStreamOperator<R> doTransform(
  2.         String operatorName,
  3.         TypeInformation<R> outTypeInfo,
  4.         StreamOperatorFactory<R> operatorFactory) {
  5.     // ...
  6.     OneInputTransformation<T, R> resultTransform =
  7.             new OneInputTransformation<>(
  8.                     this.transformation,
  9.                     operatorName,
  10.                     operatorFactory,
  11.                     outTypeInfo,
  12.                     environment.getParallelism());
  13.     SingleOutputStreamOperator<R> returnStream =
  14.             new SingleOutputStreamOperator(environment, resultTransform);
  15.     // 区别:添加Transformation到StreamExecutionEnvironment中
  16.     getExecutionEnvironment().addOperator(resultTransform);
  17.     return returnStream;
  18. }
复制代码
但并不是全部的DataStream转化操作都需要履历上述将Function实例封装成AbstractUdfStreamOperator实例,然后将AbstractUdfStreamOperator实例封装到PhysicalTransformation实例的过程。如示例代码中的keyBy和sum操作。其中keyBy并未直接涉及Function,而sum操作直接将得到的SumAggregator函数实例封装到了ReduceTransformation实例中,然后由ReduceTransformation实例得到DataStream实例。
5.2. Transformation

DataStream面向开发者,而Transformation面向flink内核。
每个DataStream实例中都包含一个Transformation实例,表现当前Datastream从上游的DataStream利用该Transformation而来。而全部DataStream中Transformation又都添加到了StreamExecutionEnvironment实例中的transformations聚集中去,用于接下来的StreamGraph实例的生成。
Transformation中记录了上游的数据泉源,但其并关心数据的物理泉源、序列化、转发等问题。
Transformatio是顶层抽象类,有众多的子类,涵盖了DataStream的全部转换,其直接子类如下,可以分为两大类


  • PhysicalTransformation,将会转换成后续graph中节点信息
  • 非PhysicalTransformation,将会转换成后续graph中的边信息

Transformation中属性如下所示,其中Optional<SlotSharingGroup>表现共享槽位信息,只有开启了答应共享槽位后,该属性才会被设置值。
其构造方法如下,除name外还需要输出范例和并行度两个参数。
  1. public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
  2.     this.id = getNewNodeId();
  3.     this.name = Preconditions.checkNotNull(name);
  4.     this.outputType = outputType;
  5.     this.parallelism = parallelism;
  6.     this.slotSharingGroup = Optional.empty();
  7. }
复制代码
PhysicalTransformation仅在其父类的底子上增加了设置ChainingStrategy的方法,用于表现生成算子链的计谋。
  1. @Internal
  2. public abstract class PhysicalTransformation<T> extends Transformation<T> {
  3.     PhysicalTransformation(String name, TypeInformation<T> outputType, int parallelism) {
  4.         super(name, outputType, parallelism);
  5.     }
  6.     /** Sets the chaining strategy of this {@code Transformation}. */
  7.     public abstract void setChainingStrategy(ChainingStrategy strategy);
  8. }
复制代码
PhysicalTransformation中有众多的实现子类,全部子类继续关系如下。

其中以下几个子类进场频率相对更高一些,其他子类只有我们的业务逻辑比较复杂时才会用到。


  • LegacySourceTransformation 表现Source的Transformation
  • LegacySinkTransformation 表现Sink的Transformation
  • SourceTransformation
  • SinkTransformation
  • OneInputTransformation 表现单个输入流的Transformation,如常见的map、flatMap、fliter等
  • TwoInputTransformation 表现两个输入流的Transformation,如concat
疑问:为什么Source和Sink都各自分别有两个Transformation子类?
通过名称也可以看出一些端倪,新老两种实现。
在1.14版本之前,分别通过env.addSource(SourceFunction)和DataStream.addSink(SinkFunction)方法生成source和sink
从1.14版本开始新增了env.fromSource(Source)和DataStream.sinkTo(Sink)的方式生成source和sink。
新旧方法中入参范例不同,因此导致了两种不同的Transformation实现,从各自的实现类中也可以体现这一点,如下所示。
  1. public class LegacySourceTransformation<T> extends PhysicalTransformation<T>
  2.         implements WithBoundedness {
  3.     // sourceFunction的引用
  4.     private final StreamOperatorFactory<T> operatorFactory;
  5.     // ...
  6. }
  7. public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
  8.         extends PhysicalTransformation<OUT> implements WithBoundedness {
  9.     // source的引用
  10.     private final Source<OUT, SplitT, EnumChkT> source;
  11.     // ...
  12. }
  13. public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {
  14.     private final Transformation<T> input;
  15.     // sinkFunction的引用
  16.     private final StreamOperatorFactory<Object> operatorFactory;
  17.     // ...
  18. }
  19. public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> {
  20.     private final DataStream<InputT> inputStream;
  21.     // sink的引用
  22.     private final Sink<InputT> sink;
  23.     private final Transformation<InputT> input;
  24.     // ...
  25. }
复制代码
Source作为整个数据流的头部,不存在上游,因此其Transformation实现中没有上游Transformation的引用,除此之外其余的Transformation子类中,均持有一个表现上游Transformation的引用,如上述sink中的input属性。
最后解释下,前面提到的为什么没有将表现Source的DataStream中的Transformation加入到env中表现Transformation的聚集中,而接下来的转化中,将对应的Transformation加入到了env中。因为Source作为数据源的头部,不会存在上游,而Source作为其他DataSteam的上游,一定会加入到其Transformation的input中,因此没须要单独将Source的transformation加入到env中。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

雁过留声

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表