9. 大数据集群(PySpark)+Hive+MySQL+PyEcharts+Flask:信用贷款风险分析与 ...

打印 上一主题 下一主题

主题 854|帖子 854|积分 2562

一、大数据集群介绍

  本案例部署了3个节点的完全分布式集群,开发环境如下:
节点/组件/安装包版本备注名称节点192.168.126.10master数据节点192.168.126.11slave1数据节点192.168.126.12slave2JDKjdk-8u281Java运行环境,Spark的运行必要JDK的支持Hadoophadoop-3.1.4提供HDFS、Hive运行环境支持。HDFS体系访问端口为:hdfs://192.168.126.10:9000Hivehive-3.1.2数据仓库PySparkspark-3.4.3-bin-hadoop3.tgzSpark集群的master节点的地址和端口为:spark://192.168.126.10:7077MySQL5.7.18存储数据分析结果,端口:3306,用户名:root,密码:123456pythonPython-3.9.0.tgz3.9.0版本的PythonMySQL Connectormysql-connector-java-5.1.32-bin.jarSpark 连接MySQL的驱动IntelliJ IDEAUltimate 2020.3编程工具IDEA 1. PySpark简介

  Spark集群是一种分布式计算框架,它基于内存计算,提供了高效的数据抽象和并行计算能力,能够处理大规模数据集的批处理和实时处理任务。Spark采用内存存储中间计算结果,可淘汰迭代运算的磁盘I/O,并通过并行计算有向无环图的优化,使其运行速率比MapReduce快100倍;Spark可以使用Hadoop YARN和Apache Mesos作为其资源管理和调治器,可以从多种数据源读取数据,如HDFS、HBase、MySQL等。
  PySpark则是Spark的Python API,允许Python开发者利用Spark的强大功能举行数据处理和分析。本案例采用PySpark大数据集群做数据分析与发掘,使用HDFS作为文件存储体系。PySpark由多个组件构成,包括Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算库)等。
  大数据集群运行时,Spark的管理页面如下图所示:

2. Hive简介

  Hive是一款基于Hadoop的数据仓库工具,可以将布局化的数据文件映射为一张数据库表,并提供类SQL查询功能。Hive的本质是将Hive SQL(HQL)转换成MapReduce步调,或者Tez、Spark等任务举行运算,使得用户可以通过SQL查询语言来查询Hadoop集群中的数据。
  Hive的应用场景


  • 大规模数据分析:Hive能够处理PB级别的大规模数据,通过分布式存储和计算,提高数据处理速率。
  • 数据仓库:Hive提供了数据仓库的基本功能,如数据界说、数据加载、数据查询、数据分析等,用户可以使用Hive创建数据库、表、分区等布局,以便于管理和查询数据。
  • 日志分析:Hive可以处理大规模的日志数据,如Web日志、应用步调日志等,通过HiveQL举行查询和分析,快速了解用户行为、应用步调运行环境等信息。
  Hive的存储与计算


  • 数据存储:Hive使用HDFS(Hadoop Distributed File System)来存储数据,支持多种数据存储格式,如TextFile、SequenceFile、ORCFile、Parquet等。
  • 计算引擎:Hive支持多种执行引擎,如MapReduce、Tez和Spark。用户可以根据数据特点和业务需求,选择符合的执行引擎来优化查询性能。
3. PyEcharts

  Pyecharts是一款基于Python的开源数据可视化库,它整合了Echarts与Python的优势,使得在Python环境中能够轻松创建出美观且交互性强的图表。Pyecharts支持多达数十种图表类型,包括折线图、柱状图、散点图、饼图等常见图表,以及舆图、热力图、关系图等特色图表。这些丰富的图表类型能够满意差别场景下的数据可视化需求。
通过Pyecharts,可以绘制出如天下各省份GDP数据舆图、中国各地区生齿统计数据柱状图、股票买卖业务数据K线图、贩卖数据折线图等多种图表,直观地展示数据。
  Pyecharts官方文档: https://pyecharts.org/#/zh-cn/intro
  PyEcharts-gallery,图表示例: https://gallery.pyecharts.org/#/README
4. Flask

  Flask是一个轻量级、机动、可扩展的Web应用框架,使用Python编写,适适用于构建中小型Web应用步调。它提供了基本的路由、模板引擎、URL构建、错误处理等功能,并支持插件和扩展来增强其功能。
Flask提供了一个路由体系,可以将差别的URL路径映射到相应的处理函数上。当用户访问特定的URL时,Flask会调用相应的处理函数,并返反相应。
Flask内置了一个基于Jinja2的模板引擎,开发者可以使用模板来渲染HTML页面。通过将动态数据通报给模板,并使用模板语言来界说页面的布局和样式,开发者可以轻松地创建和管理Web页面的外观和布局。
   Flask中文版教程
  Flask英文版教程
  本案例的分析思路:Hive作为数据仓库,Spark为计算引擎,Spark分析结果存储在Hive中,少部分结果存储在MySQL,PyEcharts将MySQL中的分析结果绘制成图表,Flask再将图表渲染到页面做成数据可视化大屏。
二、信用贷款数据集介绍

  信用贷款是指以乞贷人的信誉发放的贷款,乞贷人不必要提供包管。其特征就是债务人无需提供抵押品或第三方包管,仅凭自己的信誉就能取得贷款,并以乞贷人信用程度作为还款包管。随着金融市场的不断发展,信用贷款作为一种无抵押、无包管的贷款形式,其市场需求日益增长。然而,信用贷款也陪同着较高的风险,因此,对信用贷款风险举行全面、正确的分析成为贷款机构的重要任务。本案例有7个数据集,分别为用户基本信息的训练集、测试集和测试结果集,用户登录信息的训练集和测试集,用户更新信息的训练集和测试集,如下表所示。
名称说明Training_Master.csv用户基本信息表(训练集)Test_Master.csv用户基本信息表(测试集)Test_Master_result.csv用户基本信息表(测试结果集)Training_UserUpdate.csv用户登录信息表(训练集)Test_UserUpdate.csv用户登录信息表(测试集)Training_LogInfo.csv用户更新信息表(训练集)Test_LogInfo.csv用户更新信息表(测试集)  文件截图如下所示:
1. 用户基本信息表

  用户基本信息包括3个数据文件:Training_Master.csv(训练集)、Test_Master.csv(测试集)、Test_Master_result.csv(测试结果集)。训练集共109个字段,测试集共108个字段(少了标签字段target),测试结果集共3个字段。用户基本信息表的相干字段说明如下:
字段名称说明是否含有缺失值Idx用户ID否UserInfo_i用户基本信息是Education_Info_i用户学历信息 ,Education_Info_1到Education_Info_24是WeblogInfo_i用户网页登录信息,WeblogInfo_1到WeblogInfo_57是SocalNetwork_i用户交际网络信息,SocalNetwork_1到SocalNetwork_17是ListingInfo乞贷成交时间否target用户是否逾期否   Training_Master.csv(训练集)有3万条记载,部分数据截图如下:


  Test_Master.csv(测试集)有19999条记载,部分数据截图如下:

Test_Master_result.csv(测试结果集)的部分数据截图如下:

2.用户登录信息表

  用户登录信息数据包括Training_LogInfo.csv(训练集)、Test_LogInfo.csv(测试集),都是5个字段,字段寄义如下:
字段名称说明是否含有缺失值Idx用户ID否ListingInfo乞贷成交时间否LogInfo1用户登录操纵代码否LogInfo2用户登录操纵类别否LogInfo3用户登录时间否   训练集有580551条记载,数据集有385880条记载,字段都一样,部分数据截图如下:

3. 用户更新信息表

  用户更新信息包括Training_UserUpdate.csv(训练集)、Test_UserUpdate.csv(测试集),都是4个字段,训练集有372463条记载,测试集有248832条记载,字段寄义如下表所示。
字段名称说明是否含有缺失值Idx用户ID否ListingInfo乞贷成交时间否UserupdateInfo1用户更新信息内容否UserupdateInfo2用户更新信息时间否   数据集部分数据截图如下:

三、信用贷款风险分析

1. 加载数据到Hive仓库

(1)启动Hadoop(start-all.sh),将7个数据集上传到HDFS的/data/credit目录下:

(2)创建Hive数据库
  打开Linux终端,启动Hive shell,创建Hive数据库creditdb。
create database creaditdb;

(3)启动spark集群
  启动spark集群:cd /usr/local/spark-3.4.3-bin-hadoop3/sbin; ./start-all.sh
打开IDEA,编写代码,将7个数据集导入Hive的数据库creditdb中。先创建一个字典变量,设置CSV文件名与Hive表名的对应关系,键为CSV文件名,值为Hive表名(小写)。代码如下:
  1. from pyspark import SparkConf
  2. from pyspark.ml import Pipeline
  3. from pyspark.ml.classification import GBTClassifier
  4. from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  5. from pyspark.ml.feature import StringIndexer, VectorAssembler, PCA, VectorIndexer, IndexToString
  6. from pyspark.sql import SparkSession, Row, Window, functions as F
  7. from pyspark.sql.functions import *
  8. import matplotlib.pyplot as plt
  9. from pyspark.sql.types import IntegerType, DoubleType
  10. import pandas as pd
  11. import matplotlib
  12. # 设置一个支持GUI的后端,用于在IDEA绘图
  13. matplotlib.use('TkAgg')  # 或者 'Qt5Agg', 'GTK3Agg' 等,用于在IDEA绘图
  14. import matplotlib.pyplot as plt
  15. plt.rcParams['font.sans-serif'] = ['SimHei']
  16. plt.rcParams['axes.unicode_minus'] = False
  17. # ==================== 写数据到MySQL的函数======================
  18. # 封装将pyspark的dataframe写入mysql数据库的函数
  19. from pyspark.sql import DataFrame
  20. import pymysql
  21. def write_to_mysql(df: DataFrame, url: str, properties: dict, table_name: str):
  22.     """
  23.     将DataFrame写入MySQL数据库的指定表。检查表是否存在,存在则先删除表
  24.       参数:
  25.     - df: 要写入的DataFrame。
  26.     - url: MySQL数据库的JDBC URL。
  27.     - properties: 包含数据库连接信息的字典(user, password, driver)。
  28.     - table_name: 要写入的MySQL表名。
  29.     - mode: 写入模式('append'、'overwrite'等)。
  30.     """
  31.     conn = pymysql.connect(host=url.split('/')[2].split(':')[0],
  32.                            port=int(url.split('/')[2].split(':')[1].split('/')[0]),
  33.                            user=properties['user'],
  34.                            password=properties['password'],
  35.                            database=url.split('/')[-1],
  36.                            charset='utf8mb4',
  37.                            cursorclass=pymysql.cursors.DictCursor)
  38.     try:
  39.         with conn.cursor() as cursor:
  40.             # 检查表是否存在
  41.             cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
  42.             if cursor.fetchone():
  43.                 # 如果表存在,则删除它
  44.                 cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
  45.                 conn.commit()
  46.                 print(f"删除表:{table_name}")
  47.     finally:
  48.         conn.close()
  49.         # 写数据到MySQL
  50.     df.coalesce(1).write.format('jdbc') \
  51.         .option('url', url) \
  52.         .option('dbtable', table_name) \
  53.         .option('user', properties['user']) \
  54.         .option('password', properties['password']) \
  55.         .option('driver', properties['driver']) \
  56.         .option('mode', 'append') \
  57.         .save()
  58.     print(f'Spark将数据写入MySQL的{table_name}表完成。')
  59. # ---------使用示例-----------
  60. url = "jdbc:mysql://192.168.126.10:3306/test"
  61. properties = {
  62.     "user": "root",
  63.     "password": "123456",
  64.     "driver": "com.mysql.jdbc.Driver"  # MySQL驱动
  65. }
  66. # write_to_mysql(final_join, url, properties, 'movie')  # 将movie写入MySQL。
  67. # =============函数结束================================
  68. # 调用.enableHiveSupport()来启用对Hive的支持,这样Spark就可以访问Hive的metastore,进行表的创建、查询等操作。
  69. conf = SparkConf().setAppName("信用贷款分析").setMaster('spark://192.168.126.10:7077')
  70. spark = SparkSession.builder \
  71.     .config(conf=conf) \
  72.     .enableHiveSupport() \
  73.     .getOrCreate()
  74. spark.sql('show databases').show()
  75. # ---------1. 创建hive数据库,并导入数据--------------------------
  76. # 所有的数据集存储在hdfs://192.168.126.10:9000/data/credit目录下。
  77. # 在Linux终端启动Hive shell,创建数据库:creditDB。命令为: CREATE DATABASE creditDB
  78. path = 'hdfs://192.168.126.10:9000/data/credit'
  79. database = 'creditDB'
  80. # 创建一个字典file_table_name,将CSV文件名映射到Hive中的表名。
  81. file_table_name = {
  82.     'Test_LogInfo.csv': 'loginfo_test',
  83.     'Test_Master.csv': 'masterinfo_test',
  84.     'Test_Master_result.csv': 'test_master_result',
  85.     'Test_UserUpdate.csv': 'userupdate_test',
  86.     'Training_LogInfo.csv': 'loginfo_train',
  87.     'Training_Master.csv': 'masterinfo_train',
  88.     'Training_UserUpdate.csv': 'userupdate_train'
  89. }
  90. # 将数据集csv文件导入creditDB数据库的hive表。
  91. import os
  92. for fname, tbname in file_table_name.items():
  93.     fpath = path + os.sep + fname  # 对于每个文件,构建完整的文件路径fpath
  94.     print('transforming data from:[{:<25}] to [{}]...'.format(fname, tbname))
  95.     data = spark.read.csv(fpath, header=True)  # 读取CSV文件,其中header=True表示CSV文件的第一行是列名。
  96.     # 将DataFrame的列名转换为小写,因为Hive默认列名是小写的,这有助于避免列名不匹配的问题。
  97.     # data.toDF(*[...]) 方法用于根据提供的列名列表创建一个新的DataFrame。
  98.     data = data.toDF(*[col.lower() for col in data.columns])
  99.     # 将DataFrame写入到Hive的指定数据库和表中。如果表已存在,overwrite模式会先删除旧表再创建新表。
  100.     data.write.mode('overwrite').saveAsTable('{}.{}'.format(database, tbname))  # 如:写入表:creditDB.loginfo_test
  101. spark.sql('USE creditDB')  # 选择hive数据库creditDB
  102. spark.sql('SELECT * FROM loginfo_test LIMIT 10 ').show()  # 查看 表loginfo_test前10行
复制代码
  这段代码中,有部分代码是写数据到MySQL的封装函数:write_to_mysql,方便反面将分析结果写入MySQL数据库。接着初始化Spark应用,连接Hive,再将csv数据集导入到Hive表中,相对应的Hive表自动创建。
数据集导入到Hive表的过程如下:

  检察导入到Hive中的masterinfo_train表,如下图所示。

2. 基本信息表masterinfo的训练集和测试集归并

  在数据分析和数据预处理前,将训练集和测试集归并;在构建猜测模型时,再将训练集和测试集分开。
  将基本信息的训练数据集(masterinfo_train)和测试数据集(masterinfo_test)、测试结果集(test_master_result)归并成一张表,为后续的数据分析和数据预处理做准备。
(1)训练集masterinfo_train的目标列target在倒数第2列,将其调到倒数第1列。
  读取hive的masterinfo_train表,复制target列并重命名label,最后将target列删除。
  1. # -----------2.合并训练集和测试集-----------
  2. # 2.1将训练数据集masterinfo_train和测试数据集masterinfo_test及其标签数据集test_master_result合并成一张表,方便统一做数据处理
  3. # masterinfo_train的目标列target在倒数第2列,将其调到倒数第1列
  4. # 读取hive的masterinfo_train表,复制target列并重命名label,最后将target列删除。
  5. masterinfo_train: DataFrame = spark.sql('SELECT  *, target as label FROM creditDB.masterinfo_train').drop('target')
  6. masterinfo_train.show(5)
  7. master_train = masterinfo_train.withColumnRenamed('label', 'target')  # 将label列重命名为target。 处理后的训练集
  8. master_train.show(5)
复制代码
(2)删除测试结果集test_master_result最后一列乞贷时间,统一测试集masterinfo_test的乞贷时间格式
  训练集的乞贷时间格式为:2020/02/21,而测试集的乞贷时间格式为:21/02/2020,需将测试集的时间格式设为和训练集的一样。
  1. # 2.2 删除测试结果集test_master_result最后一列借款时间,统一测试集masterinfo_test的借款时间格式
  2. test_master_result: DataFrame = spark.sql('select idx,target from creditDB.test_master_result')  # 剔除 测试结果集的最后一列:借款时间
  3. # masterinfo_test表的借款时间listinginfo格式为:21/2/2020,调整为2020/2/21,和其他表时间格式保持一致。
  4. masterinfo_test: DataFrame = spark.sql("select *, \
  5.              concat_ws('/',split(listinginfo, '/')[2],\
  6.                            split(listinginfo, '/')[1],\
  7.                            split(listinginfo, '/')[0]) as  time from creditDB.masterinfo_test") \
  8.     .drop('listinginfo') \
  9.     .withColumnRenamed('time', 'listinginfo')  # 调整时间格式
复制代码
(3)数据归并入库,整理后的主表为:masterinfo
  用join方法,将测试集和测试结果集连接归并,再用union()方法和训练集纵向连接归并。最后将归并后的数据集写入Hive数据库的masterinfo表。
  1. # 2.3 数据合并入库
  2. master_test = masterinfo_test.join(test_master_result, 'idx')  # 测试集和测试结果集合并。此时数据集有标签target,和训练集结构无异
  3. masterinfo = master_train.union(master_test)  # 训练集和测试集纵向连接合并
  4. masterinfo.write.mode('overwrite').saveAsTable('creditDB.masterinfo')  # 将合并后的数据集 写入hive数据库,新建表masterinfo
  5. print('合并后数据集的样本数:', masterinfo.count())
  6. print("==============1.主表整理后的数据集:masterinfo================\n")
  7. masterinfo.show(5)
复制代码
3.用户信息完善环境与逾期率的关系探索

  在征信领域,用户信息的完善度越好,其还款意愿就越强。对用户信息表中用户信息的完备程度举行统计,分析用户信息完善程度与逾期之间的关系。
(1)从Hive加载用户信息表masterinfo,并重命名为masterNew
  加载用户信息表masterinfo,重命名为masterInfo,并赋值给新表masterNew ;对于masterNew,除了用户ID列idx和标签列target外,其他列的内容假如是缺失值,则用1填充,否则用0更换。如许的操纵是为了后续计算每行记载的缺失值,即将所有1相加,就是每行缺失值的数量。
  1. # ---- 二、用户信息完善情况与逾期率的关系探索-------
  2. # 1.在征信领域,用户信息的完善度越好,其还款意愿就越强。对用户信息表中用户信息的完整程度进行统计,分析用户信息完善程度与用户信用评级之间的关系。
  3. masterInfo: DataFrame = spark.sql('select * from creditDB.masterinfo')  # PySpark读取hive表:masterinfo
  4. masterNew = masterInfo
  5. # 新建masterNew,idx和target两列不用计算,直接复制列名,其他列:缺失值替换为1,其他值替换为0
  6. for column in masterInfo.columns[1:-1]:
  7.     # 使用withColumn添加新列(或覆盖原列),标记缺失值为1,非缺失值为0
  8.     masterNew = masterNew.withColumn(column, when(isnull(col(column)), 1).otherwise(0))
  9. masterNew.show(5)
复制代码
  处理后的masterNew部分数据如下:

(2)统计masterNew 每行记载缺失值数量
  统计masterNew 表中每行记载缺失值数量。PySpark面向列计算,不擅长行计算。使用F_expr和字符串格式化来构建SQL求和表达式,新增求和列:sum。
  1. # ------2.统计分析--------
  2. # PySpark面向列计算,不擅长行计算。下面提供两种计算行总和的方法,
  3. from pyspark.sql.functions import expr as F_expr
  4. columns_to_sum = [column for column in masterNew.columns if column not in ['idx', 'target']]  # 排除不需要的列
  5. sum_expression = " + ".join(f"{column}" for column in columns_to_sum)  # 使用F_expr和字符串格式化来构建SQL求和表达式
  6. masterNew_row_sums = masterNew.withColumn("sum", F_expr(sum_expression))  # 应用表达式并添加新列
  7. masterNew_row_sums.show(5)
  8. result = masterNew_row_sums.groupby('sum').count()
  9. result = result.orderBy('sum')
  10. result.show()
  11. write_to_mysql(result, url, properties, 'master_nan_counts')  # 将统计结果写入MySQL的test数据库的master_nan_counts表。
  12. result = result.toPandas()
复制代码
  对每行记载的缺失值求和,新增sum列后的数据表:masterNew_row_sums。
  按缺失值总和sum分组,统计每组的用户数量,统计结果如下。

  由图可知,缺失值总数sum为0(没有缺失值)的用户数是416人,缺失值总数为5个的用户数则为23586人。最后,将统计结果写入MySQL的test数据库的master_nan_counts表。
  将统计数据可视化,绘制柱形图如下所示。

  绘图代码如下:
  1. # 绘图
  2. result.plot.bar(
  3.     xlabel='用户信息缺失数量',
  4.     ylabel='用户数量统计',
  5.     width=0.6)
  6. plt.show()
