大连全瓷种植牙齿制作中心 发表于 2025-1-16 12:17:44

Java爬虫——使用Spark举行数据清楚

1. 依靠引入

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.13</artifactId>
   <version>3.5.3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.5.3</version>
</dependency> 2. 数据加载

从 MySQL 数据库中加载 jobTest 表中的数据,使用 Spark 的 JDBC 功能连接到数据库。
代码片段:
// 数据库连接信息
String jdbcUrl = "jdbc:mysql://82.157.185.251:3306/job_demo?useUnicode=true";
String user = "root";
String password = "138748";
String tableName = "jobTest";

// 创建 SparkSession
SparkSession spark = SparkSession.builder()
 .appName("Job Data Cleaning")
 .config("spark.master", "local")
 .getOrCreate();

Dataset<Row> df = spark.read()
     .format("jdbc")
     .option("url", jdbcUrl)
     .option("dbtable", tableName)
     .option("user", user)
     .option("password", password)
     .load(); 3. 数据处置惩罚

3.1 数据去重

删除数据表中完全重复的记载,确保数据唯一性。
代码片段:
Dataset<Row> cleanedDf = df.dropDuplicates(); 作用:


[*] 删除全部字段值完全雷同的记载,去除重复数据。
[*] 包管数据唯一性,避免重复记载对分析效果的干扰。
3.2 填充缺失值

针对特定字段(如 jobName、company 等),将空值或空字符串更换为默认值 "未知"。
代码片段:
cleanedDf = cleanedDf.na()
            .fill("未知", new String[]
            {"jobName", "company", "salary", "experience", "educational",
            "city", "majorName", "companyScale", "companyStatus"}); 作用:


[*] 避免空值或空字符串对数据分析造成影响。
[*] 填充后,字段中无空值,便于后续数据处置惩罚。
3.3 薪资数据清洗



[*] 删除无效记载:

[*] 删除薪资字段中值为 0 或 -1 的记载,这些值被认为是无效的薪资信息。

[*] 薪资格式标准化:

[*] 假如薪资字段包罗 k(如 5k-10k),保持原样。
[*] 假如薪资字段是数字类型(如 5000),将其转换为千为单元,并添加 k 后缀(如 5000 转换为 5k)。
[*] 假如薪资字段格式不精确,则填充为 "未知"。

代码片段:
cleanedDf = cleanedDf.filter(cleanedDf.col("salary").notEqual("0")
     .and(cleanedDf.col("salary").notEqual("-1")));
cleanedDf = cleanedDf.withColumn("salary",
       functions.when(
               functions.col("salary").contains("k"),
               functions.col("salary")
     ).otherwise(
               functions.when(
                       functions.col("salary").rlike("^+(\\.+)?$"),
                       functions.concat(                            functions.format_number(functions.col("salary").cast("double").divide(1000), 1),
                               functions.lit("k")
                     )
             ).otherwise("未知")
     )
); 作用:


[*] 删除无效薪资记载,包管数据有效性。
[*] 将薪资标准化为统一的格式(包罗 k),便于后续分析。
3.4 都会字段清洗

去掉 city 字段中多余的“市”字,保留都会名称。
代码片段:
cleanedDf = cleanedDf.withColumn("city", functions.regexp_replace(cleanedDf.col("city"), "市", "")); 作用:


[*] 清洗都会字段中的冗余信息,保留核心内容(都会名称)。
[*] 进步字段的规范性,便于数据展示和分析。
3.5 数据生存

将清洗后的数据生存至 MySQL 数据库的新表 cleaned_jobTEST 中。
代码片段:
cleanedDf.write()
     .format("jdbc")
     .option("url", jdbcUrl)
     .option("dbtable", "cleaned_jobTEST")
     .option("user", user)
     .option("password", password)
     .mode("overwrite")
     .save(); 作用:


