Spark等大数据处置惩罚框架的Java API

打印 上一主题 下一主题

主题 916|帖子 916|积分 2748

Apache Spark 是一个非常流行的大数据处置惩罚框架,以其高性能和机动性著称。Spark 支持多种编程语言,包括 Scala、Java 和 Python。本节将重点介绍 Spark 的 Java API,以及如何使用这些 API 举行大数据处置惩罚。
Spark 的主要组件


  • Spark Core:提供基础的分布式计算能力,包括使命调度、内存管理、容错恢复等。
  • Spark SQL:用于处置惩罚结构化数据,支持 SQL 查询和 DataFrame API。
  • Spark Streaming:用于处置惩罚及时流数据。
  • MLlib:用于呆板学习算法的库。
  • GraphX:用于图计算。
Spark Core Java API

创建 SparkConf 和 SparkContext


  • 创建 SparkConf
  1. import org.apache.spark.SparkConf;
  2. public class SparkConfExample {
  3.     public static void main(String[] args) {
  4.         SparkConf conf = new SparkConf()
  5.                 .setAppName("SparkCoreExample")
  6.                 .setMaster("local[*]");
  7.     }
  8. }
复制代码

  • 创建 SparkContext
  1. import org.apache.spark.api.java.JavaSparkContext;
  2. public class SparkContextExample {
  3.     public static void main(String[] args) {
  4.         SparkConf conf = new SparkConf()
  5.                 .setAppName("SparkCoreExample")
  6.                 .setMaster("local[*]");
  7.         
  8.         JavaSparkContext sc = new JavaSparkContext(conf);
  9.     }
  10. }
复制代码
使用 RDD(Resilient Distributed Datasets)


  • 创建 RDD
  1. import org.apache.spark.api.java.JavaRDD;
  2. import org.apache.spark.api.java.JavaSparkContext;
  3. public class RDDCreationExample {
  4.     public static void main(String[] args) {
  5.         SparkConf conf = new SparkConf()
  6.                 .setAppName("RDDCreationExample")
  7.                 .setMaster("local[*]");
  8.         
  9.         JavaSparkContext sc = new JavaSparkContext(conf);
  10.         
  11.         JavaRDD<String> lines = sc.textFile("data/input.txt");
  12.     }
  13. }
复制代码

  • 转换操纵
  1. import org.apache.spark.api.java.JavaRDD;
  2. import org.apache.spark.api.java.JavaSparkContext;
  3. public class TransformationExample {
  4.     public static void main(String[] args) {
  5.         SparkConf conf = new SparkConf()
  6.                 .setAppName("TransformationExample")
  7.                 .setMaster("local[*]");
  8.         
  9.         JavaSparkContext sc = new JavaSparkContext(conf);
  10.         
  11.         JavaRDD<String> lines = sc.textFile("data/input.txt");
  12.         JavaRDD<String> words = lines.flatMap(line -> line.split("\\s+"));
  13.         JavaRDD<Integer> wordLengths = words.map(word -> word.length());
  14.     }
  15. }
复制代码

  • 办法操纵
  1. import org.apache.spark.api.java.JavaRDD;
  2. import org.apache.spark.api.java.JavaSparkContext;
  3. public class ActionExample {
  4.     public static void main(String[] args) {
  5.         SparkConf conf = new SparkConf()
  6.                 .setAppName("ActionExample")
  7.                 .setMaster("local[*]");
  8.         
  9.         JavaSparkContext sc = new JavaSparkContext(conf);
  10.         
  11.         JavaRDD<String> lines = sc.textFile("data/input.txt");
  12.         JavaRDD<String> words = lines.flatMap(line -> line.split("\\s+"));
  13.         JavaRDD<Integer> wordLengths = words.map(word -> word.length());
  14.         
  15.         long totalWords = words.count();
  16.         int maxLength = wordLengths.reduce((a, b) -> Math.max(a, b));
  17.         System.out.println("Total words: " + totalWords);
  18.         System.out.println("Max length: " + maxLength);
  19.     }
  20. }
