ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark编程基础 [打印本页]

作者: 十念    时间: 2023-6-28 21:03
标题: Spark编程基础
Scala编写Spark的WorkCount

创建一个Maven项目

在pom.xml中添加依赖和插件
  1. <properties>
  2.     <maven.compiler.source>8</maven.compiler.source>
  3.     <maven.compiler.target>8</maven.compiler.target>
  4.     <encoding>UTF-8</encoding>
  5.     <spark.version>3.2.3</spark.version>
  6.     <scala.version>2.12.15</scala.version>
  7. </properties>
  8. <dependencies>
  9.    
  10.     <dependency>
  11.         <groupId>org.scala-lang</groupId>
  12.         <artifactId>scala-library</artifactId>
  13.         <version>${scala.version}</version>
  14.     </dependency>
  15.    
  16.     <dependency>
  17.         <groupId>org.apache.spark</groupId>
  18.         <artifactId>spark-core_2.12</artifactId>
  19.         <version>${spark.version}</version>
  20.     </dependency>
  21. </dependencies>
  22. <repositories>
  23.     <repository>
  24.         <id>nexus-aliyun</id>
  25.         <name>Nexus aliyun</name>
  26.         <layout>default</layout>
  27.         <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  28.         <snapshots>
  29.             <enabled>false</enabled>
  30.             <updatePolicy>never</updatePolicy>
  31.         </snapshots>
  32.         <releases>
  33.             <enabled>true</enabled>
  34.             <updatePolicy>never</updatePolicy>
  35.         </releases>
  36.     </repository>
  37. </repositories>
  38. <pluginRepositories>
  39.     <pluginRepository>
  40.         <id>ali-plugin</id>
  41.         <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  42.         <snapshots>
  43.             <enabled>false</enabled>
  44.             <updatePolicy>never</updatePolicy>
  45.         </snapshots>
  46.         <releases>
  47.             <enabled>true</enabled>
  48.             <updatePolicy>never</updatePolicy>
  49.         </releases>
  50.     </pluginRepository>
  51. </pluginRepositories>
  52. <build>
  53.     <pluginManagement>
  54.         <plugins>
  55.             
  56.             <plugin>
  57.                 <groupId>net.alchim31.maven</groupId>
  58.                 <artifactId>scala-maven-plugin</artifactId>
  59.                 <version>3.2.2</version>
  60.             </plugin>
  61.             
  62.             <plugin>
  63.                 <groupId>org.apache.maven.plugins</groupId>
  64.                 <artifactId>maven-compiler-plugin</artifactId>
  65.                 <version>3.5.1</version>
  66.             </plugin>
  67.         </plugins>
  68.     </pluginManagement>
  69.     <plugins>
  70.         <plugin>
  71.             <groupId>net.alchim31.maven</groupId>
  72.             <artifactId>scala-maven-plugin</artifactId>
  73.             <executions>
  74.                 <execution>
  75.                     <id>scala-compile-first</id>
  76.                     <phase>process-resources</phase>
  77.                     <goals>
  78.                         <goal>add-source</goal>
  79.                         <goal>compile</goal>
  80.                     </goals>
  81.                 </execution>
  82.                 <execution>
  83.                     <id>scala-test-compile</id>
  84.                     <phase>process-test-resources</phase>
  85.                     <goals>
  86.                         <goal>testCompile</goal>
  87.                     </goals>
  88.                 </execution>
  89.             </executions>
  90.         </plugin>
  91.         <plugin>
  92.             <groupId>org.apache.maven.plugins</groupId>
  93.             <artifactId>maven-compiler-plugin</artifactId>
  94.             <executions>
  95.                 <execution>
  96.                     <phase>compile</phase>
  97.                     <goals>
  98.                         <goal>compile</goal>
  99.                     </goals>
  100.                 </execution>
  101.             </executions>
  102.         </plugin>
  103.         
  104.         <plugin>
  105.             <groupId>org.apache.maven.plugins</groupId>
  106.             <artifactId>maven-shade-plugin</artifactId>
  107.             <version>2.4.3</version>
  108.             <executions>
  109.                 <execution>
  110.                     <phase>package</phase>
  111.                     <goals>
  112.                         <goal>shade</goal>
  113.                     </goals>
  114.                     <configuration>
  115.                         <filters>
  116.                             <filter>
  117.                                 <artifact>*:*</artifact>
  118.                                 <excludes>
  119.                                     <exclude>META-INF/*.SF</exclude>
  120.                                     <exclude>META-INF/*.DSA</exclude>
  121.                                     <exclude>META-INF/*.RSA</exclude>
  122.                                 </excludes>
  123.                             </filter>
  124.                         </filters>
  125.                     </configuration>
  126.                 </execution>
  127.             </executions>
  128.         </plugin>
  129.     </plugins>
  130. </build>
复制代码
创建一个scala目录

选择scala目录,右键,将目录转成源码包,或者点击maven的刷新按钮

编写Spark程序
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4.   * 1.创建SparkContext
  5.   * 2.创建RDD
  6.   * 3.调用RDD的Transformation(s)方法
  7.   * 4.调用Action
  8.   * 5.释放资源
  9.   */
  10. object WordCount {
  11.   def main(args: Array[String]): Unit = {
  12.     val conf: SparkConf = new SparkConf().setAppName("WordCount")
  13.     //创建SparkContext,使用SparkContext来创建RDD
  14.     val sc: SparkContext = new SparkContext(conf)
  15.     //spark写Spark程序,就是对抽象的神奇的大集合【RDD】编程,调用它高度封装的API
  16.     //使用SparkContext创建RDD
  17.     val lines: RDD[String] = sc.textFile(args(0))
  18.     //Transformation 开始 //
  19.     //切分压平
  20.     val words: RDD[String] = lines.flatMap(_.split(" "))
  21.     //将单词和一组合放在元组中
  22.     val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
  23.     //分组聚合,reduceByKey可以先局部聚合再全局聚合
  24.     val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
  25.     //排序
  26.     val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
  27.     //Transformation 结束 //
  28.     //调用Action将计算结果保存到HDFS中
  29.     sorted.saveAsTextFile(args(1))
  30.     //释放资源
  31.     sc.stop()
  32.   }
  33. }
