flink项目的核心目标:数据流上的有状态盘算(Stateful Computations over Data Streams)。
时间驱动:来一个事件处置惩罚一个事件;流处置惩罚的流水线;流&批数据分析
具体定位是:Apache Flink 是一个框架和分布式处置惩罚引擎,用于对无界和有界数据流进行有状态盘算。Flink 被计划在全部常见的集群情况中运行,以内存执行速度和恣意规模来执行盘算。
⑤ 在分组之后调用 sum 方法进行聚合,同样只能指定聚合字段的位置索引或属性名称。
(4) 运行程序,控制台会打印出结果:
可以看到,我们将文档中的全部单词的频次,全部统计出来,以二元组的情势在控制台打印输出了。需要注意的是,这种代码的实现方式,是基于 DataSet API 的,也就是我们对数据的处置惩罚转换,是看作数据集来进行操作的。事实上 Flink 本身是流批统一的处置惩罚架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。所以从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处置惩罚:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
这样,DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在现实应用中我们只要维护一套 DataStream API 就可以了。这里只是为了方便各人理解,我们依然用 DataSet API做了批处置惩罚的实现。
2.3.2 流处置惩罚
用 DataSet API 可以很轻易地实现批处置惩罚;与之对应,流处置惩罚当然可以用DataStream API 来实现。对于 Flink 而言,流才是整个处置惩罚逻辑的底层核心,所以流批统一之后的 DataStream API 更加强盛,可以直接处置惩罚批处置惩罚和流处置惩罚的全部场景
1. 读取文件(有界流)
(1) 新建 Java 类 BoundedStreamWordCount,在静态 main 方法中编写测试代码。具体代码实现如下:
Caused by: java.io.IOException: Insufficient number of network buffers: required 65, but only 33 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:372)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:350)
at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:280)
at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:150)
at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.setup(BufferWritingResultPartition.java:95)
at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:943)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:652)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)