石小疯 发表于 2024-8-22 09:16:27

大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算效果

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:



[*]Hadoop(已更完)
[*]HDFS(已更完)
[*]MapReduce(已更完)
[*]Hive(已更完)
[*]Flume(已更完)
[*]Sqoop(已更完)
[*]Zookeeper(已更完)
[*]HBase(已更完)
[*]Redis (已更完)
[*]Kafka(已更完)
[*]Spark(正在更新!)
章节内容

上节完成的内容如下:


[*]Spark案例编写 Scala
[*]计算圆周率
[*]找共同的好友
https://i-blog.csdnimg.cn/direct/97665c615c844ffaa3116af429e9ee90.png
Super Word Count

需求背景



[*]给定一段文本
[*]将单词全部转换为小写
[*]去除标点符号
[*]去除停用词
[*]count值降序生存
[*]效果生存到MySQL
[*]额外要求:标点符合和停用词可以自界说
编写代码

先实现到MySQL生存前的内容,我们须要先编写测试一下我们的代码是否正确
package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {

private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

private val punctuation = "[\\)\\.,:;'!\\?]"

def main(args: Array): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount1")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD = sc.textFile(args(0))
    lines
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.trim.nonEmpty)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect()
      .foreach(println)
   
    sc.stop()
}

}

详细表明

object SuperWordCount1 { … }



[*]SuperWordCount1 是一个 Scala 对象,界说了一个单例对象用于运行单词计数步伐。
private val stopWords = “in on to from by a an the is are were was i we you your he his”.split(“\s+”)



[*]这里界说了一个 stopWords 列表,包罗了常见的停用词,这些词在统计单词频率时会被过滤掉。
[*]split(“\s+”) 方法将这些停用词用空白字符分割成数组,便于后续的查找和过滤。
private val punctuation = “[\)\.,:;'!\?]”



[*]界说了一个正则表达式 punctuation,用于匹配常见的标点符号。这些标点符号在统计单词频率时会被去除。
def main(args: Array): Unit = { … }



[*]main 方法是步伐的入口点,args 是下令行参数,其中 args(0) 通常表示输入文件的路径。
val conf = new SparkConf().setAppName(“ScalaSuperWordCount1”).setMaster(“local
[*]”)



[*]SparkConf() 用于配置 Spark 应用步伐。setAppName(“ScalaSuperWordCount1”) 设置应用步伐的名称。
[*]setMaster(“local
[*]”) 指定应用步伐以本地模式运行,使用全部可用的 CPU 核心。
val sc = new SparkContext(conf)



[*]SparkContext 是 Spark 应用步伐的核心,用于与 Spark 集群进行交互。
sc.setLogLevel(“WARN”)



[*]设置日记级别为 “WARN”,淘汰日记输出,方便检察紧张信息。
val lines: RDD = sc.textFile(args(0))



[*]sc.textFile(args(0)) 从指定的文本文件路径加载数据,创建一个 RDD,其中每一行文本都作为一个字符串元素。
[*]lines 是包罗输入文本数据的 RDD。
flatMap(_.split(“\s+”))



[*]flatMap 方法将每一行字符串按空白字符拆分成单词,并将其展开成单个单词的 RDD。
map(_.toLowerCase)



[*]将每个单词转换为小写,以确保统计时不区分巨细写。
map(_.replaceAll(punctuation, “”))



[*]使用正则表达式 punctuation 去除单词中的标点符号,使得统计效果更加正确。
filter(word => !stopWords.contains(word) && word.trim.nonEmpty)



[*]filter 方法过滤掉停用词和空白单词:
[*]!stopWords.contains(word) 确保当前单词不在停用词列表中。
[*]word.trim.nonEmpty 确保单词在去除前后空白字符后不是空字符串。
map((_, 1))



[*]将每个单词映射为 (word, 1) 的键值对,表示每个单词出现一次。
reduceByKey(_ + _)



[*]reduceByKey 方法根据键(单词)对值(计数)进行累加,统计每个单词的总出现次数。
sortBy(_._2, false)



[*]将统计效果按值(单词出现的次数)从大到小排序。
collect().foreach(println)



[*]collect() 方法将 RDD 中的数据网络到驱动步伐中(即本地),然后使用 foreach(println) 输出每个单词及其出现的次数。
[*]由于 collect 会将数据从分布式情况中拉到本地,须要留意数据量大的情况下大概导致内存不敷的题目。
sc.stop()



[*]在计算完成后,调用 sc.stop() 方法停止 SparkContext,释放资源。
添加依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
同时我们须要在build的部门,也要加入对应的内容,让驱动可以加载进来:
<build>
    <plugins>
      <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            <executions>
                <execution>
                  <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                  </goals>
                </execution>
            </executions>
      </plugin>
      <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <archive>
                  <manifest>
                        <mainClass>cn.lagou.sparkcore.WordCount</mainClass>
                  </manifest>
                </archive>
                <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                        <goal>single</goal>
                  </goals>
                </execution>
            </executions>
      </plugin>
    </plugins>
</build>
创建库表

我们新建一个数据库,也要新建一个数据表
CREATE TABLE `wordcount` (
`word` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
写入SQL-未优化

我们在 foreach 中生存了数据,此时须要创建大量的MySQL毗连,服从是比力低的。
package icu.wzk

import com.mysql.cj.xdevapi.PreparableStatement
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object SuperWordCount2 {

private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

private val punctuation = "[\\)\\.,:;'!\\?]"

def main(args: Array): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount2")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD = sc.textFile(args(0))

    val words: RDD = lines
      .flatMap(_.split("\\s+"))
      .map(_.trim.toLowerCase())

    val clearWords: RDD = words
      .filter(!stopWords.contains(_))
      .map(_.replaceAll(punctuation, ""))

    val result: RDD[(String, Int)] = clearWords
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
    result.foreach(println)

    // 输出到 MySQL
    val username = "hive"
    val password = "hive@wzk.icu"
    val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

    var conn: Connection = null
    var stmt: PreparedStatement = null
    var sql = "insert into wordcount values(?, ?)"

    result.foreach{
      case (word, count) => try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      stmt.setString(1, word)
      stmt.setInt(2, count)
      } catch {
      case e: Exception => e.printStackTrace()
      } finally {
      if (stmt != null) {
          stmt.close()
      }
      if (conn != null) {
          conn.close()
      }
      }
    }

    sc.stop()
}

}

写入SQL-优化版

优化后使用 foreachPartition 生存数据,一个分区创建一个链接:cache RDD
留意:


[*]SparkSQL 有方便的读写MySQL的方法,给参数直接调用即可
[*]但把握这个方法很紧张,因为SparkSQL不是支持全部类型的数据库
package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object SuperWordCount3 {

private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
private val punctuation = "[\\)\\.,:;'!\\?]"
private val username = "hive"
private val password = "hive@wzk.icu"
private val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

def main(args: Array): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount2")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD = sc.textFile(args(0))

    val words: RDD = lines
      .flatMap(_.split("\\s+"))
      .map(_.trim.toLowerCase())

    val clearWords: RDD = words
      .filter(!stopWords.contains(_))
      .map(_.replaceAll(punctuation, ""))

    val result: RDD[(String, Int)] = clearWords
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
    result.foreach(println)

    result.foreachPartition(saveAsMySQL)

    sc.stop()
}

def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var stmt: PreparedStatement = null
    var sql = "insert into wordcount values(?, ?)"

    try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      iter.foreach{
      case (word, count) =>
          stmt.setString(1, word)
          stmt.setInt(2, count)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (stmt != null) {
      stmt.close()
      }
      if (conn != null) {
      conn.close()
      }
    }
}

}

打包上传

mvn clean package
打包并上传到项目:
https://i-blog.csdnimg.cn/direct/48709bb394ce4ba3b8b8a29e68ee925b.png
运行项目

不写入SQL版

spark-submit --master local[*] --class icu.wzk.SuperWordCount1 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
运行效果如下图:
https://i-blog.csdnimg.cn/direct/3a9edf46b466417b92d8924de7e228bc.png
写入SQL-未优化版

spark-submit --master local[*] --class icu.wzk.SuperWordCount2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
写入SQL-优化版

spark-submit --master local[*] --class icu.wzk.SuperWordCount3 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
运行效果如下图:
https://i-blog.csdnimg.cn/direct/74a190033ff24931a1668ad115c7e47c.png
检察数据

检察数据库,内容如下:
https://i-blog.csdnimg.cn/direct/63635361e3ea454f94c262086eb01967.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算效果