复制代码
Spark SQL Java API

创建 SparkSession


  • 创建 SparkSession
  1. import org.apache.spark.sql.SparkSession;
  2. public class SparkSessionExample {
  3.     public static void main(String[] args) {
  4.         SparkSession spark = SparkSession.builder()
  5.                 .appName("SparkSQLExample")
  6.                 .master("local[*]")
  7.                 .getOrCreate();
  8.     }
  9. }
复制代码
处置惩罚 DataFrame


  • 创建 DataFrame
  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. import org.apache.spark.sql.SparkSession;
  4. public class DataFrameCreationExample {
  5.     public static void main(String[] args) {
  6.         SparkSession spark = SparkSession.builder()
  7.                 .appName("DataFrameCreationExample")
  8.                 .master("local[*]")
  9.                 .getOrCreate();
  10.         
  11.         Dataset<Row> df = spark.read().format("csv")
  12.                 .option("header", "true")
  13.                 .load("data/input.csv");
  14.     }
  15. }
复制代码

  • 实行 SQL 查询
  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. import org.apache.spark.sql.SparkSession;
  4. public class SQLQueryExample {
  5.     public static void main(String[] args) {
  6.         SparkSession spark = SparkSession.builder()
  7.                 .appName("SQLQueryExample")
  8.                 .master("local[*]")
  9.                 .getOrCreate();
  10.         
  11.         Dataset<Row> df = spark.read().format("csv")
  12.                 .option("header", "true")
  13.                 .load("data/input.csv");
  14.         
  15.         df.createOrReplaceTempView("people");
  16.         
  17.         Dataset<Row> result = spark.sql("SELECT name FROM people WHERE age > 30");
  18.         result.show();
  19.     }
  20. }
复制代码
Spark Streaming Java API

创建 StreamingContext


  • 创建 StreamingContext
  1. import org.apache.spark.streaming.Duration;
  2. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  3. public class StreamingContextExample {
  4.     public static void main(String[] args) {
  5.         SparkConf conf = new SparkConf()
  6.                 .setAppName("StreamingContextExample")
  7.                 .setMaster("local[*]");
  8.         
  9.         JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); // Batch interval of 1 second
  10.     }
  11. }
复制代码
处置惩罚流数据


  • 从 Socket 吸收数据
  1. import org.apache.spark.api.java.JavaPairRDD;
  2. import org.apache.spark.api.java.JavaRDD;
  3. import org.apache.spark.api.java.function.Function;
  4. import org.apache.spark.streaming.Duration;
  5. import org.apache.spark.streaming.api.java.JavaDStream;
  6. import org.apache.spark.streaming.api.java.JavaPairDStream;
  7. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  8. import scala.Tuple2;
  9. public class SocketStreamExample {
  10.     public static void main(String[] args) throws InterruptedException {
  11.         SparkConf conf = new SparkConf()
  12.                 .setAppName("SocketStreamExample")
  13.                 .setMaster("local[*]");
  14.         
  15.         JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); // Batch interval of 1 second
  16.         
  17.         JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
  18.         JavaDStream<String> words = lines.flatMap(line -> line.split("\\s+"));
  19.         
  20.         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
  21.                 .reduceByKey((a, b) -> a + b);
  22.         
  23.         wordCounts.print();
  24.         
  25.         ssc.start();
  26.         ssc.awaitTermination();
  27.     }
  28. }
复制代码
Spark MLlib Java API

创建 SparkSession


  • 创建 SparkSession
  1. import org.apache.spark.sql.SparkSession;
  2. public class SparkSessionExample {
  3.     public static void main(String[] args) {
  4.         SparkSession spark = SparkSession.builder()
  5.                 .appName("SparkMLlibExample")
  6.                 .master("local[*]")
  7.                 .getOrCreate();
  8.     }
  9. }
