Java爬虫——使用Spark举行数据清楚

打印 上一主题 下一主题

主题 977|帖子 977|积分 2946

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1. 依靠引入

  1. <dependency>
  2.    <groupId>org.apache.spark</groupId>
  3.    <artifactId>spark-core_2.13</artifactId>
  4.    <version>3.5.3</version>
  5. </dependency>
  6. <dependency>
  7.   <groupId>org.apache.spark</groupId>
  8.   <artifactId>spark-sql_2.13</artifactId>
  9.   <version>3.5.3</version>
  10. </dependency>
复制代码
2. 数据加载

从 MySQL 数据库中加载 jobTest 表中的数据,使用 Spark 的 JDBC 功能连接到数据库。
代码片段
  1. // 数据库连接信息
  2. String jdbcUrl = "jdbc:mysql://82.157.185.251:3306/job_demo?useUnicode=true";
  3. String user = "root";
  4. String password = "138748";
  5. String tableName = "jobTest";
  6. // 创建 SparkSession
  7. SparkSession spark = SparkSession.builder()
  8.    .appName("Job Data Cleaning")
  9.    .config("spark.master", "local")
  10.    .getOrCreate();
  11. Dataset<Row> df = spark.read()
  12.        .format("jdbc")
  13.        .option("url", jdbcUrl)
  14.        .option("dbtable", tableName)
  15.        .option("user", user)
  16.        .option("password", password)
  17.        .load();
复制代码

3. 数据处置惩罚

3.1 数据去重

删除数据表中完全重复的记载,确保数据唯一性。
代码片段
  1. Dataset<Row> cleanedDf = df.dropDuplicates();
复制代码
作用


  • 删除全部字段值完全雷同的记载,去除重复数据。
  • 包管数据唯一性,避免重复记载对分析效果的干扰。

3.2 填充缺失值

针对特定字段(如 jobName、company 等),将空值或空字符串更换为默认值 "未知"。
代码片段
  1. cleanedDf = cleanedDf.na()
  2.             .fill("未知", new String[]
  3.             {"jobName", "company", "salary", "experience", "educational",
  4.               "city", "majorName", "companyScale", "companyStatus"});
复制代码
作用


  • 避免空值或空字符串对数据分析造成影响。
  • 填充后,字段中无空值,便于后续数据处置惩罚。

3.3 薪资数据清洗



  • 删除无效记载:

    • 删除薪资字段中值为 0 或 -1 的记载,这些值被认为是无效的薪资信息。

  • 薪资格式标准化:

    • 假如薪资字段包罗 k(如 5k-10k),保持原样。
    • 假如薪资字段是数字类型(如 5000),将其转换为千为单元,并添加 k 后缀(如 5000 转换为 5k)。
    • 假如薪资字段格式不精确,则填充为 "未知"。

代码片段
  1. cleanedDf = cleanedDf.filter(cleanedDf.col("salary").notEqual("0")
  2.        .and(cleanedDf.col("salary").notEqual("-1")));
  3. cleanedDf = cleanedDf.withColumn("salary",
  4.        functions.when(
  5.                functions.col("salary").contains("k"),
  6.                functions.col("salary")
  7.        ).otherwise(
  8.                functions.when(
  9.                        functions.col("salary").rlike("^[0-9]+(\\.[0-9]+)?$"),
  10.                        functions.concat(                            functions.format_number(functions.col("salary").cast("double").divide(1000), 1),
  11.                                functions.lit("k")
  12.                        )
  13.                ).otherwise("未知")
  14.        )
  15. );
复制代码
作用


  • 删除无效薪资记载,包管数据有效性。
  • 将薪资标准化为统一的格式(包罗 k),便于后续分析。

3.4 都会字段清洗

去掉 city 字段中多余的“市”字,保留都会名称。
代码片段
  1. cleanedDf = cleanedDf.withColumn("city", functions.regexp_replace(cleanedDf.col("city"), "市", ""));
复制代码
作用


  • 清洗都会字段中的冗余信息,保留核心内容(都会名称)。
  • 进步字段的规范性,便于数据展示和分析。

3.5 数据生存

将清洗后的数据生存至 MySQL 数据库的新表 cleaned_jobTEST 中。
代码片段
  1. cleanedDf.write()
  2.        .format("jdbc")
  3.        .option("url", jdbcUrl)
  4.        .option("dbtable", "cleaned_jobTEST")
  5.        .option("user", user)
  6.        .option("password", password)
  7.        .mode("overwrite")
  8.        .save();
复制代码
作用


  • 生存清洗后的数据,供后续分析或其他系统使用。
  • 新表 cleaned_jobTEST 覆盖写入,确保每次运行清洗后的数据是一致的
