第1关:网约车打消订单数据清洗
任务形貌
基于 EduCoder 平台提供的初始数据集(数据集存放在 /data/workspace/myshixun/ProvOrderCancel 中),按照下面的要求,完成网约车打消订单数据的清洗工作。
编程要求
判断一行数据字段是否完备,假如字段长度小于 8 且有不完备字段(字段值为空),则清洗掉这一行数据;
去重清洗:若有雷同订单 id(orderid)只保存第一行,其他的清洗掉;
打消来由(cancelreason)有大量的字符串 null,请将这些字符串用"未知"替换;
将数据集中的订单时间(ordertime)、订单打消时间(canceltime)转换为 “yyyy-MM-dd HH:mm:ss”格式,同时只保存订单时间和订单打消时间在 2019 年 03 月 07 日的数据,其余数据全部清洗掉;
处理惩罚数据集中的行政区划代码(address),结合 MySQL 数据库 trafficdb 的表 t_address 中的 address_code 与 address_name 数据对应,将其转换成对应的地域名称,在行政区划代码(address)字段后插入,列名为 districtname,若数据中行政区划代码在数据库没有找到对应的行政区名称,则将行政区名设置为“未知”;
清洗完的数据保存为一个 csv 文件,存放至/root/files下,字符之间分割符为|。
数据集说明
本数据集是湖南网约车打消订单数据,包含八个字段的信息,数据集的字段含义说明如下:
字段名 说明
companyid 公司ID名
address 行政区划代码
orderid 订单ID
ordertime 订单时间
canceltime 取消时间
operator 操纵类型
canceltypecode 取消操纵类型
cancelreason 取消的原因
MySQL 数据库 mydb 毗连方式:
url:jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8;
用户名:root;
密码:123123。
表:t_address
列名 类型 介绍
address_code varchar(255) 行政编码
address_name varchar(255) 行政地域
测试说明
平台会对你编写的代码举行测试:
假如代码正确,会执行你的代码,验证网约车打消订单数据是否清洗成功。点击评测按钮后,可以查察清洗后的数据,是否清洗成功。若步伐未通过的情况下,可以点击测试集查察具体问题。
开始你的任务吧,祝你成功!
第2关:网约车成功订单数据清洗
任务形貌
基于 EduCoder 平台提供的初始数据集(数据集存放在 /data/workspace/myshixun/ProvOrderCreate 中),按照下面的要求,完成网约车成功订单数据清洗的清洗工作。
编程要求
去重清洗:若有雷同订单 id(orderid)只保存第一行,其他的清洗掉;
因乘客便签(passengernote)、加密字段(encrypt_c)和支付类型(faretype)中有大量的空值,所以删除字段名为 乘客便签(passengernote)、加密字段(encrypt_c)和支付类型(faretype)的所有数据;
去除上述字段后,再判断一行数据字段是否完备,假如字段长度小于 11 或有不完备字段(字段值为空),则清洗掉这一行数据;
将数据集中的订单时间(ordertime)、出发时间(departtime)转换为 “yyyy-MM-dd HH:mm:ss”格式,同时只保存订单时间(ordertime)和出发时间(departtime)在 2019 年 03 月 07 日的数据,其余数据全部清洗掉;
处理惩罚数据集中的行政区划代码(address),结合 MySQL 数据库 trafficdb 的表 t_address 中的 address_code 与 address_name 数据对应,将其转换成对应的地域名称,在行政区划代码(address)字段后插入,列名为 districtname,若数据中行政区划代码在数据库没有找到对应的行政区名称,则将行政区名设置为“未知”;
经度字段有出发地经度(deplongitude)、出发地纬度(destlongitude),纬度字段为出发地纬度(deplatitude)、目的地纬度(destlatitude),经度字段字符串长度有 9 位而纬度字段字符串长度有 8 位。将经度举行处理惩罚,如 113058072 变为 113.058072 。将纬度举行处理惩罚,如 25770062 变为 25.770062。
清洗完的数据保存为一个 csv 文件,存放至/root/files下,字符之间分割符为\t。
数据集说明
本数据集是湖南网约车成功订单数据,包含 14 个字段的信息,数据集的字段含义说明如下:
字段名 说明
companyid 公司ID名
address 行政区划代码
orderid 订单ID
departtime 订单ID
ordertime 订单时间
passengernote 乘客便签
departure 出发地
deplongitude 出发地经度
deplatitude 出发地纬度
destination 目的地
destlongitude 目的地的经度
destlatitude 目的地的纬度
encrypt_c 加密字段
faretype 乘客类型
MySQL 数据库 mydb 毗连方式:
url:jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8;
用户名:root;
密码:123123。
表:t_address
列名 类型 介绍
address_code varchar(255) 行政编码
address_name varchar(255) 行政地域
测试说明
平台会对你编写的代码举行测试:
假如代码正确,会执行你的代码,验证网约车成功订单数据是否清洗成功。点击评测按钮后,可以查察清洗后的数据,是否清洗成功。若步伐未通过的情况下,可以点击测试集查察具体问题。
开始你的任务吧,祝你成功!
第一关答案
- import org.apache.log4j.Level;
- import org.apache.log4j.Logger;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.functions;
- public class TrainClean {
- public static void main(String[] args) {
- // 设置日志级别为ERROR,减少日志输出
- Logger.getLogger("org").setLevel(Level.ERROR);
- // 创建Spark会话
- SparkSession spark = SparkSession.builder()
- .master("local")
- .appName("Boxoffice_Movie")
- .getOrCreate();
- // 加载数据
- Dataset<Row> data = spark.read()
- .option("header", "true") // 使用第一行作为列名
- .option("inferSchema", "true") // 自动推断数据类型
- .csv("/data/workspace/myshixun/ProvOrderCancel");
- // 去掉列名中的多余空格
- for (String column : data.columns()) {
- data = data.withColumnRenamed(column, column.trim());
- }
- // 去掉数据中的前后空格
- for (String column : data.columns()) {
- data = data.withColumn(column, functions.trim(data.col(column)));
- }
- // 清洗数据:检查字段是否完整
- // 使用Spark SQL API来过滤数据
- data = data.filter(row -> {
- // 检查行中字段的数量是否小于8
- if (row.length() < 8) {
- return false;
- }
- // 检查行中是否有空值
- for (int i = 0; i < row.length(); i++) {
- if (row.isNullAt(i)) {
- return false;
- }
- }
- // 如果没有问题,返回true,保留该行
- return true;
- });
- // 去重清洗:保留每个orderid的第一行
- data = data.dropDuplicates("orderid");
- // 替换 cancelreason 列中的 "null" 字符串为 "未知"
- data = data.withColumn(
- "cancelreason",
- functions.when(data.col("cancelreason").equalTo("null"), "未知")
- .otherwise(data.col("cancelreason"))
- );
- // 转换时间格式: 将 ordetime 和 canceltime 转换为 "yyyy-MM-dd HH:mm:ss" 格式
- data = data.withColumn("ordertime", functions.date_format(functions.to_timestamp(data.col("ordertime"), "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"))
- .withColumn("canceltime", functions.date_format(functions.to_timestamp(data.col("canceltime"), "yyyyMMddHHmmss"), "yyyy-MM-dd HH:mm:ss"));
- // 过滤出 2019年03月07日 的数据
- // 过滤条件:ordertime 和 canceltime 在 2019-03-07
- data = data.filter(functions.to_date(data.col("ordertime"), "yyyy-MM-dd").equalTo(functions.lit("2019-03-07")))
- .filter(functions.to_date(data.col("canceltime"), "yyyy-MM-dd").equalTo(functions.lit("2019-03-07")));
-
- Dataset<Row> addressMapping = spark.read()
- .format("jdbc")
- .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
- .option("dbtable", "t_address")
- .option("user", "root")
- .option("password", "123123")
- .load();
- Dataset<Row> resultData = data.join(
- addressMapping,
- data.col("address").equalTo(addressMapping.col("address_code")),
- "left_outer"
- )
- .withColumn(
- "districtname",
- functions.when(addressMapping.col("address_name").isNotNull(),
- addressMapping.col("address_name"))
- .otherwise("未知")
- )
- .drop("address_code", "address_name");
- resultData = resultData.repartition(1);
-
- resultData.write()
- .option("delimiter", "|")
- .csv("/root/files");
- // 停止Spark会话
- spark.stop();
- }
- }
复制代码 第二关答案
- import org.apache.log4j.Level;
- import org.apache.log4j.Logger;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.api.java.UDF1;
- import org.apache.spark.sql.types.DataTypes;
- public class OrderClean {
- public static void main(String[] args) {
- /********** Begin **********/
- Logger.getLogger("org").setLevel(Level.ERROR);
- SparkSession spark = SparkSession.builder().master("local").appName("OrderClean").getOrCreate();
- Dataset<Row> orderdata = spark.read().option("delimiter", ",").csv("/data/workspace/myshixun/ProvOrderCreate")
- .toDF("companyid", "address", "orderid", "departtime", "ordertime", "passengernote", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude", "encrypt_c", "faretype").select("companyid", "address", "orderid", "departtime", "ordertime", "departure", "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude");
- // 清除订单时间和撤销时间不为2019年3月7日的的数据
- Dataset<Row> data = orderdata.where("ordertime like '20190307%' and departtime like '20190307%'");
- // 清除掉字段长度不足的数据
- Dataset<Row> data1 = data.na().drop();
- // 订单去重
- Dataset<Row> data2 = data1.dropDuplicates("orderid");
- // 时间格式化
- spark.udf().register("dateformat", (UDF1<String, String>) s -> s.substring(0, 4) + "-" + s.substring(4, 6) + "-" + s.substring(6, 8) + " " + s.substring(8, 10) + ":" + s.substring(10, 12) + ":" + s.substring(12, 14), DataTypes.StringType);
- // 处理经纬度数据
- spark.udf().register("change", (UDF1<String, String>) s ->
- s.substring(0, s.length() - 6) + "." + s.substring(s.length() - 6)
- , DataTypes.StringType);
- // 获取 t_address 表数据
- Dataset<Row> t_address = spark.read().format("jdbc")
- .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8&useSSL=false")
- .option("dbtable", "t_address")
- .option("user", "root")
- .option("password", "123123")
- .load();
- t_address.registerTempTable("t_address");
- data2.registerTempTable("data");
- // 两表关联并存入txt文件中,分隔符使用“\t”
- spark.sql("select companyid, data.address address, t_address.address_name districtname, orderid, dateformat(departtime) departtime, dateformat(ordertime) ordertime, departure, change(deplongitude) deplongitude, change(deplatitude) deplatitude, destination, change(destlongitude) destlongitude, change(destlatitude) destlatitude from data left join t_address on data.address = t_address.address_code").repartition(1).write().option("delimiter", "\t").csv("/root/files");
- /********** End **********/
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |