Apache Spark is a popular big data processing framework known for its high performance and flexibility. Spark supports several programming languages, including Scala, Java, and Python. This section focuses on Spark's Java API and how to use it for big data processing.
Main Components of Spark
- Spark Core: provides the basic distributed computing capabilities, including task scheduling, memory management, and fault recovery.
- Spark SQL: processes structured data and supports SQL queries and the DataFrame API.
- Spark Streaming: processes real-time data streams.
- MLlib: a library of machine learning algorithms.
- GraphX: a library for graph computation.
Spark Core Java API
Creating a SparkConf and SparkContext
```java
import org.apache.spark.SparkConf;

public class SparkConfExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkCoreExample")
                .setMaster("local[*]");
    }
}
```
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkContextExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkCoreExample")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);
    }
}
```
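Only one SparkContext may be active per JVM, so call sc.stop() when the application finishes to release its resources before creating another.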
Working with RDDs (Resilient Distributed Datasets)
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDCreationExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("RDDCreationExample")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Each element of the RDD is one line of the text file
        JavaRDD<String> lines = sc.textFile("data/input.txt");
    }
}
```
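Reading files is not the only way to create an RDD; a local collection can also be distributed with parallelize, which is convenient for small tests. A minimal sketch (the numbers are arbitrary sample data):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelizeExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ParallelizeExample")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Distribute a local collection as an RDD
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = sc.parallelize(numbers);

        System.out.println("Sum: " + rdd.reduce((a, b) -> a + b));
        sc.stop();
    }
}
```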
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TransformationExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("TransformationExample")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("data/input.txt");
        // flatMap in the Java API must return an Iterator
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());
        JavaRDD<Integer> wordLengths = words.map(word -> word.length());
    }
}
```
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ActionExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ActionExample")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("data/input.txt");
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());
        JavaRDD<Integer> wordLengths = words.map(word -> word.length());

        // Actions trigger execution of the lazy transformations above
        long totalWords = words.count();
        int maxLength = wordLengths.reduce((a, b) -> Math.max(a, b));
        System.out.println("Total words: " + totalWords);
        System.out.println("Max length: " + maxLength);
    }
}
```
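Transformations and actions combine naturally into the classic word count. The sketch below reuses the same (hypothetical) data/input.txt path and aggregates per-word counts with mapToPair and reduceByKey:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCountExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCountExample")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("data/input.txt");
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());

        // Map each word to (word, 1) and sum the counts per key
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
        sc.stop();
    }
}
```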
Spark SQL Java API
Creating a SparkSession
```java
import org.apache.spark.sql.SparkSession;

public class SparkSessionExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkSQLExample")
                .master("local[*]")
                .getOrCreate();
    }
}
```
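A SparkSession wraps a SparkContext internally (accessible via spark.sparkContext()), so DataFrame-based applications do not need to create a JavaSparkContext separately; call spark.stop() when done.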
Working with DataFrames
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameCreationExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DataFrameCreationExample")
                .master("local[*]")
                .getOrCreate();

        // Read a CSV file whose first line is the header
        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .load("data/input.csv");
    }
}
```
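The same data can also be queried through the DataFrame API directly, without writing SQL. A short sketch, assuming the hypothetical input.csv has name and age columns:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameApiExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DataFrameApiExample")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .option("inferSchema", "true") // infer column types instead of reading everything as strings
                .load("data/input.csv");

        df.printSchema();

        // Equivalent of: SELECT name FROM ... WHERE age > 30
        df.filter(col("age").gt(30))
          .select("name")
          .show();

        spark.stop();
    }
}
```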
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SQLQueryExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SQLQueryExample")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .option("inferSchema", "true") // so that age is read as a number, not a string
                .load("data/input.csv");

        // Register the DataFrame as a temporary view so it can be queried with SQL
        df.createOrReplaceTempView("people");

        Dataset<Row> result = spark.sql("SELECT name FROM people WHERE age > 30");
        result.show();
    }
}
```
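The query result is itself a DataFrame and can be written back out in any supported format. For example, continuing from SQLQueryExample (the output path is illustrative):

```java
// Write the query result as Parquet; "data/output/adults" is an illustrative path
result.write().format("parquet").save("data/output/adults");
```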
Spark Streaming Java API
Creating a StreamingContext
```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingContextExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("StreamingContextExample")
                .setMaster("local[*]");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); // batch interval of 1 second
    }
}
```
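The batch interval can also be written with the Durations helper class, e.g. Durations.seconds(1) instead of new Duration(1000), which reads more clearly.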
Processing Streaming Data
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SocketStreamExample {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("SocketStreamExample")
                .setMaster("local[*]");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); // batch interval of 1 second

        // Listen for text data on localhost:9999 (e.g. started with `nc -lk 9999`)
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        wordCounts.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
```
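DStreams also support sliding-window aggregations. As a sketch, the word count above could be computed over a 30-second window that slides every 10 seconds (this continues SocketStreamExample and additionally needs org.apache.spark.streaming.Durations):

```java
// Count words over a 30-second window, recomputed every 10 seconds
JavaPairDStream<String, Integer> windowedCounts = words
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKeyAndWindow(
                (a, b) -> a + b,        // merge counts inside the window
                Durations.seconds(30),  // window length
                Durations.seconds(10)); // slide interval

windowedCounts.print();
```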
Spark MLlib Java API
Creating a SparkSession
```java
import org.apache.spark.sql.SparkSession;

public class SparkSessionExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkMLlibExample")
                .master("local[*]")
                .getOrCreate();
    }
}
```
Training a Machine Learning Model
```java
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LinearRegressionExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LinearRegressionExample")
                .master("local[*]")
                .getOrCreate();

        // Load training data in LIBSVM format (label plus sparse feature vector)
        Dataset<Row> data = spark.read().format("libsvm").load("data/sample_linear_regression_data.txt");

        LinearRegression lr = new LinearRegression()
                .setMaxIter(100)
                .setRegParam(0.3)
                .setElasticNetParam(0.8);

        LinearRegressionModel model = lr.fit(data);

        System.out.println("R2: " + model.summary().r2());
    }
}
```
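After fitting, the model exposes its learned parameters and training metrics, and transform generates predictions. A continuation of LinearRegressionExample:

```java
// Continuing from LinearRegressionExample: inspect the fit and predict
System.out.println("Coefficients: " + model.coefficients() + ", intercept: " + model.intercept());
System.out.println("RMSE: " + model.summary().rootMeanSquaredError());

// transform() appends a "prediction" column to the input DataFrame
model.transform(data).show(5);
```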
Spark GraphX Java API
Creating a SparkSession
```java
import org.apache.spark.sql.SparkSession;

public class SparkSessionExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkGraphXExample")
                .master("local[*]")
                .getOrCreate();
    }
}
```
Creating a Graph
GraphX does not ship a dedicated Java-friendly API, so calling it from Java means working against the Scala API: the vertex and edge RDDs are built as regular JavaRDDs and passed to Graph.apply together with explicit ClassTags.

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.Graph;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class GraphXExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("GraphXExample")
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // Vertices are (vertexId, attribute) pairs; the VertexId is a Long
        JavaRDD<Tuple2<Object, String>> vertices = sc.parallelize(Arrays.asList(
                new Tuple2<Object, String>(1L, "Alice"),
                new Tuple2<Object, String>(2L, "Bob"),
                new Tuple2<Object, String>(3L, "Charlie")));

        // Edges are (srcId, dstId, attribute) triples
        JavaRDD<Edge<Long>> edges = sc.parallelize(Arrays.asList(
                new Edge<>(1L, 2L, 1L),
                new Edge<>(2L, 3L, 1L)));

        // ClassTags are implicit in Scala but must be passed explicitly from Java
        ClassTag<String> stringTag = ClassTag$.MODULE$.apply(String.class);
        ClassTag<Long> longTag = ClassTag$.MODULE$.apply(Long.class);

        Graph<String, Long> graph = Graph.apply(
                vertices.rdd(), edges.rdd(), "defaultUser",
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),
                stringTag, longTag);

        System.out.println(graph.vertices().toJavaRDD().collect());
        System.out.println(graph.edges().toJavaRDD().collect());
    }
}
```
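Built-in graph algorithms such as PageRank and connected components live in org.apache.spark.graphx.lib and are called from Java in the same ClassTag-passing style.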
Summary
Apache Spark provides a rich set of Java APIs for processing large-scale datasets. Its main components and their Java APIs are:
- Spark Core: basic distributed computing capabilities, including task scheduling, memory management, and fault recovery.
- Spark SQL: structured data processing with SQL queries and the DataFrame API.
- Spark Streaming: real-time stream processing.
- MLlib: machine learning algorithms.
- GraphX: graph computation.
These Java APIs make it possible to manage and process large-scale datasets effectively, and the components can be combined to implement complex big data pipelines. Once you have mastered them, you can use Spark to build high-performance, highly reliable big data processing systems. The examples above cover the basics, from creating a SparkContext and SparkSession to working with RDDs, DataFrames, streaming data, machine learning models, and graph data.