Flink实时流处理入门与实践
一、引言1.1 实时流处理的重要性
在当今数据驱动的期间,实时数据处理变得越来越重要。企业需要从不停产生的大量数据中快速提取有价值的信息,以支持决策制定和业务优化。实时流处理技术能够实时处理数据流,提供即时的洞察和响应,从而提高业务效率和竞争力。
1.2 Flink简介
Apache Flink 是一个开源的分布式流处理框架,支持批处理和流处理。Flink 提供了高吞吐量、低延迟和准确一次(exactly-once)的语义,适用于各种实时数据处理场景,如实时数据分析、实时监控、实时推荐系统等。
1.3 Flink与其他流处理框架的比较
[*]Apache Kafka Streams:专注于轻量级的流处理,恰当简单的流处理任务。
[*]Apache Storm:基于流的分布式计算框架,恰当低延迟的流处理。
[*]Apache Spark Streaming:基于Spark的流处理扩展,恰当大数据处理场景。
[*]Apache Flink:提供高吞吐量、低延迟和准确一次的语义,支持复杂的流处理任务。
二、Flink核心概念
2.1 流处理模型
流处理模型将数据视为连续的、无界的流。Flink 支持变乱时间、处理时间和摄入时间三种时间语义,并提供了丰富的窗口操作来处理数据流。
2.2 Flink架构
2.2.1 JobManager
JobManager 是 Flink 集群的主节点,负责作业的调理和资源管理。
2.2.2 TaskManager
TaskManager 是 Flink 集群的从节点,负责执行任务并管理内存和网络资源。
2.2.3 ZooKeeper
ZooKeeper 用于调和 JobManager 和 TaskManager,确保集群的高可用性。
2.3 Flink API
2.3.1 DataStream API
DataStream API 是 Flink 的核心 API,用于处理无界和有界数据流。
2.3.2 Table API & SQL
Table API 和 SQL 提供了声明式的数据处理方式,适用于复杂的数据查询和转换。
2.3.3 Flink ML
Flink ML 提供了机器学习库,支持在 Flink 上进行大规模机器学习任务。
三、Flink环境搭建
3.1 安装Java
确保已安装 Java 8 或更高版本。可以通过以下下令查抄 Java 版本:
java -version
3.2 下载并安装Flink
从 Flink 官方网站 下载最新版本的 Flink,并解压到当地目次。
tar -xzf flink-*.tgz
cd flink-*
3.3 启动Flink集群
启动 Flink 集群,包括 JobManager 和 TaskManager。
./bin/start-cluster.sh
3.4 设置Flink
在 conf/flink-conf.yaml 文件中设置 Flink 参数,如并行度、内存等。
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 2
四、Flink核心API使用
4.1 DataStream API
4.1.1 创建DataStream
创建一个简单的 DataStream 并进行根本操作。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件读取数据
DataStream<String> text = env.readTextFile("path/to/input/file");
// 处理数据流
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
// 输出结果
counts.print();
// 执行作业
env.execute("WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\\s")) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
4.1.2 数据转换操作
使用 map、filter 等操作对数据流进行转换。
DataStream<String> filteredText = text
.flatMap(new Tokenizer())
.filter(word -> word.f0.length() > 3)
.map(word -> word.f0.toUpperCase());
4.1.3 数据分区操作
使用 keyBy 和 shuffle 等操作对数据流进行分区。
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
4.1.4 状态管理
使用 Flink 的状态管理功能来维护状态。
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class StatefulWordCount extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> count;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
"count",
Types.INT
);
count = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer currentCount = count.value();
if (currentCount == null) {
currentCount = 0;
}
currentCount += value.f1;
count.update(currentCount);
out.collect(new Tuple2<>(value.f0, currentCount));
}
}
4.1.5 时间语义与窗口操作
使用变乱时间、处理时间和摄入时间,并进行窗口操作。
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1);
4.2 Table API & SQL
4.2.1 创建TableEnvironment
创建一个 TableEnvironment 并注册表。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableApiExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
// 注册表
tableEnv.executeSql("CREATE TABLE myTable (" +
"id INT," +
"name STRING," +
"age INT," +
"time TIMESTAMP(3)," +
"WATERMARK FOR time AS time - INTERVAL '5' SECOND" +
") WITH (" +
"'connector' = 'filesystem'," +
"'path' = 'path/to/input/file'," +
"'format' = 'csv'" +
")");
}
}
4.2.2 数据转换操作
使用 Table API 和 SQL 进行数据转换。
// 使用 Table API
Table resultTable = tableEnv.from("myTable")
.filter($("age").isGreater(18))
.select($("id"), $(
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]