复制代码
(3)信息完善程度与逾期还款的概率关系
  由上图发现,用户信息缺失的数量会合在2~10,因此可将用户缺失信息程度分为3类:[2,4]、[5-7]、[8,10]。分析这三类用户群体的逾期还款比例。
  筛选masterNew_row_sums表中缺失值总数sum在[2~10]之间的记载,新增一列“level”,根据对应的缺失值总数[2,4]、[5-7]、[8,10],添加相应的内容:“2-4”、“5-7“、“8,10”。如下图所示。

  计算三类用户群体的逾期率,计算结果如下所示,缺失值数量在8~10的用户,逾期率是最高的,达到8.3%。

  最后,绘制柱形图将逾期率可视化。

  代码如下所示。
  1. # 3. 探索信息完善程度与逾期还款的概率的关系
  2. # 方法一
  3. filter_result = masterNew_row_sums.select('target', 'sum').filter((col('sum') > 2) & (col('sum') <= 10))
  4. filter_result.show()
  5. filter_result_level = filter_result.withColumn(
  6.     'level',
  7.     when(col('sum').between(2, 4), '2-4') \
  8.         .when(col('sum').between(5, 7), '5-7') \
  9.         .when(col('sum').between(8, 10), '8-10'))  # 添加一列level
  10. filter_result_level.show()
  11. re1 = filter_result_level.filter(col('target') == '1').groupBy('level').count()  # 按level分组统计逾期的人数
  12. re1.show()
  13. re1 = re1.toPandas().set_index('level')  # 转换为pandas dataframe,并设置level为索引
  14. re2 = filter_result_level.groupBy('level').count()  # 按level分组统计人数
  15. re2 = re2.toPandas().set_index('level')
  16. print(re2)
  17. result = (re1 / re2).sort_index()  # 逾期人数/总人数,按索引level排序。
  18. print(result)
  19. result.plot.bar(
  20.     xlabel='用户信息缺失数量',
  21.     ylabel='逾期率',
  22.     width=0.7)
  23. plt.show()
  24. master_nan_delinquency = spark.createDataFrame(result)  # 将pandas DataFrame转为PySpark DataFrame。
  25. write_to_mysql(master_nan_delinquency, url, properties, 'master_nan_delinquency')  # 将统计结果写入MySQL的test数据库的master_nan_delinquency表。
复制代码
为了演示Flask和Pyecharts的数据可视化效果,这里还统计了每个省份的客户人数,并将结果存入MySQL数据库test的province表。代码如下。
  1. # 统计每个省份的客户人数
  2. province = masterInfo.groupBy('userinfo_19').count()
  3. write_to_mysql(province, url, properties, 'province')  # 将统计结果写入MySQL的test数据库的province表。
  4. province.show()
复制代码
4. 用户信息修改环境与逾期率的关系(userupdate表)

  在征信领域,信息修改频率同样会影响用户信用评级。因此,有必要对所有乞贷用户的更新信息日期和乞贷成交日期的分布环境举行分析,探索用户信息修改环境与逾期率的关系。
(1)读取数据
  读取的用户更新信息表如下所示:

  可以看出,用户更新信息表有重复记载。
代码如下:
  1. # 4.用户信息修改情况与逾期率的关系探索
  2. # 在征信领域,信息修改频率同样回影响用户信用评级
  3. # 分析所有借款用户的更新信息日期和借款成交日期的分布情况,探索用户信息修改情况与逾期率的关系。
  4. # 4.1 读取数据
  5. userupdate_test: DataFrame = spark.sql(
  6.     "select idx,userupdateinfo2 from creditDB.userupdate_test")  # 显式地给变量添加DataFrame类型注解,帮助IDE理解变量的类型
  7. userupdate_test.show(5)
  8. userupdate_train: DataFrame = spark.sql("select idx,userupdateinfo2 from creditDB.userupdate_train")
  9. userupdate_train.show(5)
复制代码
(2)归并训练集userupdate_train和测试集userupdate_test为userupdate,统计信息更新次数
  用户信息更新的数据是按天记载,假如同一天有多次修改信息,表中就会出现重复数据。用户在同一天内对信息的修改视为一次修改。因此,需对表中数据去重,然后统计用户修改信息的次数,再将统计结果与用户基本信息数据归并,最后对用户修改次数举行统计。
训练集和测试集归并,去重,并按用户ID分组统计更新天数(即更新次数),如下图所示。

  归并后的表与用户基本信息表masterInfo(选择该表用户ID列和标签target列)连接成新表master_update,新增一列“updatedays”,数据为“count”列数据,再删除“count”列。结果如下图所示。

  将表master_update按更新天数分组,统计用户idx数量,并添加统计结果,列名为:all_count。

  将统计结果写入MySQL数据库test的updatedays_count表。将统计数据绘制成柱形图,如下所示。

  1. # 4.2 对信息修改表中的信息更新次数进行统计
  2. # 用户信息更新数据是按天记录,同一天多次修改信息,表中就会出现重复数据。用户在同一天内对信息的修改视为一次修改。
  3. # 因此,需对表中数据去重,然后统计用户修改信息的次数,再将统计结果与用户基本信息数据合并,最后对用户修改次数进行统计。
  4. userupdate = userupdate_test.union(userupdate_train) \
  5.     .distinct() \
  6.     .groupBy('idx') \
  7.     .count()  # 两表纵向拼接,去重,按idx分组,统计每组(每个用户)的信息更新次数。
  8. master_update = userupdate.join(masterInfo.select('idx', 'target'), 'idx') \
  9.     .withColumn('update_days', userupdate['count']) \
  10.     .drop('count')  # 两表连接,选择idx,target,以idx字段连接;新增一列upadate_days;删除count列。
  11. master_update.show(5)
  12. updatedays_count = master_update.groupBy('update_days') \
  13.     .agg(F.count('idx')) \
  14.     .withColumnRenamed('count(idx)', 'all_count') \
  15.     .orderBy('update_days')  # 按更新天数update_days分组;统计用户数idx;重新命名列为all_count;按update_days排序。
  16. updatedays_count.show(5)
  17. write_to_mysql(updatedays_count, url, properties, 'update_days_count')  # 将统计结果写入MySQL的test数据库的updatedays_count表。
  18. # 绘图
  19. pd_updatedays_count = updatedays_count.toPandas()
  20. pd_updatedays_count.plot.bar(
  21.     x='update_days',
  22.     y='all_count',
  23.     xlabel='用户修改信息次数',
  24.     ylabel='修改信息总人数'
  25. )
  26. plt.show()
复制代码
(3)探索用户修改信息的次数与逾期率之间的关系
  上图表明,用户修改信息的次数会合在1~5。因此,对修改次数会合在1-5的用户举行分析,探索用户修改信息的次数与逾期率的关系。
  对表master_update,筛选信息更新次数小于5的数据,按更新次数update_days和标签target举行分组,统计用户数量,统计结果作为新的列:target_count。结果赋值给新表(5天更新次数表):daysUnder5_final。
  将5天更新次数表daysUnder5_final与更新次数表updatedays_count按update_days连接,获得总更新次数(all_count),然后筛选逾期target=1的用户,结果赋值给新表daysUnder5_final。

  计算逾期人数/总人数的比率,添加列名rate,计算结果赋值给updatedays_overdue表,最后将数据写入MySQL的test数据库的update_days_overdue表。

  绘制用户修改信息次数与逾期率的柱形图,如下图所示。

代码如下:
  1. # 4.3 探索用户修改信息的次数与逾期率的关系
  2. # 从上图可知,用户修改信息的次数集中在1~5。因此对修改次数集中在1~5的用户进行重点分析,探索用户修改信息次数与逾期率之间的关系。
  3. daysUnder5 = master_update.filter('update_days<=5') \
  4.     .groupBy('update_days', 'target') \
  5.     .agg(F.count('idx')) \
  6.     .withColumnRenamed('count(idx)', 'target_count')  # 筛选更新次数小于5次;按更新次数和目标分组;统计用户总数;新增一列分组用户总数:target_count
  7. daysUnder5.orderBy('update_days').show(5)
  8. daysUnder5_final = daysUnder5.join(updatedays_count, 'update_days')  # 两表连接
  9. daysUnder5_final = daysUnder5_final.filter('target==1')  # 筛选逾期target=1的用户
  10. daysUnder5_final.orderBy('update_days').show(5)
  11. updatedays_overdue = daysUnder5_final.selectExpr('update_days', 'target_count/all_count as rate') \
  12.     .orderBy('update_days')  # 计算更新次数update_days分组中,逾期人数/总人数的比率
  13. updatedays_overdue.show(5)
  14. write_to_mysql(updatedays_overdue, url, properties, 'update_days_overdue')  # 将统计结果写入MySQL的test数据库的update_days_overdue表。
  15. # 绘图
  16. pd_updatedays_overdue = updatedays_overdue.toPandas()
  17. pd_updatedays_overdue.plot.bar(
  18.     x='update_days',
  19.     y='rate',
  20.     xlabel='用户修改信息次数',
  21.     ylabel='逾期率'
  22. )
  23. # plt.show()
复制代码
5. 用户乞贷月份与逾期率的关系分析

  用户资金状况随差别时间节点(诸如春节期间资金普遍紧张等)发生颠簸,这些变化深刻地影响着用户的贷款决策。为缓解财务压力,部分用户会选择在特定时期申请贷款作为应急措施。鉴于此,深入分析用户基本信息中的贷款月份数据显得尤为关键,这有助于我们洞察用户贷款行为的季候性模式,为金融机构提供更加精准的市场洞察和计谋支持。
(1)读取用户基本信息表masterInfo,提取日期字段中的月份信息
  读取用户基本信息表masterInfo,提取乞贷时间listinginfo字段的月份。按’month’, 'target’分组,统计差别月份的逾期/未逾期记载总数。

(2)统计差别月份的逾期率
  按月份分组,统计用户总人数。

  将month_count与month_count_sum归并后,再筛选逾期用户,得到按月分组的逾期人数和总人数(最后一列)。

  计算每月的逾期率。将计算结果写入MySQL的test数据库的month_overdue表。

  绘制乞贷月份和逾期率的柱形图。

  1. # -----5.用户借款月份与逾期率的关系探索
  2. # 考虑到不同时期,用户的资金状况(如过年资金紧张等)可能会影响用户的贷款行为,某些用户会为了应对资金压力而转向贷款。
  3. # 因此,需要对用户基本信息中的借款月份进行探索性分析。
  4. # 5.1 提取日期字段中的月份信息
  5. # masterInfo: DataFrame = ...  # 为masterInfo做类型注解
  6. master_month = masterInfo.selectExpr('idx', "split(listinginfo,'/')[1] as month", 'target')  # 提取月份
  7. month_count = master_month.groupBy('month', 'target').count()  # 统计不用月份的逾期记录数
  8. month_count.orderBy('month').show(5)  # 此时month字段是字符串类型
  9. # 统计不用月份的逾期率
  10. # 按月份统计用户总人数和预期还款用户人数,统计不同月份的逾期率。
  11. month_count_sum = month_count.groupBy('month') \
  12.     .agg(F.sum('count')) \
  13.     .withColumnRenamed('sum(count)', 'sum_count')  # 按月份分组,统计用户总人数
  14. month_count_final: DataFrame = month_count.join(month_count_sum, 'month')
  15. month_count_final.show(5)  # 观察结果
  16. month_count_final = month_count_final.filter('target==1')  # 逾期还款的统计结果
  17. month_count_final.show(5)  # 与上面的结果有什么区别。
  18. month_overdue = month_count_final \
  19.     .selectExpr('cast(month as int)', 'count/sum_count as rate') \
  20.     .orderBy('month')  # 计算逾期率,将月份month设为整型;按月份排序。
  21. month_overdue.show(5)
  22. write_to_mysql(month_overdue, url, properties, 'month_overdue')  # 将统计结果写入MySQL的test数据库的month_overdue表。
  23. # 绘制 借款月份和逾期率的柱形图
  24. df_month_overdue = month_overdue.toPandas()
  25. df_month_overdue.plot.bar(
  26.     x='month',
  27.     y='rate',
  28.     xlabel='借款月份',
  29.     ylabel='逾期率'
  30. )
  31. # plt.show()
  32. # 从柱形图可以发现,3、4、5、11、12月份用户逾期还款的概率明显高于其他月份。
