大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算效果 ...

打印 上一主题 下一主题

主题 539|帖子 539|积分 1617

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)
章节内容

上节完成的内容如下:


  • Spark案例编写 Scala
  • 计算圆周率
  • 找共同的好友

Super Word Count

需求背景



  • 给定一段文本
  • 将单词全部转换为小写
  • 去除标点符号
  • 去除停用词
  • count值降序生存
  • 效果生存到MySQL
  • 额外要求:标点符合和停用词可以自界说
编写代码

先实现到MySQL生存前的内容,我们须要先编写测试一下我们的代码是否正确
  1. package icu.wzk
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object SuperWordCount1 {
  5.   private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
  6.   private val punctuation = "[\\)\\.,:;'!\\?]"
  7.   def main(args: Array[String]): Unit = {
  8.     val conf = new SparkConf()
  9.       .setAppName("ScalaSuperWordCount1")
  10.       .setMaster("local[*]")
  11.     val sc = new SparkContext(conf)
  12.     sc.setLogLevel("WARN")
  13.     val lines: RDD[String] = sc.textFile(args(0))
  14.     lines
  15.       .flatMap(_.split("\\s+"))
  16.       .map(_.toLowerCase)
  17.       .map(_.replaceAll(punctuation, ""))
  18.       .filter(word => !stopWords.contains(word) && word.trim.nonEmpty)
  19.       .map((_, 1))
  20.       .reduceByKey(_ + _)
  21.       .sortBy(_._2, false)
  22.       .collect()
  23.       .foreach(println)
  24.    
  25.     sc.stop()
  26.   }
  27. }
复制代码
详细表明

object SuperWordCount1 { … }



  • SuperWordCount1 是一个 Scala 对象,界说了一个单例对象用于运行单词计数步伐。
private val stopWords = “in on to from by a an the is are were was i we you your he his”.split(“\s+”)



  • 这里界说了一个 stopWords 列表,包罗了常见的停用词,这些词在统计单词频率时会被过滤掉。
  • split(“\s+”) 方法将这些停用词用空白字符分割成数组,便于后续的查找和过滤。
private val punctuation = “[\)\.,:;'!\?]”



  • 界说了一个正则表达式 punctuation,用于匹配常见的标点符号。这些标点符号在统计单词频率时会被去除。
def main(args: Array[String]): Unit = { … }



  • main 方法是步伐的入口点,args 是下令行参数,其中 args(0) 通常表示输入文件的路径。
