愛在花開的季節 发表于 前天 18:59

spark streaming

Spark Streaming

5.1 流处理简介

流处理是一种处理连续数据流的技术,适用于及时数据分析、监控和报警等场景。Spark Streaming 是 Spark 生态系统中的一个模块,用于处理及时数据流。
重要特性:


[*]高吞吐量:支持大规模数据流的处理。
[*]低耽误:提供靠近及时的处理能力。
[*]容错性:通过 RDD 的 lineage 信息实现容错。
[*]与 Spark 集成:与 Spark Core、Spark SQL、MLlib 等模块无缝集成。
5.2 DStream(Discretized Stream)概念

DStream 是 Spark Streaming 的根本抽象,代表一个连续的数据流。DStream 由一系列 RDD 构成,每个 RDD 包罗一段时间内的数据。
创建 DStream:
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Durations;

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
5.3 输入源与输出操纵

5.3.1 输入源
Spark Streaming 支持多种输入源,包罗:


[*]Socket:从网络套接字读取数据。JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

[*]Kafka:从 Kafka 读取数据。JavaDStream<String> lines = jssc.kafkaStream(topics, kafkaParams);

[*]文件系统:从 HDFS 或当地文件系统读取数据。JavaDStream<String> lines = jssc.textFileStream("hdfs://path/to/input");

5.3.2 输出操纵
输出操纵用于将处理结果保存到外部系统。


[*]print:打印前 10 条记录。lines.print();

[*]saveAsTextFiles:保存为文本文件。lines.saveAsTextFiles("hdfs://path/to/output");

[*]foreachRDD:对每个 RDD 执行自界说操纵。lines.foreachRDD(rdd -> {
    rdd.foreach(record -> {
      // 自定义操作
    });
});

5.4 窗口操纵与状态管理

5.4.1 窗口操纵
窗口操纵允许对一段时间内的数据进行处理。


[*]window:界说窗口长度和滑动隔断。JavaDStream<String> windowedLines = lines.window(Durations.seconds(30), Durations.seconds(10));

[*]reduceByWindow:对窗口内的数据进行聚合。JavaDStream<Integer> wordCounts = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByWindow((x, y) -> x + y, Durations.seconds(30), Durations.seconds(10));

5.4.2 状态管理
状态管理用于维护跨批次的状态信息。


[*]updateStateByKey:根据键更新状态。JavaPairDStream<String, Integer> wordCounts = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .updateStateByKey((values, state) -> {
      int sum = state.orElse(0);
      for (int value : values) {
            sum += value;
      }
      return Optional.of(sum);
    });

5.5 Structured Streaming

Structured Streaming 是 Spark 2.0 引入的流处理引擎,提供了更高级的 API 和更好的性能。
重要特性:


[*]事件时间处理:支持基于事件时间的窗口操纵。
[*]端到端一致性:提供 Exactly-Once 语义。
[*]与 Spark SQL 集成:可以使用 DataFrame 和 Dataset API 进行流处理。
示例:
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> lines = spark.readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load();

Dataset<Row> words = lines.as(Encoders.STRING()).flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

Dataset<Row> wordCounts = words.groupBy("value").count();

StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();
5.6 容错与 Exactly-Once 语义

Spark Streaming 通过以下机制实现容错和 Exactly-Once 语义:


[*]Checkpointing:定期将状态信息保存到可靠的存储系统。
[*]Write Ahead Log (WAL):记录输入数据,确保数据不丢失。
[*]幂等输出:确保输出操纵可以重复执行而不产生副作用。
设置 Checkpoint:
jssc.checkpoint("hdfs://path/to/checkpoint");
5.7 性能优化



[*]并行度:增长分区数,进步并行处理能力。
[*]序列化:使用高效的序列化格式(如 Kryo)。
[*]资源调优:公道分配 CPU 和内存资源。
5.8 Spark Streaming 的应用场景



[*]及时监控:及时监控系统状态和性能指标。
[*]及时保举:根据用户举动及时生成保举结果。
[*]日志分析:及时分析日志数据,检测异常举动。
通过以上内容,你可以深入明白 Spark Streaming 的焦点概念和操纵,并把握如安在实际应用中使用 Spark Streaming 进行高效的及时数据处理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: spark streaming