复制代码
四、数据预处理

1.计算用户信息缺失个数,用乞贷月份构建新特征(nullcount表)

(1)计算用户信息缺失个数
  这部分工作在“3.用户信息完善环境与逾期率的关系探索”中已经完成,结果保存在masterNew_row_sums表中,如下图所示。


(2)提取月份的表master_month
  前面已经完成,如下图所示。

(3)选择masterNew_row_sums表的’idx’, ‘target’, 'sum’字段,并与master_month表归并;重命名列名,结果如下图所示。

  最后将nullcount表存储在Hive数据库的nullcount表中。
2.用户更新信息重建(userupdateprocess)

  用户更新信息表userupdateinfo_1、userupdateinfo_2字段对每一位用户均有多条记载,与masterinfo主表的一位用户一条记载的形式不符。时间格式必要统一。
(1)读取数据
  读取用户更新信息训练集userupdate_train和测试集userupdate_test,将listinginfo1列的数据前面的“_”去掉;将乞贷成交时间listinginfo与更新时间userupdateinfo2时间格式设为-格式;计算两个时间的间隔天数,并重命名列为diff_days。最后两张表纵向归并,结果如下。

  1. # 6.2 用户更新信息重建
  2. # 用户更新信息表userupdateinfo_1、userupdateinfo_2字段对每一位用户均有多条记录,与masterinfo主表的一位用户一条记录的形式不符。时间格式需要统一。
  3. # 将listinginfo1列的数据前面的_去掉;将 借款成交时间listinginfo与更新时间userupdateinfo2时间格式设为-格式;计算两个时间间隔,并重命名列为diff_days;
  4. train_update_query = """  
  5. SELECT   
  6.     idx,   
  7.     LOWER(REGEXP_REPLACE(userupdateinfo1, '_', '')) AS userupdateinfo1,  
  8.     DATEDIFF(  
  9.         TO_DATE(REGEXP_REPLACE(listinginfo, '/', '-')),  
  10.         TO_DATE(REGEXP_REPLACE(userupdateinfo2, '/', '-'))  
  11.     ) AS diff_days,  
  12.     userupdateinfo2  
  13. FROM   
  14.     {}.{}  
  15. """.format('creditDB', 'userupdate_train')
  16. train_update: DataFrame = spark.sql(train_update_query)
  17. train_update.show(5)
  18. test_update_query = """  
  19. SELECT   
  20.     idx,   
  21.     LOWER(REGEXP_REPLACE(userupdateinfo1, '_', '')) AS userupdateinfo1,  
  22.     DATEDIFF(  
  23.         TO_DATE(REGEXP_REPLACE(listinginfo, '/', '-')),  
  24.         TO_DATE(REGEXP_REPLACE(userupdateinfo2, '/', '-'))  
  25.     ) AS diff_days,  
  26.     userupdateinfo2  
  27. FROM   
  28.     {}.{}  
  29. """.format('creditDB', 'userupdate_test')
  30. test_update: DataFrame = spark.sql(test_update_query)
  31. test_update.show(5)
  32. userupdate = train_update.union(test_update)  # 两表纵向拼接合并
复制代码
(2)计算用户更新信息的统计值
  用户更新信息表记载了用户更新操纵的类型、更新操纵的时间、乞贷成交的时间等。


  • 计算乞贷前:乞贷时间与更新时间之差的最大值(最早更新)和最小值。

  1. # ---(1)计算用户更新信息的统计值
  2. # 用户更新信息表记录了用户更新操作的类型、更新操作的时间、借款成交的时间等。
  3. # 计算借款之前 用户最早更新信息的时间  距离 借款成交时间的 天数,用户更新次数,用户更新频率,借款成交时间之前用户更新信息的天数,用户更改特征数目。
  4. update_days = userupdate.select('idx', 'diff_days').distinct() \
  5.     .groupBy('idx') \
  6.     .agg(F.max('diff_days'), F.min('diff_days')) \
  7.     .withColumnRenamed('max(diff_days)', 'first_update') \
  8.     .withColumnRenamed('min(diff_days)', 'last_update')  # 计算借款前:借款与更新时间差的最大值(最早更新)和最小值。
  9. update_days.show(5)
复制代码


  • 统计用户更新次数。

  1. update_counts = userupdate.select('idx', 'diff_days') \
  2.     .groupBy('idx') \
  3.     .agg(F.count('diff_days')) \
  4.     .withColumnRenamed('count(diff_days)', 'update_counts')
  5. update_counts.show(5)
复制代码


  • 计算用户更新频次:更新次数/距离初次更新的天数(first_update)

  1. # 计算用户更新频次
  2. # 更新频率=更新次数/距离首次更新的天数(first_update)
  3. update_frequency = update_days.join(update_counts, on='idx') \
  4.     .selectExpr('idx', 'update_counts/first_update as update_frequency')
  5. update_frequency.show(5)
复制代码


  • 统计乞贷成交时间之前,用户更新信息的天数

  1. # 统计借款成交时间之前,用户更新信息的天数
  2. days_count = userupdate.select('idx', 'userupdateinfo2') \
  3.     .distinct() \
  4.     .groupBy('idx') \
  5.     .agg(F.count('userupdateinfo2')) \
  6.     .withColumnRenamed('count(userupdateinfo2)', 'update_num')
  7. days_count.show(5)
复制代码


  • 统计用户更改的特征数量:更新信息时,大概会修改很多内容,如学历、电话、婚姻状况等,更新内容列为userupdateinfo1。

  1. # 统计用户更改特征数目
  2. update_casts_counts = userupdate.select('idx', 'userupdateinfo1') \
  3.     .distinct() \
  4.     .groupBy('idx') \
  5.     .agg(F.count('userupdateinfo1')) \
  6.     .withColumnRenamed('count(userupdateinfo1)', 'update_cats')  # 每位用户 更改特征的数目
  7. update_casts_counts.show(5)
复制代码
(3)长宽表转换
  长表是指行多列少的表,即一行中的数据量比较少,但行数大。宽表是指列多行少的表,即一行中的数据量较大,但行数少。
  用户更新信息表userupdate中,每行数据记载了用户每次的更新内容和更新时间,每位用户有多条记载,与主表masterinfo的每位用户一条记载的形式不符。因此必要对userupdate_train和userupdate_test举行布局转换,以用户修改的内容为字段,构建宽表,表中每行数据记载一位用户的更新信息。


  • 统计每位用户更新各特征的次数;没有更新的特征用0填充。

  1. # (2) 长宽表转换
  2. # 长表是指行多列少的表,即一行中的数据量较少,行数多。宽表是指列多行少,即一行中的数据量较大,行数少。一行数据可以存放用户的很多信息
  3. # 统计每位用户更新各特征的次数;没有更新的特征用0填充;
  4. update_casts = userupdate.select('idx', 'userupdateinfo1') \
  5.     .groupBy('idx', 'userupdateinfo1') \
  6.     .count()  # 统计每位用户的 修改特征 及次数。
  7. update_casts.show(5)
复制代码


  • 长表 转 宽边。按用户idx分组,用pivot将每位用户的userupdateinfo1列的数据 转为一行,且计算每个值的个数,没有数据用0填充。

  1. update_casts_result = update_casts.groupBy('idx') \
  2.     .pivot('userupdateinfo1') \
  3.     .sum('count').na.fill(0)  # 长表 转 宽边。按用户idx分组,用pivot将每位用户的userupdateinfo1列的数据 转为一行,且计算每个值的个数,没有数据用0填充。
  4. update_casts_result.show(5)
复制代码


  • 多表连接,按idx为唯一标识举行归并,并存储在Hive中。

  1. # 多表连接,按idx为唯一标识进行合并,并存储在Hive中。
  2. result = update_days.join(update_casts_counts, on='idx') \
  3.     .join(update_frequency, on='idx') \
  4.     .join(days_count, on='idx') \
  5.     .join(update_casts_result, on='idx')  # 连表,添加更多字段:first_update,last_update,update_cats,update_frequency,update_num
  6. print("==============2.用户更新信息 整理后的数据集:userupdateprocess============\n")
  7. result.show(5)
  8. result.write.mode('overwrite').saveAsTable('creditdb.userupdateprocess')
复制代码
3. 用户登录信息重建(loginfoprocess)

  用户登录信息表中的loginfo1(登录操纵代码)、loginfo2(登录操纵类别)字段对每一位用户均有多条记载,与主表masterinfo的每位用户一条记载的形式不符。统计用户登录平台 次数、频率、最早登录时间等,用于构建猜测用户贷款风险的用户登录信息特征。
(1) 加载用户登录信息训练集loginfo_train和测试集loginfo_test,转换时间格式:2020/3/5 转为 2020-3-5,用于计算时间间隔。最后将训练集和测试集归并。
   处理前的用户登录信息训练集loginfo_train,如下图所示。

  loginfo1(登录操纵代码)、loginfo2(登录操纵类别)两列归并处理为longinfo2,计算乞贷时间listinginfo与登录时间loginfo3的时间差diff_days。最后将训练集和测试集归并,归并后的用户登录信息数据集为loginfo,如下图所示。

  1. # (1) 将时间格式:2020/3/5 转为 2020-3-5,用于计算时间间隔,完成数据预处理
  2. train_log_query = """  
  3. SELECT   
  4.     idx,   
  5.     CONCAT('log_',REGEXP_REPLACE(loginfo1,'-','c'),'_',loginfo2) as loginfo2,\
  6.     listinginfo,\
  7.     DATEDIFF(REGEXP_REPLACE(listinginfo,'/','-'),REGEXP_REPLACE(loginfo3,'/','-')) as diff_days,\
  8.     loginfo3   
  9. FROM   {}.{}  
  10. """.format('creditDB', 'loginfo_train')
  11. train_log: DataFrame = spark.sql(train_log_query)
  12. train_log.show(10)
  13. test_log_query = """  
  14. SELECT   
  15.     idx,   
  16.     CONCAT('log_',REGEXP_REPLACE(loginfo1,'-','c'),'_',loginfo2) as loginfo2,\
  17.     listinginfo,\
  18.     DATEDIFF(REGEXP_REPLACE(listinginfo,'/','-'),REGEXP_REPLACE(loginfo3,'/','-')) as diff_days,\
  19.     loginfo3   
  20. FROM   {}.{}  
  21. """.format('creditDB', 'loginfo_test')
  22. test_log: DataFrame = spark.sql(test_log_query)
  23. test_log.show(10)
  24. loginfo = train_log.union(test_log)