复制代码
训练呆板学习模型


  • 训练线性回归模型
  1. import org.apache.spark.ml.regression.LinearRegression;
  2. import org.apache.spark.ml.regression.LinearRegressionModel;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. public class LinearRegressionExample {
  7.     public static void main(String[] args) {
  8.         SparkSession spark = SparkSession.builder()
  9.                 .appName("LinearRegressionExample")
  10.                 .master("local[*]")
  11.                 .getOrCreate();
  12.         
  13.         Dataset<Row> data = spark.read().format("libsvm").load("data/sample_linear_regression_data.txt");
  14.         
  15.         LinearRegression lr = new LinearRegression()
  16.                 .setMaxIter(100)
  17.                 .setRegParam(0.3)
  18.                 .setElasticNetParam(0.8);
  19.         
  20.         LinearRegressionModel model = lr.fit(data);
  21.         
  22.         model.summary().r2();
  23.     }
  24. }
复制代码
Spark GraphX Java API

创建 SparkSession


  • 创建 SparkSession
  1. import org.apache.spark.sql.SparkSession;
  2. public class SparkSessionExample {
  3.     public static void main(String[] args) {
  4.         SparkSession spark = SparkSession.builder()
  5.                 .appName("SparkGraphXExample")
  6.                 .master("local[*]")
  7.                 .getOrCreate();
  8.     }
  9. }
复制代码
创建图


  • 创建 VertexRDD 和 EdgeRDD
  1. import org.apache.spark.api.java.JavaRDD;
  2. import org.apache.spark.graphx.Graph;
  3. import org.apache.spark.graphx.VertexRDD;
  4. import org.apache.spark.sql.SparkSession;
  5. import scala.Tuple2;
  6. public class GraphXExample {
  7.     public static void main(String[] args) {
  8.         SparkSession spark = SparkSession.builder()
  9.                 .appName("GraphXExample")
  10.                 .master("local[*]")
  11.                 .getOrCreate();
  12.         
  13.         JavaRDD<Tuple2<Long, String>> vertices = spark.sparkContext().parallelize(
  14.                 new Tuple2<>(1L, "Alice"),
  15.                 new Tuple2<>(2L, "Bob"),
  16.                 new Tuple2<>(3L, "Charlie")
  17.         ).toJavaRDD();
  18.         
  19.         JavaRDD<Tuple2<Long, Long>> edges = spark.sparkContext().parallelize(
  20.                 new Tuple2<>(1L, 2L),
  21.                 new Tuple2<>(2L, 3L)
  22.         ).toJavaRDD();
  23.         
  24.         VertexRDD<String> vertexRDD = JavaVertexRDD.fromJavaRDD(vertices);
  25.         VertexRDD<Long> edgeRDD = JavaEdgeRDD.fromJavaRDD(edges);
  26.         
  27.         Graph<String, Long> graph = Graph.apply(vertexRDD, edgeRDD, null);
  28.         
  29.         System.out.println(graph.vertices.collect());
  30.         System.out.println(graph.edges.collect());
  31.     }
  32. }
复制代码
总结

Apache Spark 提供了丰富的 Java API,用于处置惩罚大规模数据集。以下是 Spark 的主要组件及其 Java API:

  • Spark Core:提供了基础的分布式计算能力,包括使命调度、内存管理、容错恢复等。
  • Spark SQL:用于处置惩罚结构化数据,支持 SQL 查询和 DataFrame API。
  • Spark Streaming:用于处置惩罚及时流数据。
  • MLlib:用于呆板学习算法的库。
  • GraphX:用于图计算。
通过使用这些 Java API,可以有效地管理和处置惩罚大规模数据集。这些组件相互共同,可以实现复杂的大数据处置惩罚使命。掌握了这些组件的 Java API 后,可以更好地利用 Spark 来构建高性能、高可靠性的大数据处置惩罚系统。
这些示例涵盖了从创建 SparkContext 和 SparkSession 到处置惩罚 RDD、DataFrame、流数据、呆板学习模型和图数据的根本操纵。通过这些示例,你可以更好地理解和使用 Spark 的 Java API。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表