[*] 生存清洗后的数据,供后续分析或其他系统使用。
[*] 新表 cleaned_jobTEST 覆盖写入,确保每次运行清洗后的数据是一致的
4. 常用方法:
在每一步数据处置惩罚的时候,都会用到一个方法
cleanedDf.withColumn(...,...)
当我们想要处置惩罚某一个字段的信息时会用到这个函数,第一个参数为想要更新或者添加的字段。第二个参数是我们想要更新的条件。
functions.when(condition, value).otherwise(...) when是基于条件选择字段值,condition是条件,value为假如满意条件举行的操作.otherwise为不满意条件要举行的操作。
functions.col("salary") 指定字段列,这里是选择salary这个字段
functions.contains("k") 检查字段值中是否包罗某个条件,例如检查字段值是否包罗指定字符串(如 "k")。
functions.rlike(regex) 检查字段值是否包罗符合条件,这里我使用了正则表达式,检查字段中是否包罗数字。
functions.cast("type") 将字段值转换为指定数据类型,type为字段类型
functions.format_number(value, decimalPlaces) 格式化数字值并保留指定小数
functions.divide 转换为千分单元
functions.concat() 拼接字段值
functions.lit("value") 创建字段值
cleanedDf = cleanedDf.withColumn("salary",
       functions.when(
               functions.col("salary").contains("k"),
               functions.col("salary") // 如果包含 'k',保持原样
     ).otherwise(
               functions.when(
                       functions.col("salary").rlike("^+(\\.+)?$"),
                       functions.concat(                              functions.format_number(functions.col("salary").cast("double").divide(1000), 1),
                               functions.lit("k") // 添加 'k'
                     )
             ).otherwise("未知") // 如果格式不正确,填充为 '未知'
     )
);


对指定内容举行正则更换。选择city的字段值将时更换为空。 
functions.regexp_replace(cleanedDf.col("city"), "市", "")

5. Spark官方使用文档:functions (Spark 3.5.4 )
整体代码:
​import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.functions;​public class DataCleaning {    public static void main(String[] args) {        // 数据库连接信息        String jdbcUrl = "jdbc:mysql://82.157.185.251:3306/job_demo?useUnicode=true&characterEncoding=utf-8";        String user = "root";        String password = "138748";        String tableName = "jobTest";​        // 创建 SparkSession        SparkSession spark = SparkSession.builder()              .appName("Job Data Cleaning")              .config("spark.master", "local")              .getOrCreate();​        // 从数据库加载数据        Dataset<Row> df = spark.read()              .format("jdbc")              .option("url", jdbcUrl)              .option("dbtable", tableName)              .option("user", user)              .option("password", password)              .load();​        // 去重        Dataset<Row> cleanedDf = df.dropDuplicates();​//      //填充缺失值,处置惩罚空字符串//      String[] columnsToFill = {"jobName", "company", "salary", "experience", "educational", "city", "majorName", "companyScale", "companyStatus"};//      for (String column : columnsToFill) {//          cleanedDf = cleanedDf.withColumn(column,//                  functions.when(functions.col(column).equalTo(""), "未知")//                          .otherwise(functions.col(column)));//      }​        cleanedDf = cleanedDf.na().fill("未知", new String[]{"jobName", "company", "salary", "experience", "educational", "city", "majorName", "companyScale", "companyStatus"});        //处置惩罚薪资,删除0和-1的记载        cleanedDf = cleanedDf.filter(cleanedDf.col("salary").notEqual("0")              .and(cleanedDf.col("salary").notEqual("-1")));        //处置惩罚薪资,检查是否包罗 'k' 并举行相应转换        cleanedDf = cleanedDf.withColumn("salary",                functions.when(                        functions.col("salary").contains("k"),                        functions.col("salary") // 假如包罗 'k',保持原样              ).otherwise(                        functions.when(                                functions.col("salary").rlike("^+(\\.+)?$"), // 检查是否为有效数字                                functions.concat(                                        functions.format_number(functions.col("salary").cast("double").divide(1000), 1), // 除以1000保留两位小数                                        functions.lit("k") // 加上 'k'                              )                      ).otherwise("未知") // 假如格式不精确,则填充为 "未知"              )      );​        //去掉 city 列中包罗 "市" 的部门        cleanedDf = cleanedDf.withColumn("city", functions.regexp_replace(cleanedDf.col("city"), "市", "")      );        //生存清洗后的数据        cleanedDf.write()              .format("jdbc")              .option("url", jdbcUrl)              .option("dbtable", "cleaned_jobTEST")              .option("user", user)              .option("password", password)              .mode("overwrite")              .save();​        // 停止 SparkSession        spark.stop();​  }} ​

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Java爬虫——使用Spark举行数据清楚