复制代码
(2)计算用户登录信息的统计值


  • 用户最早登录时间、最晚登录时间,距离乞贷成交时间的天数。max(diff_days)、min(diff_days)
    将表loginfo按用户分组,计算最大时间差max(diff_days),即最早登录时间,计算最小时间差min(diff_days),即最晚登录时间。

      由图可知,用户10096距离乞贷成交时间最早的一次登录,是在乞贷前7天,最晚登录时间是在乞贷前0天(当天)。
  1. # (2)计算用户登录信息的统计值
  2. # 用户最早登录时间、最晚登录时间,距离 借款成交时间 的天数。max(diff_days)、min(diff_days)
  3. first_last_day = loginfo.groupBy('idx') \
  4.     .agg(F.max('diff_days'), F.min('diff_days')) \
  5.     .withColumnRenamed('max(diff_days)', 'first_log') \
  6.     .withColumnRenamed('min(diff_days)', 'last_log')
  7. first_last_day.show(5)
复制代码


  • 用户的登录操纵类别总数
      longinfo2列是用户登录操纵类型,因此,将表loginfo按用户分组,统计loginfo2的数量,就可以得到用户的登录操纵类别总数。

  1. # 用户总的登录类别数目
  2. log_casts = loginfo.select('idx', 'loginfo2') \
  3.     .distinct() \
  4.     .groupBy('idx') \
  5.     .agg(F.count('loginfo2')) \
  6.     .withColumnRenamed('count(loginfo2)', 'log_casts')
  7. log_casts.show(5)
复制代码


  • 用户登录平台的天数
      loginfo3列为用户登录日期,将表loginfo按用户分组,对登录日期去重后计数,就可以得到用户登录平台的天数。

  1. # 用户登录平台的天数
  2. log_num = loginfo.select('idx', 'loginfo3') \
  3.     .distinct() \
  4.     .groupBy('idx') \
  5.     .agg(F.count('loginfo3')) \
  6.     .withColumnRenamed('count(loginfo3)', 'log_num')
  7. log_num.show(5)
复制代码


  • 用户第一次登录平台(最早登录)之后,每一天登录平台的频率。
      登录平台的频率log_frequency=用户登录平台的总天数log_num/最早登录平台的天数first_log 。

  1. # 用户第一次登录平台之后,每一天登录平台的频率。
  2. log_frequency = log_num.join(first_last_day, on='idx') \
  3.     .selectExpr('idx', 'log_num/first_log as log_frequency')
  4. log_frequency.show(5)
复制代码
(3)长宽表转换
  前面的用户登录信息表,字段很少,每行的信息量很小;每一位用户有多条记载,与目标masterinfo的每位用户一条记载的形式不符。以用户操纵类型为字段,构建新表(宽表)。
以idx字段为唯一标识,统计每位用户登录操纵类型的次数。如下图所示。

  上图中,用户57826的操纵类型log_c4_6的次数是10次。
  用pivot() 函数将多分类变量转换为多个列。即以idx字段为唯一标识,统计每位用户登录操纵类型loginfo2的次数,假如没有相应的操纵类型次数,用0填充。最后将结果存入Hive的loginfoprocess表中。

  1. # 构建基于宽表的用户登录信息表
  2. # 前面的用户登录信息表,字段很少,每行的信息量很小;每一位用户有多条记录,与目标masterinfo的每位用户一条记录的形式不符。
  3. # 以用户操作类型为字段,构建新表(宽表)。以idx字段为唯一标识,统计每位用户登录操作类型的次数。如果没有相应的操作类型次数,用0填充。
  4. log_casts_counts = loginfo.select('idx', 'loginfo2') \
  5.     .groupBy('idx', 'loginfo2') \
  6.     .count()
  7. log_casts_counts.show(5)
  8. log_casts_result = log_casts_counts.groupBy('idx') \
  9.     .pivot('loginfo2') \
  10.     .sum('count') \
  11.     .na.fill(0)
  12. print("==============3.用户登录信息整理后的数据集:loginfoprocess============\n")
  13. log_casts_result.show(5)
  14. log_casts_result.write.mode('overwrite').saveAsTable('creditDB.loginfoprocess')
复制代码
4.对主表masterinfo做分类数据预处理

  在数据会合,有些字段的内容是分类型数据,如户籍省份(广西、广东、湖南、四川等分类),婚姻状况(已婚、未婚等),必要处理这些分类数据。别的,另有部分数据包含空格,空格在后续的数据分析中大概会造成不可预估的影响。因此,必要对包含空格的数据举行洗濯和处理,将其转换成规范的数据。
(1)读取数据masterinfo,删除空格
  主表masterinfo中有大量的字符型数据,部分数据包含空格,导致字符串无法匹配。如“中国 移动”中间有一个空格,无法和“中国移动”匹配。
  检察主表masterinfo的userinfo_9列有几家通讯公司,发现有些数据包含空格,如下图所示。

  使用trim()函数,对主表masterinfo的字符型列做删除空格处理。userinfo_9列是通讯公司,userinfo_2、userinfo_4、userinfo_8和userinfo_20列是城市。处理完后,再检察userinfo_9列的去重值,结果如下。

  1. # 6.4 -------------分类数据预处理--对主表masterinfo进行处理。用于机器学习------
  2. # (1)读取数据并删除数据中的空格
  3. data = spark.sql('select * from creditdb.masterinfo')  # 从hive读取主表masterinfo
  4. # 查看userinfo_9列的不重复值
  5. data.printSchema()
  6. data.select('userinfo_9').distinct().show()  # 查看有几家通讯公司
  7. data_trim = data \
  8.     .withColumn('userinfo_9', F.trim('userinfo_9')) \
  9.     .withColumn('userinfo_2', F.trim('userinfo_2')) \
  10.     .withColumn('userinfo_4', F.trim('userinfo_4')) \
  11.     .withColumn('userinfo_8', F.trim('userinfo_8')) \
  12.     .withColumn('userinfo_20', F.trim('userinfo_20'))  # 这几个字段是通讯公司、城市
  13. data_trim.select('userinfo_9').distinct().show()  # 查看 删除空格后的数据
复制代码
(2)重复字段归并
  userinfo_7和userinfo_19两列是省份。

  添加一列:diffprov,比较userinfo_7和userinfo_19两列的省份是否一致,假如省份一致为11,不一致为0。

  1. # (2) 重复字段合并
  2. data_trim.select('idx', 'userinfo_7', 'userinfo_19').show(5)  # 这两个是省份。比较两个省份是否一致。
  3. # 两个数据任何一个为空值,则为0,如果两个数据相同为1,不同为0
  4. udf_trans_diff = F.udf(
  5.     lambda arg1, arg2: -1 if not ((len(arg1) > 0) and (len(arg2) > 0)) else (1 if (arg1 in arg2) else 0))
  6. diffprov = data_trim.withColumn('diffprov', udf_trans_diff(data_trim['userinfo_7'], data_trim['userinfo_19']))
  7. diffprov.select('idx', 'userinfo_7', 'userinfo_19', 'diffprov').show(5)  # 比较两个省份是否一致。
复制代码
  有的城市名 含有“市”,有的没有,把“市”去掉。与城市有关的列为:userinfo_2、userinfo_4、userinfo_8、userinfo_20。

  把“市”去掉,再将4列的值两两比较,假如两个值一致(城市 雷同)为1,差别为0,并添加相应的列名userinfodiff_2_20等。

  1. # 有的城市名 含有“市”,有的没有。把“市”去掉。
  2. keystr = '市'
  3. udf_trans_del = F.udf(lambda arg: arg.replace(keystr, '') if ((arg is not None) and (keystr in arg)) else arg)
  4. tmp_data_transform = diffprov \
  5.     .withColumn('userinfo_2', udf_trans_del(F.decode('userinfo_2', 'UTF-8'))) \
  6.     .withColumn('userinfo_4', udf_trans_del(F.decode('userinfo_4', 'UTF-8'))) \
  7.     .withColumn('userinfo_8', udf_trans_del(F.decode('userinfo_8', 'UTF-8'))) \
  8.     .withColumn('userinfo_20', udf_trans_del(F.decode('userinfo_20', 'UTF-8')))  # 去除这四个字段城市名中的“市”
  9. udf_city_cmp = F.udf(lambda arg1, arg2: 1 if (arg1 == arg2) else 0)
  10. city = tmp_data_transform.select('idx', 'userinfo_2', 'userinfo_4', 'userinfo_8', 'userinfo_20')
  11. city_addCol = city \
  12.     .withColumn('userinfodiff_2_20', udf_city_cmp('userinfo_2', 'userinfo_20')) \
  13.     .withColumn('userinfodiff_2_4', udf_city_cmp('userinfo_2', 'userinfo_4')) \
  14.     .withColumn('userinfodiff_2_8', udf_city_cmp('userinfo_2', 'userinfo_8')) \
  15.     .withColumn('userinfodiff_4_20', udf_city_cmp('userinfo_4', 'userinfo_20')) \
  16.     .withColumn('userinfodiff_4_8', udf_city_cmp('userinfo_4', 'userinfo_8')) \
  17.     .withColumn('userinfodiff_8_20', udf_city_cmp('userinfo_8', 'userinfo_20')) \
  18.     .drop('userinfo_2', 'userinfo_4', 'userinfo_8', 'userinfo_20')  # 比较两个城市,相同城市为1,不同为0。删除四个原始字段。
  19. print("比较两个城市,删除userinfo_2、userinfo_4、userinfo_8、userinfo_20字段:city_addCol\n")
  20. city_addCol.show(5)
复制代码
5. 字符串字段编码处理(encodeprocess)

  PySpark的机器学习模型只能处理数值型数据,因此必要将字符型数据转换为数值型数据。别的,假如有些类别占类别总数的比例比较小,可以举行归并,以淘汰类别数量。
(1)界说处理函数:col_percentage_calc(),将类别占比小于0.002的数据归并为"other"
  将类别占总数的比例阈值设置为0.002,类别占总数的比例小于该值时,则归并为"other"。界说一个udf()函数:col_percentage_calc,实现该功能。示例:计算指定字段userinfo_8的类别占比,计算结果的前5条记载如下图所示。

  由图可知,用户10001所在城市为:深圳,而在深圳的用户数为1336个,深圳占所有用户所在城市的比例为:0.02672。用户1007所在的城市用户数为34个,占比为:0.00068,占比小于0.002,所以将该城市设为“other”。
  1. # ---6.5 字符串字段编码处理------------------------------------
  2. # 将类别占总数的比例阈值设为0.002,类别占总数的比例小于该值的则合并为other。
  3. # udf()函数,用于过滤类别占总数比例小于0.002的数据,将其划分为'other'
  4. udf_rate_filter = F.udf(
  5.     lambda feature, feature_rate: feature if (feature is not None and feature_rate > 0.002) else 'other')
  6. # 字段的值类别比例计算函数,计算指定字段中不同类别的值占比
  7. def col_percentage_calc(fmdata, featurecol):
  8.     if fmdata is None:
  9.         raise ValueError("DataFrame cannot be None")
  10.     cols = ['idx', featurecol]
  11.     feature_count_name = featurecol + '_count'  # 字段里面 类别的总数
  12.     feature_rate_name = featurecol + '_rate'  # 字段 占比
  13.     feature_count = fmdata.select(cols) \
  14.         .groupBy(featurecol) \
  15.         .agg(F.count('idx').alias(feature_count_name))  # 按字段里面的类别分组,计算使用这一类别的用户数量,即该类别数量
  16.     # feature_count.show(5)
  17.     feature_all_count = fmdata.select(cols).count()  # 某一字段 的记录数(行数)
  18.     tmp_data = feature_count \
  19.         .withColumn(feature_rate_name, feature_count[feature_count_name] / feature_all_count) \
  20.         .withColumn(featurecol, F.trim(feature_count[featurecol]))  # 字段里每一种类别数量/字段总记录数;去除 字段名空格
  21.     # tmp_data.show(5)
  22.     feature_rate = fmdata.select(cols).join(tmp_data, on=featurecol,
  23.                                             how='left')  # 左连接,在原数据集上 添加2列:feature_count_name,feature_rate_name
  24.     # feature_rate.show()
  25.     result = feature_rate \
  26.         .withColumn(featurecol, udf_rate_filter(featurecol, feature_rate[feature_rate_name])) \
  27.         .drop('feature_count_name',
  28.               'feature_rate_name')  # 如果类别占比< 0.002,则标注为other;删除feature_count_name和feature_rate_name
  29.     return result
  30. re = col_percentage_calc(data, 'userinfo_8')
  31. print("------示例:计算指定字段userinfo_8的类别占比----")
  32. re.show(5)
复制代码
(2)界说函数:col_string_indexer( ),将类别型数据转换为数值型数据
  1. # 将类别数据转换为数值型数据
  2. def col_string_indexer(fmdata, featueCol):
  3.     indexer = StringIndexer(inputCol=featueCol, outputCol=featueCol + '_index')
  4.     idx_model = indexer.fit(fmdata)
  5.     result = idx_model.transform(fmdata)
  6.     return result
复制代码
(3)界说函数:dataCacheToHive( ),使用Hive举行缓存
  Spark将主表masterinfo的类别型数据转换为数值型数据的过程中,产生巨大的中间结果,会导致节点内存不足。因此必要使用Hive举行缓存,即:每转换2列数据,将结果存入Hive,再重新读取。
  1. # 使用Hive进行缓存
  2. def dataCacheToHive(fmdata, database, table, count):
  3.     if (len(spark.sql('show tables in {}'.format(database))
  4.                     .filter("tableName=='{}_{}'".format(table, count))
  5.                     .collect()) == 1):
  6.         print('删除表 [{}.{}_{}]....'.format(database, table, count))
  7.         spark.sql('drop table {}.{}_{}'.format(database, table, count))  # 检查表是否存在,如果存在则删除。
  8.     print('保存表 [{}.{}_{}]'.format(database, table, count))
  9.     fmdata.write.saveAsTable('{}.{}_{}'.format(database, table, count))  # 将表保存到Hive中。
  10.     print('更新表[{}.{}_{}]...'.format(database, table, count))
  11.     spark.catalog.refreshTable('{}.{}_{}'.format(database, table, count))   # 刷新Hive元数据中的表信息,确保之前的任何更改都被识别
  12.     print('返回缓存数据(重新读表) ...')
  13.     return spark.read.table('{}.{}_{}'.format(database, table, count)).cache()  # 读取刚刚保存的表,并通过.cache()方法将其缓存
复制代码
(4 )将删除空格、比较两省、删除“市”后的主表tmp_data_transform,与两两城市相比较后的表city_addCol连接

  1. # tmp_data_transform为删除空格,比较两省,删除“市”后的主表;city_addCol为两两城市相比较后的数据表
  2. cleaned_result = tmp_data_transform.join(city_addCol, on='idx').cache()  # 将两两城市比较的结果加入主表masterinfo
  3. print("---------删除空格,比较两省,删除“市”,两两城市比较后的主表:cleaned_result:")
  4. cleaned_result.show()
  5. cleaned_result = cleaned_result.toDF(*[col.lower() for col in cleaned_result.columns]).cache()  # 将字段名小写
复制代码
(5)选取字符型字段
  选取字符型字段,和剩下的字段(大部分是数值型,另有字符型字段通讯公司字段userinfo_9和婚姻状态字段userinfo_22反面单独数值化)。选取的字符型字段如下。

  1. # 确定字符型字段
  2. nanColumns = ['userinfo_7', 'userinfo_8', 'userinfo_2', 'userinfo_4', 'userinfo_19', 'userinfo_20', 'userinfo_23',
  3.               'userinfo_24',
  4.               'education_info2', 'education_info3', 'education_info4', 'education_info6', 'education_info7',
  5.               'education_info8',
  6.               'webloginfo_19', 'webloginfo_20', 'webloginfo_21']
  7. otherColumns = list(set(nanColumns) ^ set(cleaned_result.columns))  # 其他字段(数值型):两个集合之间的差异
  8. other_column_result = cleaned_result.select(otherColumns).cache()  # 其他字段(数值型)
  9. nan_column_result = cleaned_result.select(['idx'] + nanColumns).cache()  # 字符型字段,加上idx
  10. nan_column_result.printSchema()
  11. print("---选择的字符型字段nan_column_result:")
  12. nan_column_result.show(5)
复制代码
(6)将字符型字段转换为数值型字段
  调用前面封装的函数col_percentage_calc( )、col_string_indexer( )、dataCacheToHive( ),将字符型字段转换为数值型字段。处理过程如下:

  最后将数值化处理结果nan_column_result与原来数值型数据other_column_result连接,存储到Hive的表encodeprocess中。

  1. # 调用前面封装的函数col_percentage_calc(),col_string_indexer(),dataCacheToHive(),将字符型字段转换为数值型字段
  2. count = 0
  3. # nan_column_result = cleaned_result.select(['idx'] + nanColumns)
  4. for col in nanColumns:
  5.     print('processing the columns[{}] ....'.format(col))
  6.     col_other_result = col_percentage_calc(nan_column_result, col)  # 计算字段中类别的占比,小于0.002的类型设为other
  7.     col_index_result = col_string_indexer(col_other_result, col).drop(col)  # 将字符型字段 转为 数值型字段
  8.     nan_column_result = nan_column_result.join(col_index_result, on='idx').drop(col)
  9.     count = count + 1
  10.     if count % 2 == 0:
  11.         nan_column_result = dataCacheToHive(nan_column_result, 'creditdb', 'tmp_data', count)  # 处理两列后,写入Hive缓存
  12. nan_column_result = dataCacheToHive(nan_column_result, 'creditdb', 'tmp_data', count + 2)
  13. print("计算字段占比、数值化字段后主表的字符串字段部分:nan_column_result:\n")
  14. nan_column_result.count()
  15. nan_column_result.show(5)
  16. # 将处理后的数据存储到hive数据库的encodeprocess表中
  17. result = nan_column_result.join(other_column_result, on='idx')
  18. print("------计算字段占比、数值化字段后的主表:encodeprocess-----------")
  19. result.show(5)
  20. result.write.mode('overwrite').saveAsTable('creditDB.encodeprocess')
复制代码
6. 分类数据重编码(onehotprocess)

(1)重编码
  在上一节的encodeprocess表中,userinfo9字段是通讯公司,有四个类别:中国移动、中国联通、中国电信和不详。使用One-Hot编码处理,生成新字段。同理,userinfo22字段是婚姻状态,也采用One-Hot编码处理,生成新字段。
  1. # 6.6 分类数据重编码
  2. data: DataFrame = spark.sql('select * from {}.{}'.format('creditdb', 'encodeprocess'))  # 从hive读取表
  3. pd_phone = data.select(['idx', 'userinfo_9']).toPandas()  # 通讯公司:中国移动,中国联通,
  4. pd_phone['userinfo_9_commerce'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '中国移动' else 0)
  5. pd_phone['userinfo_9_unicom'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '中国联通' else 0)
  6. pd_phone['userinfo_9_telecom'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '中国电信' else 0)
  7. pd_phone['userinfo_9_unknown'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '不详' else 0)
  8. pd_phone.drop(['userinfo_9'], axis=1, inplace=True)
  9. pd_phone.head(5)
  10. # 将包含marriage字段和idx字段的DataFrame转换为pandas模块的DataFrame类型
  11. pd_marriage = data.select('idx', 'userinfo_22').toPandas()
  12. pd_marriage['userinfo_22_married'] = pd_marriage['userinfo_22'] \
  13.     .apply(lambda v: 1 if v == '初婚' or v == '已婚' or v == '再婚' else 0)
  14. pd_marriage['userinfo_22_unmarried'] = pd_marriage['userinfo_22'] \
  15.     .apply(lambda v: 1 if v == '未婚' else 0)
  16. pd_marriage['userinfo_22_unknown'] = pd_marriage['userinfo_22'] \
  17.     .apply(lambda v: 1 if v == 'D' or v == '不详' else 0)
  18. pd_marriage.drop(['userinfo_22'], axis=1, inplace=True)
  19. pd_marriage.head(5)
复制代码
(2)处理结果写入hive表:onehotprocess
  将处理结果与主表encodeprocess连接后,存储到Hive表onehotprocess中。

  1. print("----将通讯公司分类和婚姻状况分类,写入hive----------")
  2. marriage = spark.createDataFrame(pd_marriage)
  3. phone = spark.createDataFrame(pd_phone)
  4. onehotprocess = data \
  5.     .drop('userinfo_9') \
  6.     .drop('userinfo_22') \
  7.     .join(marriage, on='idx') \
  8.     .join(phone, on='idx')
  9. onehotprocess.write.mode('overwrite').saveAsTable('creditdb.onehotprocess')
  10. print("--数据预处理后的masterinfo主表:onehotprocess-----")
  11. onehotprocess.show(5)
复制代码
7.缺失值处理(tb_nullprocess)

  经过数据预处理后,得到三张新表:onehotprocess(主表,处理后的用户基本信息表)、userupdateprocess(用户更新信息表)、loginfoprocess(用户登录信息表)。
  但表中仍旧存在缺失值,特征列中缺失数据比较大的,删除该列;特征列中缺失数据比较小的,则使用0填充。
(1)读取数据,三表连接归并
  读取三张表:onehotprocess、userupdateprocess、loginfoprocess,根据idx连接归并数据,三张表都包含字段listinginfo,归并时删除重复字段,只保留一个listinginfo字段。
  1. # -----6.7 缺失值处理---------
  2. # 经过数据预处理,主表masterinfo处理后为:onehotprocess,
  3. # 信息更新表updateinfo处理后为:userupdateprocess,
  4. # 登录信息表loginfo处理后为:loginfoprocess
  5. # (1)三张表连接
  6. database = 'creditdb'
  7. tb_onehotprocess = 'onehotprocess'
  8. tb_loginfoprocess = 'loginfoprocess'
  9. tb_userupdateprocess = 'userupdateprocess'
  10. tb_nullprocess = 'nullprocess'
  11. merge_idx = 'idx'
  12. master: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_onehotprocess)) \
  13.     .drop('listinginfo') \
  14.     .drop('label')
  15. userupdate: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_userupdateprocess)) \
  16.     .drop('listinginfo')
  17. loginfo: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_loginfoprocess)) \
  18.     .drop('listinginfo')
  19. data = userupdate.join(loginfo, on=merge_idx).join(master, on=merge_idx)  # 三表连接
  20. all_count = data.count()  # 表的总记录数
  21. print('表的总记录数:', all_count)
  22. data = data.cache()  # 缓存,方便多次读取
  23. filter_columns = data.drop('idx').drop('target').columns  # 去掉用户ID和目标值target字段,剩下的字段作为训练特征
复制代码
(2)每个字段缺失数据统计及处理
  归并之后的表中存在缺失数据,需统计各特征(除了idx和target外)的缺失数据比例。假如缺失值比例大于0.9,则以为该特征的价值不高,删除。假如比例小于0.9,用0填充缺失值。处理过程如下:

  1. # (2)字段缺失数据统计及处理
  2. # 合并之后的表中存在缺失数据,需统计各特征的缺失数据比例,并且将所有特征的数据类型都转换成double类型。
  3. # 如果缺失值比例大于0.9,则认为该特征的价值不高,删除。如果比例小于0.9,用0填充缺失值。
  4. rate_threshold = 0.9
  5. drop_columns = []
  6. cast_columns = []
  7. procssed_count = 0
  8. all_count = len(filter_columns)  # 除了idx和target字段外,字段数量。
  9. print("总列数:", all_count)
  10. for col in filter_columns:
  11.     procssed_count += 1
  12.     if procssed_count % 10 == 0:
  13.         print('[{}/{}]'.format(procssed_count, all_count))
  14.     null_count = data.select(col).filter(F.col(col).isNull()).count()  # 计算每列的缺失值个数
  15.     rate = null_count / all_count
  16.     if rate > rate_threshold:
  17.         drop_columns.append(col)
  18.     else:
  19.         cast_columns.append(col)
  20.     print('总列数:转换类型列数:删除列数:[{}:{}/{}]' \
  21.           .format(len(data.columns) - 2, len(cast_columns), len(drop_columns)))
复制代码
(3)所有列(除了idx和target外)的数据类型都转换成double型
  除了idx和target外,所有列的数据类型都转换为double型,最后将结果存储到Hive的tb_nullprocess表中。

  1. # 转换数据类型
  2. data = data.drop(*drop_columns)
  3. data = data.select([F.col(column).cast(DoubleType()) for column in cast_columns] + ['idx', 'target'])
  4. data = data.na.fill(0)
  5. data.write.mode('overwrite').saveAsTable('{}.{}'.format(database, tb_nullprocess))
  6. print("----------三表连接以及完全处理后,待模型训练、测试用的数据表:tb_nullprocess---------")
  7. data.show()
复制代码
五、模型构建与评估

1.加载数据、向量化特征、特征提取(降维)

  加载三表连接归并后的数据表tb_nullprocess。选取除’idx’, 'target’以外的所有特征,然后将选取特征向量化。

  表中有206个特征,考虑到特征较多,大概会影响建模效率,因此使用PCA()方法举行特征提取(降维)。特征提取尽大概保留原始数据布局属性的环境下,从 原始特征中寻找最有效、最具代表性的特征,有效淘汰特征的数量,增强后续模型的学习和泛化能力。

  1. # ------三、模型构建与评估--------
  2. # 1.加载数据
  3. database = 'creditdb'
  4. tb_train_master = 'masterinfo_train'
  5. tb_test_master = 'masterinfo_test'
  6. data_master: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_nullprocess))
  7. # 2.特征提取
  8. assembleCols = data_master.drop('idx', 'target').columns  # 选取待向量化特征
  9. # 向量化特征
  10. assembler = VectorAssembler(inputCols=assembleCols, outputCol='features')
  11. assembler_data = assembler.transform(data_master)  # 特征向量化:除了新增字段features外,保留data_master原来字段
  12. assembler_data.show(5)
  13. # 表中有206个特征,考虑到特征较多,可能会影响建模效率,因此使用PCA()方法进行特征提取(降维)。
  14. # 特征提取尽可能保留原始数据结构属性的情况下,从 原始特征中寻找最有效、最具代表性的特征,有效减少特征的数量,增强后续模型的学习和泛化能力。
  15. # 特征降维
  16. pcaModel = PCA(inputCol='features', outputCol='pcaFeatures', k=100).fit(assembler_data)  # 降维到100个特征
  17. df = pcaModel.transform(assembler_data).selectExpr('idx', 'target', 'pcaFeatures', 'features')
  18. df.show(10)
复制代码
2.GBTs建模

  PySpark中的GBTs算法,即Gradient Boosting Trees(梯度提升树),是Spark MLlib库中提供的一种强大的集成学习方法,用于分类和回归任务。梯度提升树通过组合多个弱学习器(通常是决策树)来形成一个强学习器,每个后续模型都尝试改正之前模型的错误。建模流程如下:


  • (1)对标签target编码。假如标签字段为字符型,如:“是”或“否”,这一步很有必要。但假如标签字段为 数值型,也可以省略这一步。设置StringIndexer类的输入字段为target,输出字段自界说为indexedLabel。
  • (2)对特征字段举行向量化。实际上,上一节已经对特征向量化,可以可以省略这一步。没必要对PCA输出的pcaFeatures使用VectorIndexer。可以直接将pcaFeatures作为特征输入到GBTClassifier中。此处为 了说明建模流程,保留这一步。
  • (3)使用GBTClassifier类,创建GTBs建模,设置标签输入字段为indexedLabel,特征输入字段自界说为:indexedFeatures
  • (4)标签字段还原,还原成原来的字符:输入字段为模型自动生成的prediction,设置输出字段自界说为:predictedLabel
  • (5)通过管道,将标签编码、特征向量化、建模和标签还原整合成一个完备的步骤,便于直接用于模型训练和测试。
  1. # -------------3. GBTs建模-----------------
  2. # (1)对标签target编码。输入字段为target,输出字段自定义为indexedLabel。
  3. labelIndexer = StringIndexer(inputCol='target', outputCol='indexedLabel').fit(df)
  4. # (2)对特征字段进行编码,对向量编码。没必要对PCA输出的pcaFeatures使用VectorIndexer。可以直接将pcaFeatures作为特征输入到GBTClassifier中。
  5. featureIndexer = VectorIndexer(inputCol='pcaFeatures', outputCol='indexedFeatures', maxCategories=4).fit(df)
  6. # (3)GTBs建模,设置标签输入字段为indexedLabel,特征输入字段自定义为:indexedFeatures
  7. gbt = GBTClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures', maxIter=10)
  8. # (4)标签字段还原,还原成原来的字符:输入字段为模型自动生成的prediction,设置输出字段自定义为:predictedLabel
  9. labelConverter = IndexToString(inputCol='prediction', outputCol='predictedLabel', labels=labelIndexer.labels)
  10. # (5)通过管道,将预处理,训练和标签还原整合成一个完整的步骤,便于直接用于模型训练和测试。
  11. pipeline = Pipeline().setStages([labelIndexer, featureIndexer, gbt, labelConverter])
复制代码
3.模型训练

  预处理后的三表归并数据集tb_nullprocess,包含了训练数据和测试数据,可以从masterinfo_train表中读取训练数据或测试数据的用户ID号idx,在根据idx从数据集tb_nullprocess中获取训练数据或测试数据。
然后通过管道,使用训练数据训练GBTs模型,得到GBTs模型:gbt_model。
  1. # ------4.训练GBTs模型--------
  2. train_index = spark.sql('select distinct idx from {}.{}'.format(database, tb_train_master))  # 读取原始训练数据的idx号
  3. train_data = df.join(train_index, on='idx')  # 根据idx,就可以提取处理后的对应数据。
  4. # 通过管道,使用训练数据训练GBTs模型,得到GBTs模型:gbt_model
  5. gbt_model = pipeline.fit(train_data)
复制代码
4. 模型评估



  • 和读取训练数据的方法一样,读取测试数据。
  • 使用训练好的模型gbt_model,猜测测试数据,返回一个包含测试结果的数据集predictions。

  • 创建分类评估器evaluator ,设置评估指标为正确率accuracy。
  • 使用评估器evaluator ,对测试数据集的猜测结果predictions举行评估,即对模型评估。返回模型正确率。

  1. # --------5.评估GBTs模型-------------------
  2. test_index = spark.sql('select distinct idx from {}.{}'.format(database, tb_test_master))
  3. test_data = df.join(test_index, on='idx')  # 提取测试数据
  4. predictions = gbt_model.transform(test_data)
  5. predictions.show(10)
  6. evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
  7.                                               predictionCol='prediction',
  8.                                               metricName='accuracy')
  9. accuracy = evaluator.evaluate(predictions)
  10. print('预测的准确性:{}'.format(accuracy))
  11. accuracy_df = spark.createDataFrame([(accuracy,)], ['Accuracy'])  # 创建PySpark的DataFrame
  12. write_to_mysql(accuracy_df, url, properties, 'accuracy_df')  # 将准确率存入MySQL的test数据库的accuracy_df表。
  13. predictions_df = predictions.select('idx', 'target', 'predictedLabel')
  14. write_to_mysql(predictions_df, url, properties, 'predictions_df')  # 将预测结果存入MySQL的test数据库的predictions_df表。
复制代码
六、数据可视化大屏的筹划

  步调测试中绘制的图表仅存在于开发环境或下令行界面中,不太容易分享给非技术用户或团队成员。为了降服这一局限性,一个高效的解决方案是通过开发一个Web应用步调,在网页上绘制图表。如许,用户就可以通过任何标准的Web浏览器来检察图表。别的,还可以将Web应用部署到服务器上,以便用户可以通过互联网访问。
  Flask是一个使用Python编写的轻量级Web应用框架,以其简洁、机动和可扩展的特点而受到开发者的喜爱。
Flask中文版教程
Flask英文版教程
  PyEcharts是一个用于生成ECharts图表的Python库。ECharts是一个使用JavaScript实现的开源可视化库,它提供了丰富的图表类型和强大的数据可视化能力。PyEcharts通过将ECharts与Python相结合,使得开发者可以在Python环境中方便地生成各种图表。
