马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
1. 依靠引入
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.13</artifactId>
- <version>3.5.3</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.13</artifactId>
- <version>3.5.3</version>
- </dependency>
复制代码 2. 数据加载
从 MySQL 数据库中加载 jobTest 表中的数据,使用 Spark 的 JDBC 功能连接到数据库。
代码片段:
- // 数据库连接信息
- String jdbcUrl = "jdbc:mysql://82.157.185.251:3306/job_demo?useUnicode=true";
- String user = "root";
- String password = "138748";
- String tableName = "jobTest";
-
- // 创建 SparkSession
- SparkSession spark = SparkSession.builder()
- .appName("Job Data Cleaning")
- .config("spark.master", "local")
- .getOrCreate();
-
- Dataset<Row> df = spark.read()
- .format("jdbc")
- .option("url", jdbcUrl)
- .option("dbtable", tableName)
- .option("user", user)
- .option("password", password)
- .load();
复制代码 3. 数据处置惩罚
3.1 数据去重
删除数据表中完全重复的记载,确保数据唯一性。
代码片段:
- Dataset<Row> cleanedDf = df.dropDuplicates();
复制代码 作用:
- 删除全部字段值完全雷同的记载,去除重复数据。
- 包管数据唯一性,避免重复记载对分析效果的干扰。
3.2 填充缺失值
针对特定字段(如 jobName、company 等),将空值或空字符串更换为默认值 "未知"。
代码片段:
- cleanedDf = cleanedDf.na()
- .fill("未知", new String[]
- {"jobName", "company", "salary", "experience", "educational",
- "city", "majorName", "companyScale", "companyStatus"});
复制代码 作用:
- 避免空值或空字符串对数据分析造成影响。
- 填充后,字段中无空值,便于后续数据处置惩罚。
3.3 薪资数据清洗
- 删除无效记载:
- 删除薪资字段中值为 0 或 -1 的记载,这些值被认为是无效的薪资信息。
- 薪资格式标准化:
- 假如薪资字段包罗 k(如 5k-10k),保持原样。
- 假如薪资字段是数字类型(如 5000),将其转换为千为单元,并添加 k 后缀(如 5000 转换为 5k)。
- 假如薪资字段格式不精确,则填充为 "未知"。
代码片段:
- cleanedDf = cleanedDf.filter(cleanedDf.col("salary").notEqual("0")
- .and(cleanedDf.col("salary").notEqual("-1")));
- cleanedDf = cleanedDf.withColumn("salary",
- functions.when(
- functions.col("salary").contains("k"),
- functions.col("salary")
- ).otherwise(
- functions.when(
- functions.col("salary").rlike("^[0-9]+(\\.[0-9]+)?$"),
- functions.concat( functions.format_number(functions.col("salary").cast("double").divide(1000), 1),
- functions.lit("k")
- )
- ).otherwise("未知")
- )
- );
复制代码 作用:
- 删除无效薪资记载,包管数据有效性。
- 将薪资标准化为统一的格式(包罗 k),便于后续分析。
3.4 都会字段清洗
去掉 city 字段中多余的“市”字,保留都会名称。
代码片段:
- cleanedDf = cleanedDf.withColumn("city", functions.regexp_replace(cleanedDf.col("city"), "市", ""));
复制代码 作用:
- 清洗都会字段中的冗余信息,保留核心内容(都会名称)。
- 进步字段的规范性,便于数据展示和分析。
3.5 数据生存
将清洗后的数据生存至 MySQL 数据库的新表 cleaned_jobTEST 中。
代码片段:
- cleanedDf.write()
- .format("jdbc")
- .option("url", jdbcUrl)
- .option("dbtable", "cleaned_jobTEST")
- .option("user", user)
- .option("password", password)
- .mode("overwrite")
- .save();
复制代码 作用:
- 生存清洗后的数据,供后续分析或其他系统使用。
- 新表 cleaned_jobTEST 覆盖写入,确保每次运行清洗后的数据是一致的
4. 常用方法:
在每一步数据处置惩罚的时候,都会用到一个方法
cleanedDf.withColumn(...,...)
当我们想要处置惩罚某一个字段的信息时会用到这个函数,第一个参数为想要更新或者添加的字段。第二个参数是我们想要更新的条件。
- functions.when(condition, value).otherwise(...)
复制代码 when是基于条件选择字段值,condition是条件,value为假如满意条件举行的操作.otherwise为不满意条件要举行的操作。
指定字段列,这里是选择salary这个字段
检查字段值中是否包罗某个条件,例如检查字段值是否包罗指定字符串(如 "k")。
检查字段值是否包罗符合条件,这里我使用了正则表达式,检查字段中是否包罗数字。
将字段值转换为指定数据类型,type为字段类型
- functions.format_number(value, decimalPlaces)
复制代码 格式化数字值并保留指定小数
转换为千分单元
拼接字段值
创建字段值
- cleanedDf = cleanedDf.withColumn("salary",
- functions.when(
- functions.col("salary").contains("k"),
- functions.col("salary") // 如果包含 'k',保持原样
- ).otherwise(
- functions.when(
- functions.col("salary").rlike("^[0-9]+(\\.[0-9]+)?$"),
- functions.concat( functions.format_number(functions.col("salary").cast("double").divide(1000), 1),
- functions.lit("k") // 添加 'k'
- )
- ).otherwise("未知") // 如果格式不正确,填充为 '未知'
- )
- );
复制代码 对指定内容举行正则更换。选择city的字段值将时更换为空。
- functions.regexp_replace(cleanedDf.col("city"), "市", "")
复制代码
5. Spark官方使用文档:functions (Spark 3.5.4 )
整体代码:
- import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.functions;public class DataCleaning { public static void main(String[] args) { // 数据库连接信息 String jdbcUrl = "jdbc:mysql://82.157.185.251:3306/job_demo?useUnicode=true&characterEncoding=utf-8"; String user = "root"; String password = "138748"; String tableName = "jobTest"; // 创建 SparkSession SparkSession spark = SparkSession.builder() .appName("Job Data Cleaning") .config("spark.master", "local") .getOrCreate(); // 从数据库加载数据 Dataset<Row> df = spark.read() .format("jdbc") .option("url", jdbcUrl) .option("dbtable", tableName) .option("user", user) .option("password", password) .load(); // 去重 Dataset<Row> cleanedDf = df.dropDuplicates();// //填充缺失值,处置惩罚空字符串// String[] columnsToFill = {"jobName", "company", "salary", "experience", "educational", "city", "majorName", "companyScale", "companyStatus"};// for (String column : columnsToFill) {// cleanedDf = cleanedDf.withColumn(column,// functions.when(functions.col(column).equalTo(""), "未知")// .otherwise(functions.col(column)));// } cleanedDf = cleanedDf.na().fill("未知", new String[]{"jobName", "company", "salary", "experience", "educational", "city", "majorName", "companyScale", "companyStatus"}); //处置惩罚薪资,删除0和-1的记载 cleanedDf = cleanedDf.filter(cleanedDf.col("salary").notEqual("0") .and(cleanedDf.col("salary").notEqual("-1"))); //处置惩罚薪资,检查是否包罗 'k' 并举行相应转换 cleanedDf = cleanedDf.withColumn("salary", functions.when( functions.col("salary").contains("k"), functions.col("salary") // 假如包罗 'k',保持原样 ).otherwise( functions.when( functions.col("salary").rlike("^[0-9]+(\\.[0-9]+)?$"), // 检查是否为有效数字 functions.concat( functions.format_number(functions.col("salary").cast("double").divide(1000), 1), // 除以1000保留两位小数 functions.lit("k") // 加上 'k' ) ).otherwise("未知") // 假如格式不精确,则填充为 "未知" ) ); //去掉 city 列中包罗 "市" 的部门 cleanedDf = cleanedDf.withColumn("city", functions.regexp_replace(cleanedDf.col("city"), "市", "") ); //生存清洗后的数据 cleanedDf.write() .format("jdbc") .option("url", jdbcUrl) .option("dbtable", "cleaned_jobTEST") .option("user", user) .option("password", password) .mode("overwrite") .save(); // 停止 SparkSession spark.stop(); }}
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |