头歌 网约车大数据综合项目——基于Spark的数据清洗

打印 上一主题 下一主题

主题 980|帖子 980|积分 2940

第1关:网约车打消订单数据清洗

任务形貌
基于 EduCoder 平台提供的初始数据集(数据集存放在 /data/workspace/myshixun/ProvOrderCancel 中),按照下面的要求,完成网约车打消订单数据的清洗工作。
编程要求
判断一行数据字段是否完备,假如字段长度小于 8 且有不完备字段(字段值为空),则清洗掉这一行数据;
去重清洗:若有雷同订单 id(orderid)只保存第一行,其他的清洗掉;
打消来由(cancelreason)有大量的字符串 null,请将这些字符串用"未知"替换;
将数据集中的订单时间(ordertime)、订单打消时间(canceltime)转换为 “yyyy-MM-dd HH:mm:ss”格式,同时只保存订单时间和订单打消时间在 2019 年 03 月 07 日的数据,其余数据全部清洗掉;
处理惩罚数据集中的行政区划代码(address),结合 MySQL 数据库 trafficdb 的表 t_address 中的 address_code 与 address_name 数据对应,将其转换成对应的地域名称,在行政区划代码(address)字段后插入,列名为 districtname,若数据中行政区划代码在数据库没有找到对应的行政区名称,则将行政区名设置为“未知”;
清洗完的数据保存为一个 csv 文件,存放至/root/files下,字符之间分割符为|。
数据集说明
本数据集是湖南网约车打消订单数据,包含八个字段的信息,数据集的字段含义说明如下:
字段名    说明
companyid    公司ID名
address    行政区划代码
orderid    订单ID
ordertime    订单时间
canceltime    取消时间
operator    操纵类型
canceltypecode    取消操纵类型
cancelreason    取消的原因

MySQL 数据库 mydb 毗连方式:
url:jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8;
用户名:root;
密码:123123。
表:t_address
列名    类型    介绍
address_code    varchar(255)    行政编码
address_name    varchar(255)    行政地域
测试说明
平台会对你编写的代码举行测试:
假如代码正确,会执行你的代码,验证网约车打消订单数据是否清洗成功。点击评测按钮后,可以查察清洗后的数据,是否清洗成功。若步伐未通过的情况下,可以点击测试集查察具体问题。
开始你的任务吧,祝你成功!
第2关:网约车成功订单数据清洗

任务形貌
基于 EduCoder 平台提供的初始数据集(数据集存放在 /data/workspace/myshixun/ProvOrderCreate 中),按照下面的要求,完成网约车成功订单数据清洗的清洗工作。
编程要求
去重清洗:若有雷同订单 id(orderid)只保存第一行,其他的清洗掉;
因乘客便签(passengernote)、加密字段(encrypt_c)和支付类型(faretype)中有大量的空值,所以删除字段名为 乘客便签(passengernote)、加密字段(encrypt_c)和支付类型(faretype)的所有数据;
去除上述字段后,再判断一行数据字段是否完备,假如字段长度小于 11 或有不完备字段(字段值为空),则清洗掉这一行数据;
将数据集中的订单时间(ordertime)、出发时间(departtime)转换为 “yyyy-MM-dd HH:mm:ss”格式,同时只保存订单时间(ordertime)和出发时间(departtime)在 2019 年 03 月 07 日的数据,其余数据全部清洗掉;
处理惩罚数据集中的行政区划代码(address),结合 MySQL 数据库 trafficdb 的表 t_address 中的 address_code 与 address_name 数据对应,将其转换成对应的地域名称,在行政区划代码(address)字段后插入,列名为 districtname,若数据中行政区划代码在数据库没有找到对应的行政区名称,则将行政区名设置为“未知”;
经度字段有出发地经度(deplongitude)、出发地纬度(destlongitude),纬度字段为出发地纬度(deplatitude)、目的地纬度(destlatitude),经度字段字符串长度有 9 位而纬度字段字符串长度有 8 位。将经度举行处理惩罚,如 113058072 变为 113.058072 。将纬度举行处理惩罚,如 25770062 变为 25.770062。
清洗完的数据保存为一个 csv 文件,存放至/root/files下,字符之间分割符为\t。
数据集说明
本数据集是湖南网约车成功订单数据,包含 14 个字段的信息,数据集的字段含义说明如下:
字段名    说明
companyid    公司ID名
address    行政区划代码
orderid    订单ID
departtime    订单ID
ordertime    订单时间
passengernote    乘客便签
departure    出发地
deplongitude    出发地经度
deplatitude    出发地纬度
destination    目的地
destlongitude    目的地的经度
destlatitude    目的地的纬度
encrypt_c    加密字段
faretype    乘客类型

MySQL 数据库 mydb 毗连方式:
url:jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8;
用户名:root;
密码:123123。
表:t_address
列名    类型    介绍
address_code    varchar(255)    行政编码
address_name    varchar(255)    行政地域
测试说明
平台会对你编写的代码举行测试:
假如代码正确,会执行你的代码,验证网约车成功订单数据是否清洗成功。点击评测按钮后,可以查察清洗后的数据,是否清洗成功。若步伐未通过的情况下,可以点击测试集查察具体问题。
开始你的任务吧,祝你成功!



第一关答案

  1. import org.apache.log4j.Level;
  2. import org.apache.log4j.Logger;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.sql.functions;
  7. public class TrainClean {
  8.     public static void main(String[] args) {
  9.         // 设置日志级别为ERROR,减少日志输出
  10.         Logger.getLogger("org").setLevel(Level.ERROR);
  11.         // 创建Spark会话
  12.         SparkSession spark = SparkSession.builder()
  13.                 .master("local")
  14.                 .appName("Boxoffice_Movie")
  15.                 .getOrCreate();
  16.         // 加载数据
  17.         Dataset<Row> data = spark.read()
  18.                 .option("header", "true")  // 使用第一行作为列名
  19.                 .option("inferSchema", "true")  // 自动推断数据类型
  20.                 .csv("/data/workspace/myshixun/ProvOrderCancel");
  21.         // 去掉列名中的多余空格
  22.         for (String column : data.columns()) {
  23.             data = data.withColumnRenamed(column, column.trim());
  24.         }
  25.         // 去掉数据中的前后空格
  26.         for (String column : data.columns()) {
  27.             data = data.withColumn(column, functions.trim(data.col(column)));
  28.         }
  29.         // 清洗数据:检查字段是否完整
  30.         // 使用Spark SQL API来过滤数据
  31.         data = data.filter(row -> {
  32.             // 检查行中字段的数量是否小于8
  33.             if (row.length() < 8) {
  34.                 return false;
  35.             }
  36.             // 检查行中是否有空值
  37.             for (int i = 0; i < row.length(); i++) {
  38.                 if (row.isNullAt(i)) {
  39.                     return false;
  40.                 }
  41.             }
  42.             // 如果没有问题,返回true,保留该行
  43.             return true;
  44.         });
  45.         // 去重清洗:保留每个orderid的第一行
  46.         data = data.dropDuplicates("orderid");
  47.         // 替换 cancelreason 列中的 "null" 字符串为 "未知"
  48.         data = data.withColumn(
  49.                 "cancelreason",
  50.                 functions.when(data.col("cancelreason").equalTo("null"), "未知")
  51.                          .otherwise(data.col("cancelreason"))
  52.         );
  53.         // 转换时间格式: 将 ordetime 和 canceltime 转换为 "yyyy-MM-dd HH:mm:ss" 格式
  54.         data = data.withColumn("ordertime", functions.date_format(functions.to_timestamp(data.col("ordertime"), "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"))
  55.                    .withColumn("canceltime", functions.date_format(functions.to_timestamp(data.col("canceltime"), "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"));
  56.         // 过滤出 2019年03月07日 的数据
  57.         // 过滤条件:ordertime 和 canceltime 在 2019-03-07
  58.         data = data.filter(functions.to_date(data.col("ordertime"), "yyyy-MM-dd").equalTo(functions.lit("2019-03-07")))
  59.                    .filter(functions.to_date(data.col("canceltime"), "yyyy-MM-dd").equalTo(functions.lit("2019-03-07")));
  60.         
  61.         Dataset<Row> addressMapping = spark.read()
  62.         .format("jdbc")
  63.         .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
  64.         .option("dbtable", "t_address")
  65.         .option("user", "root")
  66.         .option("password", "123123")
  67.         .load();
  68.         Dataset<Row> resultData = data.join(
  69.             addressMapping,
  70.             data.col("address").equalTo(addressMapping.col("address_code")),
  71.             "left_outer"
  72.         )
  73.         .withColumn(
  74.         "districtname",
  75.         functions.when(addressMapping.col("address_name").isNotNull(),
  76.                 addressMapping.col("address_name"))
  77.                 .otherwise("未知")
  78.         )
  79.         .drop("address_code", "address_name");
  80.         resultData = resultData.repartition(1);
  81.         
  82.         resultData.write()
  83.         .option("delimiter", "|")
  84.         .csv("/root/files");
  85.         // 停止Spark会话
  86.         spark.stop();
  87.     }
  88. }
复制代码
第二关答案

  1. import org.apache.log4j.Level;
  2. import org.apache.log4j.Logger;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.sql.api.java.UDF1;
  7. import org.apache.spark.sql.types.DataTypes;
  8. public class OrderClean {
  9.     public static void main(String[] args) {
  10.         /********** Begin **********/
  11.         Logger.getLogger("org").setLevel(Level.ERROR);
  12.         SparkSession spark = SparkSession.builder().master("local").appName("OrderClean").getOrCreate();
  13.         Dataset<Row> orderdata = spark.read().option("delimiter", ",").csv("/data/workspace/myshixun/ProvOrderCreate")
  14.                 .toDF("companyid", "address", "orderid", "departtime", "ordertime", "passengernote", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude", "encrypt_c", "faretype").select("companyid", "address", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
  15.         // 清除订单时间和撤销时间不为2019年3月7日的的数据
  16.         Dataset<Row> data = orderdata.where("ordertime like '20190307%' and departtime like '20190307%'");
  17.         // 清除掉字段长度不足的数据
  18.         Dataset<Row> data1 = data.na().drop();
  19.         // 订单去重
  20.         Dataset<Row> data2 = data1.dropDuplicates("orderid");
  21.         // 时间格式化
  22.         spark.udf().register("dateformat", (UDF1<String, String>) s -> s.substring(0, 4) + "-" + s.substring(4, 6) + "-" + s.substring(6, 8) + " " + s.substring(8, 10) + ":" + s.substring(10, 12) + ":" + s.substring(12, 14), DataTypes.StringType);
  23.         // 处理经纬度数据
  24.         spark.udf().register("change", (UDF1<String, String>) s ->
  25.                         s.substring(0, s.length() - 6) + "." + s.substring(s.length() - 6)
  26.                 , DataTypes.StringType);
  27.         // 获取 t_address 表数据
  28.         Dataset<Row> t_address = spark.read().format("jdbc")
  29.                 .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8&useSSL=false")
  30.                 .option("dbtable", "t_address")
  31.                 .option("user", "root")
  32.                 .option("password", "123123")
  33.                 .load();
  34.         t_address.registerTempTable("t_address");
  35.         data2.registerTempTable("data");
  36.         // 两表关联并存入txt文件中,分隔符使用“\t”
  37.         spark.sql("select companyid, data.address address, t_address.address_name districtname, orderid, dateformat(departtime) departtime, dateformat(ordertime) ordertime, departure, change(deplongitude) deplongitude, change(deplatitude) deplatitude, destination, change(destlongitude) destlongitude, change(destlatitude) destlatitude from data left join t_address on data.address = t_address.address_code").repartition(1).write().option("delimiter", "\t").csv("/root/files");
  38.         /********** End **********/
  39.     }
  40. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万万哇

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表