因此,利用Flask构建Web应用框架,结合PyEcharts生成各种数据可视化图表,构建出一个功能丰富的数据可视化平台。用户可以通过浏览器访问该平台,检察各种数据图表和分析结果。
Pyecharts官方文档: https://pyecharts.org/#/zh-cn/intro
pyecharts-gallery,示例: https://gallery.pyecharts.org/#/README
  筹划思路为:读取存储在Linux的MySQL数据库的数据分析结果,再用PyEcharts将数据绘制成图表,最后通过Flask将图表渲染到页面。
  项目标创建过程请参考作者写的另外一篇文章: 链接: Flask+Pyecharts+大数据集群(Linux):数据可视化大屏的实现
   项目布局如下:

  在main.py中,读取MySQL的数据,然后使用PyEcharts配置饼图、柱形图、折线图和舆图等,最后Flask将图表渲染到页面show_pyecharts_05.html。绘制的数据可视化大屏如下所示:


show_pyecharts_05.html代码如下:
  1. import json
  2. import pyecharts.charts
  3. from flask import Flask, render_template
  4. from pyecharts.options import *
  5. from markupsafe import Markup  # 导入 Markup,用于在 Flask 模板中安全地渲染 HTML
  6. from pyecharts import charts
  7. import pandas as pd
  8. from sqlalchemy import create_engine
  9. from pyecharts.charts import Pie, Bar, Line, Liquid
  10. from pyecharts.options import LabelOpts, TitleOpts, LegendOpts
  11. import pymysql
  12. from sqlalchemy import create_engine
  13. import pyecharts.options as opts
  14. from pyecharts.charts import Map
  15. import numpy as np
  16. from pyecharts.components import Table
  17. from pyecharts.options import ComponentTitleOpts
  18. app = Flask(__name__)  # 创建一个 Flask 应用实例
  19. # ---------------4.读取集群的MySQL数据,绘制可视化大屏------------------------
  20. # 自定义函数,读master节点的MySQL数据
  21. def connFun_linux(sql_query):
  22.     engine = create_engine("mysql+pymysql://root:123456@192.168.126.10:3306/test")
  23.     try:
  24.         data = pd.read_sql(sql_query, engine)
  25.         return data
  26.     except Exception as e:
  27.         print(f"An error occurred: {e}")
  28.         raise  # 可选:重新抛出异常以便在外部捕获     '''
  29. # 饼图
  30. def get_pie_linux():
  31.     sql = '''select * from update_days_count where all_count > 100'''
  32.     data = connFun_linux(sql)
  33.     x = [str(days) + '次' for days in data['update_days']]  # 修改次数:x 必须为字符类型
  34.     y = list(data['all_count'])
  35.     data_list = [list(z) for z in zip(x, y)]
  36.     c = (
  37.         Pie()
  38.             .add("", data_list, radius=["40%", "75%"])
  39.             .set_global_opts(title_opts=TitleOpts(title="用户修改信息次数与相对应的人数"),
  40.                              legend_opts=LegendOpts(orient="vertical", pos_top="15%", pos_left="2%"))
  41.             .set_series_opts(label_opts=LabelOpts(formatter="{b}:{c}"))
  42.     )
  43.     return c
  44. # 柱形图
  45. def get_bar_linux():
  46.     sql = '''select * from update_days_overdue  '''
  47.     data = connFun_linux(sql)
  48.     x = list(data["update_days"])
  49.     y = [round(rate * 100, 2) for rate in data['rate']]
  50.     c = (
  51.         pyecharts.charts.Bar()
  52.             .add_xaxis(x)
  53.             .add_yaxis("逾期率", y, label_opts=LabelOpts(formatter="{c}%"))
  54.             .set_global_opts(title_opts=TitleOpts(title="用户修改信息次数与逾期率", subtitle="bar"),
  55.                              xaxis_opts=AxisOpts(axislabel_opts=LabelOpts(rotate=45)), )
  56.     )
  57.     return c
  58. #  折线图
  59. def get_line_linux():
  60.     sql = '''select * from month_overdue  '''
  61.     data = connFun_linux(sql)
  62.     x = [str(num) for num in data['month']]  # x轴必须为字符串类型
  63.     y = [round(rate, 3) for rate in data['rate']]
  64.     c = (
  65.         Line()
  66.             .add_xaxis(x)
  67.             .add_yaxis("逾期率", y,
  68.                        markline_opts=MarkLineOpts(data=[MarkLineItem(type_="average")]),
  69.                        label_opts=LabelOpts(is_show=False), )  # 不显示数据点标签)
  70.             .set_global_opts(title_opts=opts.TitleOpts(title="借款月份与逾期率"))
  71.     )
  72.     return c
  73. # 地图
  74. def get_map_linux():
  75.     sql = '''select * from province'''
  76.     data = connFun_linux(sql)
  77.     province = data[data['userinfo_19'] != '不详']
  78.     max_count = province['count'].max()
  79.     province_name = list(province['userinfo_19'])  # 省份必须含有“省”或“自治区”或“市”
  80.     count = list(province['count'])
  81.     c = (
  82.         Map()
  83.             .add("客户数量", [list(z) for z in zip(province_name, count)], "china")
  84.             .set_global_opts(title_opts=opts.TitleOpts(title="中国地图"),
  85.                              visualmap_opts=opts.VisualMapOpts(max_=4000))
  86.             .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
  87.     )
  88.     return c
  89. # 水球
  90. def get_liquid_linux():
  91.     sql = 'select * from accuracy_df'
  92.     data = connFun_linux(sql)
  93.     pred = np.round(data['Accuracy'].values, 2)[0]
  94.     c = (
  95.         Liquid()
  96.             .add("lq", [pred])
  97.             .set_global_opts(title_opts=TitleOpts(title="Liquid-基本示例"))
  98.     )
  99.     return c
  100. # 表格
  101. def get_table_linux():
  102.     sql = 'select * from predictions_df'
  103.     data = connFun_linux(sql)
  104.     predictions = data[:5]
  105.     predictions_list = predictions.values.tolist()
  106.     headers = ['用户ID', '真实标签', '预测标签']
  107.     rows = predictions_list
  108.     c = (
  109.         Table()
  110.             .add(headers, rows)
  111.             .set_global_opts(
  112.             title_opts=ComponentTitleOpts(title="标签真实值与模型预测结果"))
  113.     )
  114.     return c
  115. @app.route('/show_pyecharts_05')
  116. def show_pyecharts_05():
  117.     pie = get_pie_linux()
  118.     bar = get_bar_linux()
  119.     line = get_line_linux()
  120.     ma = get_map_linux()
  121.     liquid = get_liquid_linux()
  122.     # tab = get_table_linux()
  123.     return render_template("show_pyecharts_05.html",
  124.                            pie_options=pie.dump_options(),
  125.                            bar_options=bar.dump_options(),
  126.                            line_options=line.dump_options(),
  127.                            map_options=ma.dump_options(),
  128.                            liquid_options=liquid.dump_options(),
  129.                            )
  130. if __name__ == "__main__":
  131.     app.run(host='127.0.0.1', port=5000, debug=True)
复制代码
   在前端筹划中,show_pyecharts_05.html设置和初始化多个 ECharts 图表的容器,并通过 Flask 模板渲染机制动态地设置和表现这些图表的选项。代码如下:
  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4.     <meta charset="utf-8" />
  5.     <title>ECharts</title>
  6.     <style >
  7.         /* CSS 样式将放在这里 */
  8.         .chart-container{
  9.             display:flex;
  10.             flex-direction:column;    /* 默认为column,表示垂直布局 */
  11.             justify-content:center;   /* 水平居中 */
  12.             align-items:center;       /* 垂直居中(如果需要的话) */
  13.             height:150vh;             /* 根据需要设置容器高度 */
  14.             width:100%;               /* 占据全屏宽度 */
  15.         }
  16.         .chart-row{
  17.             display: flex;                    /* 每一行都是一个flex容器 */
  18.             justify-content:space-between;    /* 列之间均匀分布 */
  19.             margin-bottom:20px;               /* 行与行之间的间距 */
  20.         }
  21.         .chart-item{
  22.             flex: 1;                         /* 每个flex item占据相等的空间 */
  23.             margin: 0 10px;                  /* 图表之间的间距 */
  24.         }
  25.     </style>
  26.     <!-- 引入刚刚下载的 ECharts 文件 -->
  27.     <script src="/static/echarts.js"></script>
  28.     <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/echarts.min.js"></script>
  29.     <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/maps/china.js"></script>
  30.     <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/echarts-liquidfill.min.js"></script>
  31. </head>
  32. <body>
  33. <!-- 为 ECharts 准备一个定义了宽高的 DOM -->
  34. <div class="chart-container">
  35.     <div class="chart-row">
  36.         <div class="chart-item"  id="pie" style="width: 800px;height:400px;"></div>
  37.         <div class="chart-item" id="bar" style="width: 800px;height:400px;"></div>
  38.     </div>
  39.     <div class="chart-row">
  40.         <div class="chart-item" id="line" style="width: 800px;height:400px;"></div>
  41.         <div class="chart-item" id="barstack" style="width: 800px;height:400px;"></div>
  42.     </div>
  43.     <div class="chart-row">
  44.         <div class="chart-item" id="liquid" style="width: 800px;height:400px;"></div>
  45.         <div class="chart-item" id="table" style="width: 800px;height:400px;"></div>
  46.     </div>
  47. </div>
  48. <script type="text/javascript">
  49.     // 基于准备好的dom,初始化echarts实例
  50.     var pieChart = echarts.init(document.getElementById('pie'));
  51.     var barChart = echarts.init(document.getElementById('bar'));
  52.     var lineChart = echarts.init(document.getElementById('line'));
  53.     var barStackChart = echarts.init(document.getElementById('barstack'));
  54.     var liquidChart = echarts.init(document.getElementById('liquid'));
  55.     // 使用刚指定的配置项和数据显示图表。
  56.     pieChart.setOption({{ pie_options | safe }});
  57.     barChart.setOption({{ bar_options | safe }});
  58.     lineChart.setOption({{ line_options | safe }});
  59.     barStackChart.setOption({{ map_options | safe }});
  60.     liquidChart.setOption({{ liquid_options | safe }});
  61. </script>
  62. </body>
  63. </html>
复制代码
参考资料:
1.PySpark 大数据机器学习入门案例1 :iris+ ML+Logistics分类:https://www.zhihu.com/tardis/bd/art/612626873?source_id=1001
2.pySpark 机器学习库ml入门(参数调优):https://www.jianshu.com/p/20456b512fa7
3.PySpark 逻辑回归(参数调优):https://zhuanlan.zhihu.com/p/461211990
4.Spark ML LR 用 setWeightCol 解决数据不平衡:https://kelun.blog.csdn.net/article/details/103425926
5.Python——机器学习:不平衡数据集常用处理方法和实例:https://blog.csdn.net/weixin_53848907/article/details/135976144
6.决策树分类器(DecisionTreeClassifier):https://blog.csdn.net/qq_66726657/article/details/132470442
7.戴刚,张良均. PySpark大数据分析与应用. 人民邮电出书社,2024.
8.汪明. PySpark实战. 清华大学出书社,2022

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

去皮卡多

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表