4. 常用方法:
在每一步数据处置惩罚的时候,都会用到一个方法
cleanedDf.withColumn(...,...)
当我们想要处置惩罚某一个字段的信息时会用到这个函数,第一个参数为想要更新或者添加的字段。第二个参数是我们想要更新的条件。
  1. functions.when(condition, value).otherwise(...)
复制代码
when是基于条件选择字段值,condition是条件,value为假如满意条件举行的操作.otherwise为不满意条件要举行的操作。
  1. functions.col("salary")
复制代码
指定字段列,这里是选择salary这个字段
  1. functions.contains("k")
复制代码
检查字段值中是否包罗某个条件,例如检查字段值是否包罗指定字符串(如 "k")。
  1. functions.rlike(regex)
复制代码
检查字段值是否包罗符合条件,这里我使用了正则表达式,检查字段中是否包罗数字。
  1. functions.cast("type")
复制代码
将字段值转换为指定数据类型,type为字段类型
  1. functions.format_number(value, decimalPlaces)
复制代码
格式化数字值并保留指定小数
  1. functions.divide
复制代码
转换为千分单元
  1. functions.concat()
复制代码
拼接字段值
  1. functions.lit("value")
复制代码
创建字段值
  1. cleanedDf = cleanedDf.withColumn("salary",
  2.        functions.when(
  3.                functions.col("salary").contains("k"),
  4.                functions.col("salary") // 如果包含 'k',保持原样
  5.        ).otherwise(
  6.                functions.when(
  7.                        functions.col("salary").rlike("^[0-9]+(\\.[0-9]+)?$"),
  8.                        functions.concat(                              functions.format_number(functions.col("salary").cast("double").divide(1000), 1),
  9.                                functions.lit("k") // 添加 'k'
  10.                        )
  11.                ).otherwise("未知") // 如果格式不正确,填充为 '未知'
  12.        )
  13. );
复制代码
对指定内容举行正则更换。选择city的字段值将时更换为空。 
  1. functions.regexp_replace(cleanedDf.col("city"), "市", "")
复制代码


5. Spark官方使用文档:functions (Spark 3.5.4 )
整体代码:
  1. ​import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.functions;​public class DataCleaning {    public static void main(String[] args) {        // 数据库连接信息        String jdbcUrl = "jdbc:mysql://82.157.185.251:3306/job_demo?useUnicode=true&characterEncoding=utf-8";        String user = "root";        String password = "138748";        String tableName = "jobTest";​        // 创建 SparkSession        SparkSession spark = SparkSession.builder()                .appName("Job Data Cleaning")                .config("spark.master", "local")                .getOrCreate();​        // 从数据库加载数据        Dataset<Row> df = spark.read()                .format("jdbc")                .option("url", jdbcUrl)                .option("dbtable", tableName)                .option("user", user)                .option("password", password)                .load();​        // 去重        Dataset<Row> cleanedDf = df.dropDuplicates();​//        //填充缺失值,处置惩罚空字符串//        String[] columnsToFill = {"jobName", "company", "salary", "experience", "educational", "city", "majorName", "companyScale", "companyStatus"};//        for (String column : columnsToFill) {//            cleanedDf = cleanedDf.withColumn(column,//                    functions.when(functions.col(column).equalTo(""), "未知")//                            .otherwise(functions.col(column)));//        }​        cleanedDf = cleanedDf.na().fill("未知", new String[]{"jobName", "company", "salary", "experience", "educational", "city", "majorName", "companyScale", "companyStatus"});        //处置惩罚薪资,删除0和-1的记载        cleanedDf = cleanedDf.filter(cleanedDf.col("salary").notEqual("0")                .and(cleanedDf.col("salary").notEqual("-1")));        //处置惩罚薪资,检查是否包罗 'k' 并举行相应转换        cleanedDf = cleanedDf.withColumn("salary",                functions.when(                        functions.col("salary").contains("k"),                        functions.col("salary") // 假如包罗 'k',保持原样                ).otherwise(                        functions.when(                                functions.col("salary").rlike("^[0-9]+(\\.[0-9]+)?$"), // 检查是否为有效数字                                functions.concat(                                        functions.format_number(functions.col("salary").cast("double").divide(1000), 1), // 除以1000保留两位小数                                        functions.lit("k") // 加上 'k'                                )                        ).otherwise("未知") // 假如格式不精确,则填充为 "未知"                )        );​        //去掉 city 列中包罗 "市" 的部门        cleanedDf = cleanedDf.withColumn("city", functions.regexp_replace(cleanedDf.col("city"), "市", "")        );        //生存清洗后的数据        cleanedDf.write()                .format("jdbc")                .option("url", jdbcUrl)                .option("dbtable", "cleaned_jobTEST")                .option("user", user)                .option("password", password)                .mode("overwrite")                .save();​        // 停止 SparkSession        spark.stop();​    }}
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连全瓷种植牙齿制作中心

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表