复制代码
使用maven打包


提交任务

•        上传jar包到服务器,然后使用sparksubmit命令提交任务
  1. /bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
  2. --master spark://node-1.51doit.cn:7077 \
  3. --executor-memory 1g --total-executor-cores 4 \
  4. --class cn._51doit.spark.day01.WordCount \
  5. /root/spark-in-action-1.0.jar hdfs://node-1.51doit.cn:9000/words.txt hdfs://node-1.51doit.cn:9000/out
  6. 参数说明:
  7. --master 指定masterd地址和端口,协议为spark://,端口是RPC的通信端口
  8. --executor-memory 指定每一个executor的使用的内存大小
  9. --total-executor-cores指定整个application总共使用了cores
  10. --class 指定程序的main方法全类名
  11. jar包路径 args0 args1
复制代码
Java编写Spark的WordCount

使用匿名实现类方式
  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaPairRDD;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.FlatMapFunction;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import scala.Tuple2;
  9. import java.util.Arrays;
  10. import java.util.Iterator;
  11. public class JavaWordCount {
  12.     public static void main(String[] args) {
  13.         SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
  14.         //创建JavaSparkContext
  15.         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  16.         //使用JavaSparkContext创建RDD
  17.         JavaRDD<String> lines = jsc.textFile(args[0]);
  18.         //调用Transformation(s)
  19.         //切分压平
  20.         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  21.             @Override
  22.             public Iterator<String> call(String line) throws Exception {
  23.                 return Arrays.asList(line.split(" ")).iterator();
  24.             }
  25.         });
  26.         //将单词和一组合在一起
  27.         JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
  28.                 new PairFunction<String, String, Integer>() {
  29.                     @Override
  30.                     public Tuple2<String, Integer> call(String word) throws Exception {
  31.                         return Tuple2.apply(word, 1);
  32.                     }
  33.         });
  34.         //分组聚合
  35.         JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
  36.                 new Function2<Integer, Integer, Integer>() {
  37.             @Override
  38.             public Integer call(Integer v1, Integer v2) throws Exception {
  39.                 return v1 + v2;
  40.             }
  41.         });
  42.         //排序,先调换KV的顺序VK
  43.         JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
  44.                 new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  45.             @Override
  46.             public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
  47.                 return tp.swap();
  48.             }
  49.         });
  50.         //再排序
  51.         JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
  52.         //再调换顺序
  53.         JavaPairRDD<String, Integer> result = sorted.mapToPair(
  54.                 new PairFunction<Tuple2<Integer, String>, String, Integer>() {
  55.             @Override
  56.             public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
  57.                 return tp.swap();
  58.             }
  59.         });
  60.         //触发Action,将数据保存到HDFS
  61.         result.saveAsTextFile(args[1]);
  62.         //释放资源
  63.         jsc.stop();
  64.     }
  65. }
复制代码
使用Lambda表达式方式
  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaPairRDD;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import scala.Tuple2;
  6. import java.util.Arrays;
  7. public class JavaLambdaWordCount {
  8.     public static void main(String[] args) {
  9.         SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
  10.         //创建SparkContext
  11.         JavaSparkContext jsc = new JavaSparkContext(conf);
  12.         //创建RDD
  13.         JavaRDD<String> lines = jsc.textFile(args[0]);
  14.         //切分压平
  15.         JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
  16.         //将单词和一组合
  17.         JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
  18.         //分组聚合
  19.         JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
  20.         //调换顺序
  21.         JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
  22.         //排序
  23.         JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
  24.         //调换顺序
  25.         JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
  26.         //将数据保存到HDFS
  27.         result.saveAsTextFile(args[1]);
  28.         //释放资源
  29.         jsc.stop();
  30.     }
  31. }
复制代码
本地运行Spark和Debug

spark程序每次都打包上在提交到集群上比较麻烦且不方便调试,Spark还可以进行Local模式运行,方便测试和调试
在本地运行
  1. //Spark程序local模型运行,local[*]是本地运行,并开启多个线程
  2. val conf: SparkConf = new SparkConf()
  3.   .setAppName("WordCount")
  4.   .setMaster("local[*]") //设置为local模式执行
复制代码
并输入运行参数
hdfs://linux01:9000/words.txt hdfs://linux01:9000/out/out01
读取HDFS中的数据

由于往HDFS中的写入数据存在权限问题,所以在代码中设置用户为HDFS目录的所属用户
  1. //往HDFS中写入数据,将程序的所属用户设置成更HDFS一样的用户
  2. System.setProperty("HADOOP_USER_NAME", "root")
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4