val conf = new SparkConf().setAppName(“ScalaSuperWordCount1”).setMaster(“local
  • ”)



    • SparkConf() 用于配置 Spark 应用步伐。setAppName(“ScalaSuperWordCount1”) 设置应用步伐的名称。
    • setMaster(“local
    • ”) 指定应用步伐以本地模式运行,使用全部可用的 CPU 核心。
    val sc = new SparkContext(conf)



    • SparkContext 是 Spark 应用步伐的核心,用于与 Spark 集群进行交互。
    sc.setLogLevel(“WARN”)



    • 设置日记级别为 “WARN”,淘汰日记输出,方便检察紧张信息。
    val lines: RDD[String] = sc.textFile(args(0))



    • sc.textFile(args(0)) 从指定的文本文件路径加载数据,创建一个 RDD[String],其中每一行文本都作为一个字符串元素。
    • lines 是包罗输入文本数据的 RDD。
    flatMap(_.split(“\s+”))



    • flatMap 方法将每一行字符串按空白字符拆分成单词,并将其展开成单个单词的 RDD。
    map(_.toLowerCase)



    • 将每个单词转换为小写,以确保统计时不区分巨细写。
    map(_.replaceAll(punctuation, “”))



    • 使用正则表达式 punctuation 去除单词中的标点符号,使得统计效果更加正确。
    filter(word => !stopWords.contains(word) && word.trim.nonEmpty)



    • filter 方法过滤掉停用词和空白单词:
    • !stopWords.contains(word) 确保当前单词不在停用词列表中。
    • word.trim.nonEmpty 确保单词在去除前后空白字符后不是空字符串。
    map((_, 1))



    • 将每个单词映射为 (word, 1) 的键值对,表示每个单词出现一次。
    reduceByKey(_ + _)



    • reduceByKey 方法根据键(单词)对值(计数)进行累加,统计每个单词的总出现次数。
    sortBy(_._2, false)



    • 将统计效果按值(单词出现的次数)从大到小排序。
    collect().foreach(println)



    • collect() 方法将 RDD 中的数据网络到驱动步伐中(即本地),然后使用 foreach(println) 输出每个单词及其出现的次数。
    • 由于 collect 会将数据从分布式情况中拉到本地,须要留意数据量大的情况下大概导致内存不敷的题目。
    sc.stop()



    • 在计算完成后,调用 sc.stop() 方法停止 SparkContext,释放资源。
    添加依赖

    1. <dependency>
    2.     <groupId>mysql</groupId>
    3.     <artifactId>mysql-connector-java</artifactId>
    4.     <version>8.0.28</version>
    5. </dependency>
    复制代码
    同时我们须要在build的部门,也要加入对应的内容,让驱动可以加载进来:
    1. <build>
    2.     <plugins>
    3.         <plugin>
    4.             <groupId>net.alchim31.maven</groupId>
    5.             <artifactId>scala-maven-plugin</artifactId>
    6.             <version>4.4.0</version>
    7.             <executions>
    8.                 <execution>
    9.                     <goals>
    10.                         <goal>compile</goal>
    11.                         <goal>testCompile</goal>
    12.                     </goals>
    13.                 </execution>
    14.             </executions>
    15.         </plugin>
    16.         <plugin>
    17.             <groupId>org.apache.maven.plugins</groupId>
    18.             <artifactId>maven-assembly-plugin</artifactId>
    19.             <version>3.3.0</version>
    20.             <configuration>
    21.                 <archive>
    22.                     <manifest>
    23.                         <mainClass>cn.lagou.sparkcore.WordCount</mainClass>
    24.                     </manifest>
    25.                 </archive>
    26.                 <descriptorRefs>
    27.                     <descriptorRef>jar-with-dependencies</descriptorRef>
    28.                 </descriptorRefs>
    29.             </configuration>
    30.             <executions>
    31.                 <execution>
    32.                     <phase>package</phase>
    33.                     <goals>
    34.                         <goal>single</goal>
    35.                     </goals>
    36.                 </execution>
    37.             </executions>
    38.         </plugin>
    39.     </plugins>
    40. </build>
    复制代码
    创建库表

    我们新建一个数据库,也要新建一个数据表
    1. CREATE TABLE `wordcount` (
    2.   `word` varchar(255) DEFAULT NULL,
    3.   `count` int(11) DEFAULT NULL
    4. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    复制代码
    写入SQL-未优化

    我们在 foreach 中生存了数据,此时须要创建大量的MySQL毗连,服从是比力低的。
    1. package icu.wzk
    2. import com.mysql.cj.xdevapi.PreparableStatement
    3. import org.apache.spark.rdd.RDD
    4. import org.apache.spark.{SparkConf, SparkContext}
    5. import java.sql.{Connection, DriverManager, PreparedStatement}
    6. object SuperWordCount2 {
    7.   private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
    8.   private val punctuation = "[\\)\\.,:;'!\\?]"
    9.   def main(args: Array[String]): Unit = {
    10.     val conf = new SparkConf()
    11.       .setAppName("ScalaSuperWordCount2")
    12.       .setMaster("local[*]")
    13.     val sc = new SparkContext(conf)
    14.     sc.setLogLevel("WARN")
    15.     val lines: RDD[String] = sc.textFile(args(0))
    16.     val words: RDD[String] = lines
    17.       .flatMap(_.split("\\s+"))
    18.       .map(_.trim.toLowerCase())
    19.     val clearWords: RDD[String] = words
    20.       .filter(!stopWords.contains(_))
    21.       .map(_.replaceAll(punctuation, ""))
    22.     val result: RDD[(String, Int)] = clearWords
    23.       .map((_, 1))
    24.       .reduceByKey(_ + _)
    25.       .sortBy(_._2, false)
    26.     result.foreach(println)
    27.     // 输出到 MySQL
    28.     val username = "hive"
    29.     val password = "hive@wzk.icu"
    30.     val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    31.     var conn: Connection = null
    32.     var stmt: PreparedStatement = null
    33.     var sql = "insert into wordcount values(?, ?)"
    34.     result.foreach{
    35.       case (word, count) => try {
    36.         conn = DriverManager.getConnection(url, username, password)
    37.         stmt = conn.prepareStatement(sql)
    38.         stmt.setString(1, word)
    39.         stmt.setInt(2, count)
    40.       } catch {
    41.         case e: Exception => e.printStackTrace()
    42.       } finally {
    43.         if (stmt != null) {
    44.           stmt.close()
    45.         }
    46.         if (conn != null) {
    47.           conn.close()
    48.         }
    49.       }
    50.     }
    51.     sc.stop()
    52.   }
    53. }
    复制代码
    写入SQL-优化版

    优化后使用 foreachPartition 生存数据,一个分区创建一个链接:cache RDD
    留意:


    • SparkSQL 有方便的读写MySQL的方法,给参数直接调用即可
    • 但把握这个方法很紧张,因为SparkSQL不是支持全部类型的数据库
    1. package icu.wzk
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.{SparkConf, SparkContext}
    4. import java.sql.{Connection, DriverManager, PreparedStatement}
    5. object SuperWordCount3 {
    6.   private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
    7.   private val punctuation = "[\\)\\.,:;'!\\?]"
    8.   private val username = "hive"
    9.   private val password = "hive@wzk.icu"
    10.   private val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    11.   def main(args: Array[String]): Unit = {
    12.     val conf = new SparkConf()
    13.       .setAppName("ScalaSuperWordCount2")
    14.       .setMaster("local[*]")
    15.     val sc = new SparkContext(conf)
    16.     sc.setLogLevel("WARN")
    17.     val lines: RDD[String] = sc.textFile(args(0))
    18.     val words: RDD[String] = lines
    19.       .flatMap(_.split("\\s+"))
    20.       .map(_.trim.toLowerCase())
    21.     val clearWords: RDD[String] = words
    22.       .filter(!stopWords.contains(_))
    23.       .map(_.replaceAll(punctuation, ""))
    24.     val result: RDD[(String, Int)] = clearWords
    25.       .map((_, 1))
    26.       .reduceByKey(_ + _)
    27.       .sortBy(_._2, false)
    28.     result.foreach(println)
    29.     result.foreachPartition(saveAsMySQL)
    30.     sc.stop()
    31.   }
    32.   def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
    33.     var conn: Connection = null
    34.     var stmt: PreparedStatement = null
    35.     var sql = "insert into wordcount values(?, ?)"
    36.     try {
    37.       conn = DriverManager.getConnection(url, username, password)
    38.       stmt = conn.prepareStatement(sql)
    39.       iter.foreach{
    40.         case (word, count) =>
    41.           stmt.setString(1, word)
    42.           stmt.setInt(2, count)
    43.       }
    44.     } catch {
    45.       case e: Exception => e.printStackTrace()
    46.     } finally {
    47.       if (stmt != null) {
    48.         stmt.close()
    49.       }
    50.       if (conn != null) {
    51.         conn.close()
    52.       }
    53.     }
    54.   }
    55. }
    复制代码
    打包上传

    1. mvn clean package
    复制代码
    打包并上传到项目:

    运行项目

    不写入SQL版

    1. spark-submit --master local[*] --class icu.wzk.SuperWordCount1 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
    复制代码
    运行效果如下图:

    写入SQL-未优化版

    1. spark-submit --master local[*] --class icu.wzk.SuperWordCount2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
    复制代码
    写入SQL-优化版

    1. spark-submit --master local[*] --class icu.wzk.SuperWordCount3 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
    复制代码
    运行效果如下图:

    检察数据

    检察数据库,内容如下:


    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
  • 本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有账号?立即注册

    x
    回复

    使用道具 举报

    0 个回复

    倒序浏览

    快速回复

    您需要登录后才可以回帖 登录 or 立即注册

    本版积分规则

    石小疯

    金牌会员
    这个人很懒什么都没写!

    标签云

    快速回复 返回顶部 返回列表