ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【大数据】Spark使用大全:下载安装、RDD利用、JAVA编程、SQL [打印本页]

作者: 瑞星    时间: 2024-6-19 07:01
标题: 【大数据】Spark使用大全:下载安装、RDD利用、JAVA编程、SQL

目录
前言
1.下载安装
2.RDD利用
3.JAVA编程示例
4.Spark SQL


前言

本文是作者大数据系列中的一文,专栏所在:
https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482
该系列会成体系的聊一聊整个大数据的技术栈,绝对干货,接待订阅。
1.下载安装

前置环境:

下载所在:
Downloads | Apache Spark
往下拉找到Spark release archives.
由于前面我们已经搭建好了hadoop环境,以是这里选择with out hadoop的版本。

配置config目录下有一个配置模板spark-env.sh.template:
将这个模板修改大概复制为spark-env.sh然后在里面:
   export SPARK_DIST_CLASSPATH=${Hadoop的安装路径/bin classpath}
  由于Spark只是个计算引擎,具体要去操尴尬刁难应的分部署文件系统的,以是将Spark的类路径指向了hadoop。也就是通过这个配置将Spark要利用的数据源设置为了HDFS。
启动:
bin目录下:
./run-exmaple SparkPi
这是一个Spark自带的demo,如果跑起来不报错,说明就没什么问题了。
2.RDD利用

可以用Spark自带的Spark  shell来进行RDD利用:
./bin/spark-shell
RDD利用分为两类:

map - 应用于RDD的每个元素,产生一个新的RDD。
   val numbersRdd = spark.sparkContext.parallelize(Array(1, 2, 3, 4))
val squaredRdd = numbersRdd.map(x => x * x) 
  filter - 根据函数条件过滤RDD中的元素。
   val evenNumbersRdd = numbersRdd.filter(_ % 2 == 0)
  flatMap - 对RDD中的每个元素应用函数并展平结果。
   val wordsRdd = spark.sparkContext.textFile("hdfs://path/to/textfile.txt")
val wordsFlatMapped = wordsRdd.flatMap(line => line.split(" "))
  mapPartitions - 对每个分区应用一个函数。
   val incrementedRdd = numbersRdd.mapPartitions(iter => iter.map(x => x + 1))
  union - 合并两个RDD。
   val rdd1 = spark.sparkContext.parallelize(Array(1, 2))
val rdd2 = spark.sparkContext.parallelize(Array(3, 4))
val combinedRdd = rdd1.union(rdd2)
  distinct - 返回RDD中不重复的元素。
   val uniqueNumbers = numbersRdd.distinct()
  join - 对两个键值对RDD进行内毗连。
   val rddA = spark.sparkContext.parallelize(Array((1, "a"), (2, "b")))
val rddB = spark.sparkContext.parallelize(Array((1, "x"), (3, "y")))
val joinedRdd = rddA.join(rddB)
  reduce - 通过函数聚合RDD中的所有元素。
   val sum = numbersRdd.reduce(_ + _)
  collect - 返回RDD的所有元素到Driver作为数组。
   val allElements = numbersRdd.collect()
  count - 返回RDD中元素的数量。
   val count = numbersRdd.count()
  first - 返回RDD的第一个元素。
   val firstElement = numbersRdd.first()
  take(n) - 返回RDD的前n个元素。
   val topThree = numbersRdd.take(3)
  saveAsTextFile - 将RDD的内容保存为文本文件。
   wordsRdd.saveAsTextFile("hdfs://path/to/output")
  foreach - 对RDD的每个元素应用函数,常用于副作用利用。
   numbersRdd.foreach(println)
  3.JAVA编程示例

依靠:
  1. <dependencies>
  2.         <dependency> <!-- Spark dependency -->
  3.             <groupId>org.apache.spark</groupId>
  4.             <artifactId>spark-core_2.11</artifactId>
  5.             <version>2.4.0</version>
  6.         </dependency>
  7.     </dependencies>
复制代码
 编码:
  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaRDD;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import scala.Tuple2;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. import java.util.stream.Collectors;
  8. public class WordCountFromHDFS {
  9.     public static void main(String[] args) {
  10.         if (args.length != 1) {
  11.             System.err.println("Usage: WordCountFromHDFS <input path>");
  12.             System.exit(1);
  13.         }
  14.         // 初始化Spark配置
  15.         SparkConf conf = new SparkConf().setAppName("WordCountFromHDFS").setMaster("local"); // 本地模式运行,根据实际情况可改为yarn等
  16.         // 创建SparkContext实例
  17.         JavaSparkContext sc = new JavaSparkContext(conf);
  18.         // HDFS文件路径,这里直接从命令行参数获取
  19.         String inputPath = args[0];
  20.         // 从HDFS读取文件内容
  21.         JavaRDD<String> lines = sc.textFile(inputPath);
  22.         // 每行分割成单词,然后扁平化,最后统计每个单词出现的次数
  23.         JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());
  24.         JavaPairRDD<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
  25.                                                      .reduceByKey((a, b) -> a + b);
  26.         // 收集结果并打印
  27.         List<Tuple2<String, Integer>> results = wordCounts.collect();
  28.         for (Tuple2<String, Integer> result : results) {
  29.             System.out.println(result._1() + ": " + result._2());
  30.         }
  31.         // 停止SparkContext
  32.         sc.stop();
  33.     }
  34. }
复制代码
4.Spark SQL

park SQL是Spark的一个组件,它从Spark 1.3.0版本开始被引入,并在后续版本中不绝得到加强和发展。Spark SQL答应用户使用SQL大概DataFrame API来处理结构化和半结构化的数据。下面做个小小的演示。
假设我们有一个CSV文件位于HDFS上,我们可以用以下下令加载它:
      val df = spark.read
     .option("header", "true")
     .csv("hdfs://localhost:9000/path/to/yourfile.csv")
  创建临时视图:
      df.createOrReplaceTempView("my_table")
  执行sql:
   val result = spark.sql("SELECT column_name FROM my_table WHERE condition")
  joinResult.show()
  连表查询:
   // 假设dfOrders和dfCustomers分别是orders和customers的DataFrame
dfOrders.createOrReplaceTempView("orders")
dfCustomers.createOrReplaceTempView("customers")
  val joinResult = spark.sql(
  """
    SELECT orders.order_id, customers.customer_name
    FROM orders
    INNER JOIN customers
    ON orders.customer_id = customers.customer_id
  """
)
  joinResult.show()
  固然Spark SQL也有对应的JAVA API,支持编程的方式来利用,用到的时间查一下就是,此处就不展开了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4