9. 大数据集群(PySpark)+Hive+MySQL+PyEcharts+Flask:信用贷款风险分析与 ...

打印 上一主题 下一主题

主题 1036|帖子 1036|积分 3108

文章目录



  • 一、大数据集群先容


    • 1. PySpark简介
    • 2. Hive简介
    • 3. PyEcharts
    • 4. Flask

  • 二、信用贷款数据集先容


    • 1. 用户基本信息表
    • 2.用户登录信息表
    • 3. 用户更新信息表

  • 三、信用贷款风险分析


    • 1. 加载数据到Hive仓库
    • 2. 基本信息表masterinfo的练习集和测试集合并
    • 3.用户信息完善环境与逾期率的关系探索
    • 4. 用户信息修改环境与逾期率的关系(userupdate表)
    • 5. 用户乞贷月份与逾期率的关系分析

  • 四、数据预处置处罚


    • 1.盘算用户信息缺失个数,用乞贷月份构建新特征(nullcount表)
    • 2.用户更新信息重修(userupdateprocess)
    • 3. 用户登录信息重修(loginfoprocess)
    • 4.对主表masterinfo做分类数据预处置处罚
    • 5. 字符串字段编码处置处罚(encodeprocess)
    • 6. 分类数据重编码(onehotprocess)
    • 7.缺失值处置处罚(tb_nullprocess)

  • 五、模型构建与评估


    • 1.加载数据、向量化特征、特征提取(降维)
    • 2.GBTs建模
    • 3.模型练习
    • 4. 模型评估

  • 六、数据可视化大屏的设计
一、大数据集群先容

本案例摆设了3个节点的完全分布式集群,开发环境如下:
节点/组件/安装包
版本
备注
名称节点
192.168.126.10
master
数据节点
192.168.126.11
slave1
数据节点
192.168.126.12
slave2
JDK
jdk-8u281
Java运行环境,Spark的运行需要JDK的支持
Hadoop
hadoop-3.1.4
提供HDFS、Hive运行环境支持。HDFS体系访问端口为:hdfs://192.168.126.10:9000
Hive
hive-3.1.2
数据仓库
PySpark
spark-3.4.3-bin-hadoop3.tgz
Spark集群的master节点的地址和端口为:spark://192.168.126.10:7077
MySQL
5.7.18
存储数据分析效果,端口:3306,用户名:root,密码:123456
python
Python-3.9.0.tgz
3.9.0版本的Python
MySQL Connector
mysql-connector-java-5.1.32-bin.jar
Spark 毗连MySQL的驱动
IntelliJ IDEA
Ultimate 2020.3
编程工具IDEA
1. PySpark简介

Spark集群是一种分布式盘算框架,它基于内存盘算,提供了高效的数据抽象和并行盘算本事,能够处置处罚大规模数据集的批处置处罚和及时处置处罚使命。Spark接纳内存存储中间盘算效果,可减少迭代运算的磁盘I/O,并通过并行盘算有向无环图的优化,使其运行速率比MapReduce快100倍;Spark可以使用Hadoop YARN和Apache Mesos作为其资源管理和调理器,可以从多种数据源读取数据,如HDFS、HBase、MySQL等。
PySpark则是Spark的Python API,允许Python开发者利用Spark的强大功能举行数据处置处罚和分析。本案例接纳PySpark大数据集群做数据分析与发掘,使用HDFS作为文件存储体系。PySpark由多个组件构成,包括Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图盘算库)等。
大数据集群运行时,Spark的管理页面如下图所示:

2. Hive简介

Hive是一款基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。Hive的本质是将Hive SQL(HQL)转换成MapReduce程序,或者Tez、Spark等使命举行运算,使得用户可以通过SQL查询语言来查询Hadoop集群中的数据。
Hive的应用场景


  • 大规模数据分析:Hive能够处置处罚PB级别的大规模数据,通太过布式存储和盘算,提高数据处置处罚速率。
  • 数据仓库:Hive提供了数据仓库的基本功能,如数据界说、数据加载、数据查询、数据分析等,用户可以使用Hive创建数据库、表、分区等结构,以便于管理和查询数据。
  • 日记分析:Hive可以处置处罚大规模的日记数据,如Web日记、应用程序日记等,通过HiveQL举行查询和分析,快速相识用户举动、应用程序运行环境等信息。
Hive的存储与盘算


  • 数据存储:Hive使用HDFS(Hadoop Distributed File System)来存储数据,支持多种数据存储格式,如TextFile、SequenceFile、ORCFile、Parquet等。
  • 盘算引擎:Hive支持多种实验引擎,如MapReduce、Tez和Spark。用户可以根据数据特点和业务需求,选择符合的实验引擎来优化查询性能。
3. PyEcharts

Pyecharts是一款基于Python的开源数据可视化库,它整合了Echarts与Python的优势,使得在Python环境中能够轻松创建出雅观且交互性强的图表。Pyecharts支持多达数十种图表类型,包括折线图、柱状图、散点图、饼图等常见图表,以及地图、热力图、关系图等特色图表。这些丰富的图表类型能够满足不同场景下的数据可视化需求。
通过Pyecharts,可以绘制出如天下各省份GDP数据地图、中国各地区人口统计数据柱状图、股票生意业务数据K线图、销售数据折线图等多种图表,直观地展示数据。
Pyecharts官方文档: https://pyecharts.org/#/zh-cn/intro
PyEcharts-gallery,图体现例: https://gallery.pyecharts.org/#/README
4. Flask

Flask是一个轻量级、机动、可扩展的Web应用框架,使用Python编写,适合用于构建中小型Web应用程序。它提供了基本的路由、模板引擎、URL构建、错误处置处罚等功能,并支持插件和扩展来增强其功能。
Flask提供了一个路由体系,可以将不同的URL路径映射到相应的处置处罚函数上。当用户访问特定的URL时,Flask会调用相应的处置处罚函数,并返回响应。
Flask内置了一个基于Jinja2的模板引擎,开发者可以使用模板来渲染HTML页面。通过将动态数据传递给模板,并使用模板语言来界说页面的结构和样式,开发者可以轻松地创建和管理Web页面的外观和结构。
Flask中文版教程
Flask英文版教程
本案例的分析思绪:Hive作为数据仓库,Spark为盘算引擎,Spark分析效果存储在Hive中,少部分效果存储在MySQL,PyEcharts将MySQL中的分析效果绘制成图表,Flask再将图表渲染到页面做成数据可视化大屏。
二、信用贷款数据集先容

信用贷款是指以乞贷人的信誉发放的贷款,乞贷人不需要提供担保。其特征就是债务人无需提供抵押品或第三方担保,仅凭自己的信誉就能取得贷款,并以乞贷人信用程度作为还款保证。随着金融市场的不断发展,信用贷款作为一种无抵押、无担保的贷款形式,其市场需求日益增长。然而,信用贷款也陪同着较高的风险,因此,对信用贷款风险举行全面、正确的分析成为贷款机构的重要使命。本案例有7个数据集,分别为用户基本信息的练习集、测试集和测试效果集,用户登录信息的练习集和测试集,用户更新信息的练习集和测试集,如下表所示。
名称
说明
Training_Master.csv
用户基本信息表(练习集)
Test_Master.csv
用户基本信息表(测试集)
Test_Master_result.csv
用户基本信息表(测试效果集)
Training_UserUpdate.csv
用户登录信息表(练习集)
Test_UserUpdate.csv
用户登录信息表(测试集)
Training_LogInfo.csv
用户更新信息表(练习集)
Test_LogInfo.csv
用户更新信息表(测试集)
文件截图如下所示:

1. 用户基本信息表

用户基本信息包括3个数据文件:Training_Master.csv(练习集)、Test_Master.csv(测试集)、Test_Master_result.csv(测试效果集)。练习集共109个字段,测试集共108个字段(少了标签字段target),测试效果集共3个字段。用户基本信息表的相关字段说明如下:
字段名称
说明
是否含有缺失值
Idx
用户ID

UserInfo_i
用户基本信息

Education_Info_i
用户学历信息 ,Education_Info_1到Education_Info_24

WeblogInfo_i
用户网页登录信息,WeblogInfo_1到WeblogInfo_57

SocalNetwork_i
用户外交网络信息,SocalNetwork_1到SocalNetwork_17

ListingInfo
乞贷成交时间

target
用户是否逾期

Training_Master.csv(练习集)有3万条记录,部分数据截图如下:


Test_Master.csv(测试集)有19999条记录,部分数据截图如下:

Test_Master_result.csv(测试效果集)的部分数据截图如下:

2.用户登录信息表

用户登录信息数据包括Training_LogInfo.csv(练习集)、Test_LogInfo.csv(测试集),都是5个字段,字段寄义如下:
字段名称
说明
是否含有缺失值
Idx
用户ID

ListingInfo
乞贷成交时间

LogInfo1
用户登录操作代码

LogInfo2
用户登录操作类别

LogInfo3
用户登录时间

练习集有580551条记录,数据集有385880条记录,字段都一样,部分数据截图如下:

3. 用户更新信息表

用户更新信息包括Training_UserUpdate.csv(练习集)、Test_UserUpdate.csv(测试集),都是4个字段,练习集有372463条记录,测试集有248832条记录,字段寄义如下表所示。
字段名称
说明
是否含有缺失值
Idx
用户ID

ListingInfo
乞贷成交时间

UserupdateInfo1
用户更新信息内容

UserupdateInfo2
用户更新信息时间

数据集部分数据截图如下:

三、信用贷款风险分析

1. 加载数据到Hive仓库

(1)启动Hadoop(start-all.sh),将7个数据集上传到HDFS的/data/credit目录下:

(2)创建Hive数据库
打开Linux终端,启动Hive shell,创建Hive数据库creditdb。
create database creaditdb;

(3)启动spark集群
启动spark集群:cd /usr/local/spark-3.4.3-bin-hadoop3/sbin; ./start-all.sh
打开IDEA,编写代码,将7个数据集导入Hive的数据库creditdb中。先创建一个字典变量,设置CSV文件名与Hive表名的对应关系,键为CSV文件名,值为Hive表名(小写)。代码如下:
  1. from pyspark import SparkConf
  2. from pyspark.ml import Pipeline
  3. from pyspark.ml.classification import GBTClassifier
  4. from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  5. from pyspark.ml.feature import StringIndexer, VectorAssembler, PCA, VectorIndexer, IndexToString
  6. from pyspark.sql import SparkSession, Row, Window, functions as F
  7. from pyspark.sql.functions import *
  8. import matplotlib.pyplot as plt
  9. from pyspark.sql.types import IntegerType, DoubleType
  10. import pandas as pd
  11. import matplotlib
  12. # 设置一个支持GUI的后端,用于在IDEA绘图
  13. matplotlib.use('TkAgg')  # 或者 'Qt5Agg', 'GTK3Agg' 等,用于在IDEA绘图
  14. import matplotlib.pyplot as plt
  15. plt.rcParams['font.sans-serif'] = ['SimHei']
  16. plt.rcParams['axes.unicode_minus'] = False
  17. # ==================== 写数据到MySQL的函数======================
  18. # 封装将pyspark的dataframe写入mysql数据库的函数
  19. from pyspark.sql import DataFrame
  20. import pymysql
  21. def write_to_mysql(df: DataFrame, url: str, properties: dict, table_name: str):
  22.     """
  23.     将DataFrame写入MySQL数据库的指定表。检查表是否存在,存在则先删除表
  24.       参数:
  25.     - df: 要写入的DataFrame。
  26.     - url: MySQL数据库的JDBC URL。
  27.     - properties: 包含数据库连接信息的字典(user, password, driver)。
  28.     - table_name: 要写入的MySQL表名。
  29.     - mode: 写入模式('append'、'overwrite'等)。
  30.     """
  31.     conn = pymysql.connect(host=url.split('/')[2].split(':')[0],
  32.                            port=int(url.split('/')[2].split(':')[1].split('/')[0]),
  33.                            user=properties['user'],
  34.                            password=properties['password'],
  35.                            database=url.split('/')[-1],
  36.                            charset='utf8mb4',
  37.                            cursorclass=pymysql.cursors.DictCursor)
  38.     try:
  39.         with conn.cursor() as cursor:
  40.             # 检查表是否存在
  41.             cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
  42.             if cursor.fetchone():
  43.                 # 如果表存在,则删除它
  44.                 cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
  45.                 conn.commit()
  46.                 print(f"删除表:{table_name}")
  47.     finally:
  48.         conn.close()
  49.         # 写数据到MySQL
  50.     df.coalesce(1).write.format('jdbc')
  51.     .option('url', url)
  52.     .option('dbtable', table_name)
  53.     .option('user', properties['user'])
  54.     .option('password', properties['password'])
  55.     .option('driver', properties['driver'])
  56.     .option('mode', 'append')
  57.     .save()
  58.     print(f'Spark将数据写入MySQL的{table_name}表完成。')
  59. # ---------使用示例-----------
  60. url = "jdbc:mysql://192.168.126.10:3306/test"
  61. properties = {
  62.     "user": "root",
  63.     "password": "123456",
  64.     "driver": "com.mysql.jdbc.Driver"  # MySQL驱动
  65. }
  66. # write_to_mysql(final_join, url, properties, 'movie')  # 将movie写入MySQL。
  67. # =============函数结束================================
  68. # 调用.enableHiveSupport()来启用对Hive的支持,这样Spark就可以访问Hive的metastore,进行表的创建、查询等操作。
  69. conf = SparkConf().setAppName("信用贷款分析").setMaster('spark://192.168.126.10:7077')
  70. spark = SparkSession.builder
  71. .config(conf=conf)
  72. .enableHiveSupport()
  73. .getOrCreate()
  74. spark.sql('show databases').show()
  75. # ---------1. 创建hive数据库,并导入数据--------------------------
  76. # 所有的数据集存储在hdfs://192.168.126.10:9000/data/credit目录下。
  77. # 在Linux终端启动Hive shell,创建数据库:creditDB。命令为: CREATE DATABASE creditDB
  78. path = 'hdfs://192.168.126.10:9000/data/credit'
  79. database = 'creditDB'
  80. # 创建一个字典file_table_name,将CSV文件名映射到Hive中的表名。
  81. file_table_name = {
  82.     'Test_LogInfo.csv': 'loginfo_test',
  83.     'Test_Master.csv': 'masterinfo_test',
  84.     'Test_Master_result.csv': 'test_master_result',
  85.     'Test_UserUpdate.csv': 'userupdate_test',
  86.     'Training_LogInfo.csv': 'loginfo_train',
  87.     'Training_Master.csv': 'masterinfo_train',
  88.     'Training_UserUpdate.csv': 'userupdate_train'
  89. }
  90. # 将数据集csv文件导入creditDB数据库的hive表。
  91. import os
  92. for fname, tbname in file_table_name.items():
  93.     fpath = path + os.sep + fname  # 对于每个文件,构建完整的文件路径fpath
  94.     print('transforming data from:[{:<25}] to [{}]...'.format(fname, tbname))
  95.     data = spark.read.csv(fpath, header=True)  # 读取CSV文件,其中header=True表示CSV文件的第一行是列名。
  96.     # 将DataFrame的列名转换为小写,因为Hive默认列名是小写的,这有助于避免列名不匹配的问题。
  97.     # data.toDF(*[...]) 方法用于根据提供的列名列表创建一个新的DataFrame。
  98.     data = data.toDF(*[col.lower() for col in data.columns])
  99.     # 将DataFrame写入到Hive的指定数据库和表中。如果表已存在,overwrite模式会先删除旧表再创建新表。
  100.     data.write.mode('overwrite').saveAsTable('{}.{}'.format(database, tbname))  # 如:写入表:creditDB.loginfo_test
  101. spark.sql('USE creditDB')  # 选择hive数据库creditDB
  102. spark.sql('SELECT * FROM loginfo_test LIMIT 10 ').show()  # 查看 表loginfo_test前10行
复制代码
这段代码中,有部分代码是写数据到MySQL的封装函数:write_to_mysql,方便后面将分析效果写入MySQL数据库。接着初始化Spark应用,毗连Hive,再将csv数据集导入到Hive表中,相对应的Hive表主动创建。
数据集导入到Hive表的过程如下:

查看导入到Hive中的masterinfo_train表,如下图所示。

2. 基本信息表masterinfo的练习集和测试集合并

在数据分析和数据预处置处罚前,将练习集和测试集合并;在构建猜测模型时,再将练习集和测试集分开。
将基本信息的练习数据集(masterinfo_train)和测试数据集(masterinfo_test)、测试效果集(test_master_result)合并成一张表,为后续的数据分析和数据预处置处罚做准备。
(1)练习集masterinfo_train的目的列target在倒数第2列,将其调到倒数第1列。
读取hive的masterinfo_train表,复制target列并重命名label,末了将target列删除。
  1. # -----------2.合并训练集和测试集-----------
  2. # 2.1将训练数据集masterinfo_train和测试数据集masterinfo_test及其标签数据集test_master_result合并成一张表,方便统一做数据处理
  3. # masterinfo_train的目标列target在倒数第2列,将其调到倒数第1列
  4. # 读取hive的masterinfo_train表,复制target列并重命名label,最后将target列删除。
  5. masterinfo_train: DataFrame = spark.sql('SELECT  *, target as label FROM creditDB.masterinfo_train').drop('target')
  6. masterinfo_train.show(5)
  7. master_train = masterinfo_train.withColumnRenamed('label', 'target')  # 将label列重命名为target。 处理后的训练集
  8. master_train.show(5)
复制代码
(2)删除测试效果集test_master_result末了一列乞贷时间,统一测试集masterinfo_test的乞贷时间格式
练习集的乞贷时间格式为:2020/02/21,而测试集的乞贷时间格式为:21/02/2020,需将测试集的时间格式设为和练习集的一样。
  1. # 2.2 删除测试结果集test_master_result最后一列借款时间,统一测试集masterinfo_test的借款时间格式
  2. test_master_result: DataFrame = spark.sql('select idx,target from creditDB.test_master_result')  # 剔除 测试结果集的最后一列:借款时间
  3. # masterinfo_test表的借款时间listinginfo格式为:21/2/2020,调整为2020/2/21,和其他表时间格式保持一致。
  4. masterinfo_test: DataFrame = spark.sql("select *,
  5.          concat_ws('/',split(listinginfo, '/')[2],
  6.                        split(listinginfo, '/')[1],
  7.                        split(listinginfo, '/')[0]) as  time from creditDB.masterinfo_test")
  8. .drop('listinginfo')
  9. .withColumnRenamed('time', 'listinginfo')  # 调整时间格式
复制代码
(3)数据合并入库,整理后的主表为:masterinfo
用join方法,将测试集和测试效果集毗团结并,再用union()方法和练习集纵向毗团结并。末了将合并后的数据集写入Hive数据库的masterinfo表。
  1. # 2.3 数据合并入库
  2. master_test = masterinfo_test.join(test_master_result, 'idx')  # 测试集和测试结果集合并。此时数据集有标签target,和训练集结构无异
  3. masterinfo = master_train.union(master_test)  # 训练集和测试集纵向连接合并
  4. masterinfo.write.mode('overwrite').saveAsTable('creditDB.masterinfo')  # 将合并后的数据集 写入hive数据库,新建表masterinfo
  5. print('合并后数据集的样本数:', masterinfo.count())
  6. print("==============1.主表整理后的数据集:masterinfo================
  7. ")
  8. masterinfo.show(5)
复制代码
3.用户信息完善环境与逾期率的关系探索

在征信领域,用户信息的完善度越好,其还款意愿就越强。对用户信息表中用户信息的完备程度举行统计,分析用户信息完善程度与逾期之间的关系。
(1)从Hive加载用户信息表masterinfo,并重命名为masterNew
加载用户信息表masterinfo,重命名为masterInfo,并赋值给新表masterNew ;对于masterNew,除了用户ID列idx和标签列target外,其他列的内容假如是缺失值,则用1填充,否则用0更换。这样的操作是为了后续盘算每行记录的缺失值,即将所有1相加,就是每行缺失值的数量。
  1. # ---- 二、用户信息完善情况与逾期率的关系探索-------
  2. # 1.在征信领域,用户信息的完善度越好,其还款意愿就越强。对用户信息表中用户信息的完整程度进行统计,分析用户信息完善程度与用户信用评级之间的关系。
  3. masterInfo: DataFrame = spark.sql('select * from creditDB.masterinfo')  # PySpark读取hive表:masterinfo
  4. masterNew = masterInfo
  5. # 新建masterNew,idx和target两列不用计算,直接复制列名,其他列:缺失值替换为1,其他值替换为0
  6. for column in masterInfo.columns[1:-1]:
  7.     # 使用withColumn添加新列(或覆盖原列),标记缺失值为1,非缺失值为0
  8.     masterNew = masterNew.withColumn(column, when(isnull(col(column)), 1).otherwise(0))
  9. masterNew.show(5)
复制代码
处置处罚后的masterNew部分数据如下:

(2)统计masterNew 每行记录缺失值数量
统计masterNew 表中每行记录缺失值数量。PySpark面向列盘算,不善于行盘算。使用F_expr和字符串格式化来构建SQL求和表达式,新增求和列:sum。
  1. # ------2.统计分析--------
  2. # PySpark面向列计算,不擅长行计算。下面提供两种计算行总和的方法,
  3. from pyspark.sql.functions import expr as F_expr
  4. columns_to_sum = [column for column in masterNew.columns if column not in ['idx', 'target']]  # 排除不需要的列
  5. sum_expression = " + ".join(f"{column}" for column in columns_to_sum)  # 使用F_expr和字符串格式化来构建SQL求和表达式
  6. masterNew_row_sums = masterNew.withColumn("sum", F_expr(sum_expression))  # 应用表达式并添加新列
  7. masterNew_row_sums.show(5)
  8. result = masterNew_row_sums.groupby('sum').count()
  9. result = result.orderBy('sum')
  10. result.show()
  11. write_to_mysql(result, url, properties, 'master_nan_counts')  # 将统计结果写入MySQL的test数据库的master_nan_counts表。
  12. result = result.toPandas()
复制代码
对每行记录的缺失值求和,新增sum列后的数据表:masterNew_row_sums。
按缺失值总和sum分组,统计每组的用户数量,统计效果如下。

由图可知,缺失值总数sum为0(没有缺失值)的用户数是416人,缺失值总数为5个的用户数则为23586人。末了,将统计效果写入MySQL的test数据库的master_nan_counts表。
将统计数据可视化,绘制柱形图如下所示。

画图代码如下:
  1. # 绘图
  2. result.plot.bar(
  3.     xlabel='用户信息缺失数量',
  4.     ylabel='用户数量统计',
  5.     width=0.6)
  6. plt.show()
复制代码
(3)信息完善程度与逾期还款的概率关系
由上图发现,用户信息缺失的数量集中在2~10,因此可将用户缺失信息程度分为3类:[2,4]、[5-7]、[8,10]。分析这三类用户群体的逾期还款比例。
筛选masterNew_row_sums表中缺失值总数sum在[2~10]之间的记录,新增一列“level”,根据对应的缺失值总数[2,4]、[5-7]、[8,10],添加相应的内容:“2-4”、“5-7“、“8,10”。如下图所示。

盘算三类用户群体的逾期率,盘算效果如下所示,缺失值数量在8~10的用户,逾期率是最高的,到达8.3%。

末了,绘制柱形图将逾期率可视化。

代码如下所示。
  1. # 3. 探索信息完善程度与逾期还款的概率的关系
  2. # 方法一
  3. filter_result = masterNew_row_sums.select('target', 'sum').filter((col('sum') > 2) & (col('sum') <= 10))
  4. filter_result.show()
  5. filter_result_level = filter_result.withColumn(
  6.     'level',
  7.     when(col('sum').between(2, 4), '2-4')
  8.     .when(col('sum').between(5, 7), '5-7')
  9.     .when(col('sum').between(8, 10), '8-10'))  # 添加一列level
  10. filter_result_level.show()
  11. re1 = filter_result_level.filter(col('target') == '1').groupBy('level').count()  # 按level分组统计逾期的人数
  12. re1.show()
  13. re1 = re1.toPandas().set_index('level')  # 转换为pandas dataframe,并设置level为索引
  14. re2 = filter_result_level.groupBy('level').count()  # 按level分组统计人数
  15. re2 = re2.toPandas().set_index('level')
  16. print(re2)
  17. result = (re1 / re2).sort_index()  # 逾期人数/总人数,按索引level排序。
  18. print(result)
  19. result.plot.bar(
  20.     xlabel='用户信息缺失数量',
  21.     ylabel='逾期率',
  22.     width=0.7)
  23. plt.show()
  24. master_nan_delinquency = spark.createDataFrame(result)  # 将pandas DataFrame转为PySpark DataFrame。
  25. write_to_mysql(master_nan_delinquency, url, properties, 'master_nan_delinquency')  # 将统计结果写入MySQL的test数据库的master_nan_delinquency表。
复制代码
为了演示Flask和Pyecharts的数据可视化效果,这里还统计了每个省份的客户人数,并将效果存入MySQL数据库test的province表。代码如下。
  1. # 统计每个省份的客户人数
  2. province = masterInfo.groupBy('userinfo_19').count()
  3. write_to_mysql(province, url, properties, 'province')  # 将统计结果写入MySQL的test数据库的province表。
  4. province.show()
复制代码
4. 用户信息修改环境与逾期率的关系(userupdate表)

在征信领域,信息修改频率同样会影响用户信用评级。因此,有须要对所有乞贷用户的更新信息日期和乞贷成交日期的分布环境举行分析,探索用户信息修改环境与逾期率的关系。
(1)读取数据
读取的用户更新信息表如下所示:

可以看出,用户更新信息表有重复记录。
代码如下:
  1. # 4.用户信息修改情况与逾期率的关系探索
  2. # 在征信领域,信息修改频率同样回影响用户信用评级
  3. # 分析所有借款用户的更新信息日期和借款成交日期的分布情况,探索用户信息修改情况与逾期率的关系。
  4. # 4.1 读取数据
  5. userupdate_test: DataFrame = spark.sql(
  6.     "select idx,userupdateinfo2 from creditDB.userupdate_test")  # 显式地给变量添加DataFrame类型注解,帮助IDE理解变量的类型
  7. userupdate_test.show(5)
  8. userupdate_train: DataFrame = spark.sql("select idx,userupdateinfo2 from creditDB.userupdate_train")
  9. userupdate_train.show(5)
复制代码
(2)合并练习集userupdate_train和测试集userupdate_test为userupdate,统计信息更新次数
用户信息更新的数据是按天记录,假如同一天有多次修改信息,表中就会出现重复数据。用户在同一天内对信息的修改视为一次修改。因此,需对表中数据去重,然后统计用户修改信息的次数,再将统计效果与用户基本信息数据合并,末了对用户修改次数举行统计。
练习集和测试集合并,去重,并按用户ID分组统计更新天数(即更新次数),如下图所示。

合并后的表与用户基本信息表masterInfo(选择该表用户ID列和标签target列)毗连成新表master_update,新增一列“updatedays”,数据为“count”列数据,再删除“count”列。效果如下图所示。

将表master_update按更新天数分组,统计用户idx数量,并添加统计效果,列名为:all_count。

将统计效果写入MySQL数据库test的updatedays_count表。将统计数据绘制成柱形图,如下所示。

  1. # 4.2 对信息修改表中的信息更新次数进行统计
  2. # 用户信息更新数据是按天记录,同一天多次修改信息,表中就会出现重复数据。用户在同一天内对信息的修改视为一次修改。
  3. # 因此,需对表中数据去重,然后统计用户修改信息的次数,再将统计结果与用户基本信息数据合并,最后对用户修改次数进行统计。
  4. userupdate = userupdate_test.union(userupdate_train)
  5. .distinct()
  6. .groupBy('idx')
  7. .count()  # 两表纵向拼接,去重,按idx分组,统计每组(每个用户)的信息更新次数。
  8. master_update = userupdate.join(masterInfo.select('idx', 'target'), 'idx')
  9. .withColumn('update_days', userupdate['count'])
  10. .drop('count')  # 两表连接,选择idx,target,以idx字段连接;新增一列upadate_days;删除count列。
  11. master_update.show(5)
  12. updatedays_count = master_update.groupBy('update_days')
  13. .agg(F.count('idx'))
  14. .withColumnRenamed('count(idx)', 'all_count')
  15. .orderBy('update_days')  # 按更新天数update_days分组;统计用户数idx;重新命名列为all_count;按update_days排序。
  16. updatedays_count.show(5)
  17. write_to_mysql(updatedays_count, url, properties, 'update_days_count')  # 将统计结果写入MySQL的test数据库的updatedays_count表。
  18. # 绘图
  19. pd_updatedays_count = updatedays_count.toPandas()
  20. pd_updatedays_count.plot.bar(
  21.     x='update_days',
  22.     y='all_count',
  23.     xlabel='用户修改信息次数',
  24.     ylabel='修改信息总人数'
  25. )
  26. plt.show()
复制代码
(3)探索用户修改信息的次数与逾期率之间的关系
上图表明,用户修改信息的次数集中在1~5。因此,对修改次数集中在1-5的用户举行分析,探索用户修改信息的次数与逾期率的关系。
对表master_update,筛选信息更新次数小于5的数据,按更新次数update_days和标签target举行分组,统计用户数量,统计效果作为新的列:target_count。效果赋值给新表(5天更新次数表):daysUnder5_final。
将5天更新次数表daysUnder5_final与更新次数表updatedays_count按update_days毗连,获得总更新次数(all_count),然后筛选逾期target=1的用户,效果赋值给新表daysUnder5_final。

盘算逾期人数/总人数的比率,添加列名rate,盘算效果赋值给updatedays_overdue表,末了将数据写入MySQL的test数据库的update_days_overdue表。

绘制用户修改信息次数与逾期率的柱形图,如下图所示。

代码如下:
  1. # 4.3 探索用户修改信息的次数与逾期率的关系
  2. # 从上图可知,用户修改信息的次数集中在1~5。因此对修改次数集中在1~5的用户进行重点分析,探索用户修改信息次数与逾期率之间的关系。
  3. daysUnder5 = master_update.filter('update_days<=5')
  4. .groupBy('update_days', 'target')
  5. .agg(F.count('idx'))
  6. .withColumnRenamed('count(idx)', 'target_count')  # 筛选更新次数小于5次;按更新次数和目标分组;统计用户总数;新增一列分组用户总数:target_count
  7. daysUnder5.orderBy('update_days').show(5)
  8. daysUnder5_final = daysUnder5.join(updatedays_count, 'update_days')  # 两表连接
  9. daysUnder5_final = daysUnder5_final.filter('target==1')  # 筛选逾期target=1的用户
  10. daysUnder5_final.orderBy('update_days').show(5)
  11. updatedays_overdue = daysUnder5_final.selectExpr('update_days', 'target_count/all_count as rate')
  12. .orderBy('update_days')  # 计算更新次数update_days分组中,逾期人数/总人数的比率
  13. updatedays_overdue.show(5)
  14. write_to_mysql(updatedays_overdue, url, properties, 'update_days_overdue')  # 将统计结果写入MySQL的test数据库的update_days_overdue表。
  15. # 绘图
  16. pd_updatedays_overdue = updatedays_overdue.toPandas()
  17. pd_updatedays_overdue.plot.bar(
  18.     x='update_days',
  19.     y='rate',
  20.     xlabel='用户修改信息次数',
  21.     ylabel='逾期率'
  22. )
  23. # plt.show()
复制代码
5. 用户乞贷月份与逾期率的关系分析

用户资金状态随不同时间节点(诸如春节期间资金普遍紧张等)发生波动,这些变革深刻地影响着用户的贷款决策。为缓解财务压力,部分用户会选择在特定时期申请贷款作为应急步伐。鉴于此,深入分析用户基本信息中的贷款月份数据显得尤为关键,这有助于我们洞察用户贷款举动的季候性模式,为金融机构提供更加精准的市场洞察和策略支持。
(1)读取用户基本信息表masterInfo,提取日期字段中的月份信息
读取用户基本信息表masterInfo,提取乞贷时间listinginfo字段的月份。按’month’, 'target’分组,统计不同月份的逾期/未逾期记录总数。

(2)统计不同月份的逾期率
按月份分组,统计用户总人数。

将month_count与month_count_sum合并后,再筛选逾期用户,得到按月分组的逾期人数和总人数(末了一列)。

盘算每月的逾期率。将盘算效果写入MySQL的test数据库的month_overdue表。

绘制乞贷月份和逾期率的柱形图。

  1. # -----5.用户借款月份与逾期率的关系探索
  2. # 考虑到不同时期,用户的资金状况(如过年资金紧张等)可能会影响用户的贷款行为,某些用户会为了应对资金压力而转向贷款。
  3. # 因此,需要对用户基本信息中的借款月份进行探索性分析。
  4. # 5.1 提取日期字段中的月份信息
  5. # masterInfo: DataFrame = ...  # 为masterInfo做类型注解
  6. master_month = masterInfo.selectExpr('idx', "split(listinginfo,'/')[1] as month", 'target')  # 提取月份
  7. month_count = master_month.groupBy('month', 'target').count()  # 统计不用月份的逾期记录数
  8. month_count.orderBy('month').show(5)  # 此时month字段是字符串类型
  9. # 统计不用月份的逾期率
  10. # 按月份统计用户总人数和预期还款用户人数,统计不同月份的逾期率。
  11. month_count_sum = month_count.groupBy('month')
  12. .agg(F.sum('count'))
  13. .withColumnRenamed('sum(count)', 'sum_count')  # 按月份分组,统计用户总人数
  14. month_count_final: DataFrame = month_count.join(month_count_sum, 'month')
  15. month_count_final.show(5)  # 观察结果
  16. month_count_final = month_count_final.filter('target==1')  # 逾期还款的统计结果
  17. month_count_final.show(5)  # 与上面的结果有什么区别。
  18. month_overdue = month_count_final
  19. .selectExpr('cast(month as int)', 'count/sum_count as rate')
  20. .orderBy('month')  # 计算逾期率,将月份month设为整型;按月份排序。
  21. month_overdue.show(5)
  22. write_to_mysql(month_overdue, url, properties, 'month_overdue')  # 将统计结果写入MySQL的test数据库的month_overdue表。
  23. # 绘制 借款月份和逾期率的柱形图
  24. df_month_overdue = month_overdue.toPandas()
  25. df_month_overdue.plot.bar(
  26.     x='month',
  27.     y='rate',
  28.     xlabel='借款月份',
  29.     ylabel='逾期率'
  30. )
  31. # plt.show()
  32. # 从柱形图可以发现,3、4、5、11、12月份用户逾期还款的概率明显高于其他月份。
复制代码
四、数据预处置处罚

1.盘算用户信息缺失个数,用乞贷月份构建新特征(nullcount表)

(1)盘算用户信息缺失个数
这部分工作在“3.用户信息完善环境与逾期率的关系探索”中已经完成,效果生存在masterNew_row_sums表中,如下图所示。


(2)提取月份的表master_month
前面已经完成,如下图所示。

(3)选择masterNew_row_sums表的’idx’, ‘target’, 'sum’字段,并与master_month表合并;重命名列名,效果如下图所示。

末了将nullcount表存储在Hive数据库的nullcount表中。
2.用户更新信息重修(userupdateprocess)

用户更新信息表userupdateinfo_1、userupdateinfo_2字段对每一位用户均有多条记录,与masterinfo主表的一位用户一条记录的形式不符。时间格式需要统一。
(1)读取数据
读取用户更新信息练习集userupdate_train和测试集userupdate_test,将listinginfo1列的数据前面的“_”去掉;将乞贷成交时间listinginfo与更新时间userupdateinfo2时间格式设为-格式;盘算两个时间的间隔天数,并重命名列为diff_days。末了两张表纵向合并,效果如下。

  1. # 6.2 用户更新信息重建
  2. # 用户更新信息表userupdateinfo_1、userupdateinfo_2字段对每一位用户均有多条记录,与masterinfo主表的一位用户一条记录的形式不符。时间格式需要统一。
  3. # 将listinginfo1列的数据前面的_去掉;将 借款成交时间listinginfo与更新时间userupdateinfo2时间格式设为-格式;计算两个时间间隔,并重命名列为diff_days;
  4. train_update_query = """  
  5. SELECT   
  6.     idx,   
  7.     LOWER(REGEXP_REPLACE(userupdateinfo1, '_', '')) AS userupdateinfo1,  
  8.     DATEDIFF(  
  9.         TO_DATE(REGEXP_REPLACE(listinginfo, '/', '-')),  
  10.         TO_DATE(REGEXP_REPLACE(userupdateinfo2, '/', '-'))  
  11.     ) AS diff_days,  
  12.     userupdateinfo2  
  13. FROM   
  14.     {}.{}  
  15. """.format('creditDB', 'userupdate_train')
  16. train_update: DataFrame = spark.sql(train_update_query)
  17. train_update.show(5)
  18. test_update_query = """  
  19. SELECT   
  20.     idx,   
  21.     LOWER(REGEXP_REPLACE(userupdateinfo1, '_', '')) AS userupdateinfo1,  
  22.     DATEDIFF(  
  23.         TO_DATE(REGEXP_REPLACE(listinginfo, '/', '-')),  
  24.         TO_DATE(REGEXP_REPLACE(userupdateinfo2, '/', '-'))  
  25.     ) AS diff_days,  
  26.     userupdateinfo2  
  27. FROM   
  28.     {}.{}  
  29. """.format('creditDB', 'userupdate_test')
  30. test_update: DataFrame = spark.sql(test_update_query)
  31. test_update.show(5)
  32. userupdate = train_update.union(test_update)  # 两表纵向拼接合并
复制代码
(2)盘算用户更新信息的统计值
用户更新信息表记录了用户更新操作的类型、更新操作的时间、乞贷成交的时间等。


  • 盘算乞贷前:乞贷时间与更新时间之差的最大值(最早更新)和最小值。

    —(1)盘算用户更新信息的统计值

    用户更新信息表记录了用户更新操作的类型、更新操作的时间、乞贷成交的时间等。

    盘算乞贷之前 用户最早更新信息的时间 距离 乞贷成交时间的 天数,用户更新次数,用户更新频率,乞贷成交时间之前用户更新信息的天数,用户更改特征数目。

    update_days = userupdate.select(‘idx’, ‘diff_days’).distinct()
    .groupBy(‘idx’)
    .agg(F.max(‘diff_days’), F.min(‘diff_days’))
    .withColumnRenamed(‘max(diff_days)’, ‘first_update’)
    .withColumnRenamed(‘min(diff_days)’, ‘last_update’) # 盘算乞贷前:乞贷与更新时间差的最大值(最早更新)和最小值。
    update_days.show(5)
  • 统计用户更新次数。

    update_counts = userupdate.select(‘idx’, ‘diff_days’)
    .groupBy(‘idx’)
    .agg(F.count(‘diff_days’))
    .withColumnRenamed(‘count(diff_days)’, ‘update_counts’)
    update_counts.show(5)
  • 盘算用户更新频次:更新次数/距离首次更新的天数(first_update)

    盘算用户更新频次

    更新频率=更新次数/距离首次更新的天数(first_update)

    update_frequency = update_days.join(update_counts, on=‘idx’)
    .selectExpr(‘idx’, ‘update_counts/first_update as update_frequency’)
    update_frequency.show(5)
  • 统计乞贷成交时间之前,用户更新信息的天数

  1. # 统计借款成交时间之前,用户更新信息的天数
  2. days_count = userupdate.select('idx', 'userupdateinfo2')
  3. .distinct()
  4. .groupBy('idx')
  5. .agg(F.count('userupdateinfo2'))
  6. .withColumnRenamed('count(userupdateinfo2)', 'update_num')
  7. days_count.show(5)
复制代码


  • 统计用户更改的特征数目:更新信息时,可能会修改很多内容,如学历、电话、婚姻状态等,更新内容列为userupdateinfo1。

    统计用户更改特征数目

    update_casts_counts = userupdate.select(‘idx’, ‘userupdateinfo1’)
    .distinct()
    .groupBy(‘idx’)
    .agg(F.count(‘userupdateinfo1’))
    .withColumnRenamed(‘count(userupdateinfo1)’, ‘update_cats’) # 每位用户 更改特征的数目
    update_casts_counts.show(5)
(3)长宽表转换
长表是指行多列少的表,即一行中的数据量比较少,但行数大。宽表是指列多行少的表,即一行中的数据量较大,但行数少。
用户更新信息表userupdate中,每行数据记录了用户每次的更新内容和更新时间,每位用户有多条记录,与主表masterinfo的每位用户一条记录的形式不符。因此需要对userupdate_train和userupdate_test举行结构转换,以用户修改的内容为字段,构建宽表,表中每行数据记录一位用户的更新信息。


  • 统计每位用户更新各特征的次数;没有更新的特征用0填充。

    (2) 长宽表转换

    长表是指行多列少的表,即一行中的数据量较少,行数多。宽表是指列多行少,即一行中的数据量较大,行数少。一行数据可以存放用户的很多信息

    统计每位用户更新各特征的次数;没有更新的特征用0填充;

    update_casts = userupdate.select(‘idx’, ‘userupdateinfo1’)
    .groupBy(‘idx’, ‘userupdateinfo1’)
    .count() # 统计每位用户的 修改特征 及次数。
    update_casts.show(5)
  • 长表 转 宽边。按用户idx分组,用pivot将每位用户的userupdateinfo1列的数据 转为一行,且盘算每个值的个数,没有数据用0填充。

    update_casts_result = update_casts.groupBy(‘idx’)
    .pivot(‘userupdateinfo1’)
    .sum(‘count’).na.fill(0) # 长表 转 宽边。按用户idx分组,用pivot将每位用户的userupdateinfo1列的数据 转为一行,且盘算每个值的个数,没有数据用0填充。
    update_casts_result.show(5)
  • 多表毗连,按idx为唯一标识举行合并,并存储在Hive中。

    多表毗连,按idx为唯一标识举行合并,并存储在Hive中。

    result = update_days.join(update_casts_counts, on=‘idx’)
    .join(update_frequency, on=‘idx’)
    .join(days_count, on=‘idx’)
    .join(update_casts_result, on=‘idx’) # 连表,添加更多字段:first_update,last_update,update_cats,update_frequency,update_num
    print("==2.用户更新信息 整理后的数据集:userupdateprocess
    ")
    result.show(5)
    result.write.mode(‘overwrite’).saveAsTable(‘creditdb.userupdateprocess’)
3. 用户登录信息重修(loginfoprocess)

用户登录信息表中的loginfo1(登录操作代码)、loginfo2(登录操作类别)字段对每一位用户均有多条记录,与主表masterinfo的每位用户一条记录的形式不符。统计用户登录平台 次数、频率、最早登录时间等,用于构建猜测用户贷款风险的用户登录信息特征。
(1) 加载用户登录信息练习集loginfo_train和测试集loginfo_test,转换时间格式:2020/3/5 转为 2020-3-5,用于盘算时间间隔。末了将练习集和测试集合并。
处置处罚前的用户登录信息练习集loginfo_train,如下图所示。

loginfo1(登录操作代码)、loginfo2(登录操作类别)两列合并处置处罚为longinfo2,盘算乞贷时间listinginfo与登录时间loginfo3的时间差diff_days。末了将练习集和测试集合并,合并后的用户登录信息数据集为loginfo,如下图所示。

  1. # (1) 将时间格式:2020/3/5 转为 2020-3-5,用于计算时间间隔,完成数据预处理
  2. train_log_query = """  
  3. SELECT   
  4.     idx,   
  5.     CONCAT('log_',REGEXP_REPLACE(loginfo1,'-','c'),'_',loginfo2) as loginfo2,
  6. listinginfo,
  7. DATEDIFF(REGEXP_REPLACE(listinginfo,'/','-'),REGEXP_REPLACE(loginfo3,'/','-')) as diff_days,
  8. loginfo3   
  9. FROM   {}.{}  
  10. """.format('creditDB', 'loginfo_train')
  11. train_log: DataFrame = spark.sql(train_log_query)
  12. train_log.show(10)
  13. test_log_query = """  
  14. SELECT   
  15.     idx,   
  16.     CONCAT('log_',REGEXP_REPLACE(loginfo1,'-','c'),'_',loginfo2) as loginfo2,
  17. listinginfo,
  18. DATEDIFF(REGEXP_REPLACE(listinginfo,'/','-'),REGEXP_REPLACE(loginfo3,'/','-')) as diff_days,
  19. loginfo3   
  20. FROM   {}.{}  
  21. """.format('creditDB', 'loginfo_test')
  22. test_log: DataFrame = spark.sql(test_log_query)
  23. test_log.show(10)
  24. loginfo = train_log.union(test_log)
复制代码
(2)盘算用户登录信息的统计值


  • 用户最早登录时间、最晚登录时间,距离乞贷成交时间的天数。max(diff_days)、min(diff_days)
    将表loginfo按用户分组,盘算最大时间差max(diff_days),即最早登录时间,盘算最小时间差min(diff_days),即最晚登录时间。

    由图可知,用户10096距离乞贷成交时间最早的一次登录,是在乞贷前7天,最晚登录时间是在乞贷前0天(当天)。
    (2)盘算用户登录信息的统计值

    用户最早登录时间、最晚登录时间,距离 乞贷成交时间 的天数。max(diff_days)、min(diff_days)

    first_last_day = loginfo.groupBy(‘idx’)
    .agg(F.max(‘diff_days’), F.min(‘diff_days’))
    .withColumnRenamed(‘max(diff_days)’, ‘first_log’)
    .withColumnRenamed(‘min(diff_days)’, ‘last_log’)
    first_last_day.show(5)
  • 用户的登录操作类别总数
    longinfo2列是用户登录操作类型,因此,将表loginfo按用户分组,统计loginfo2的数量,就可以得到用户的登录操作类别总数。

    用户总的登录类别数目

    log_casts = loginfo.select(‘idx’, ‘loginfo2’)
    .distinct()
    .groupBy(‘idx’)
    .agg(F.count(‘loginfo2’))
    .withColumnRenamed(‘count(loginfo2)’, ‘log_casts’)
    log_casts.show(5)
  • 用户登录平台的天数
    loginfo3列为用户登录日期,将表loginfo按用户分组,对登录日期去重后计数,就可以得到用户登录平台的天数。

    用户登录平台的天数

    log_num = loginfo.select(‘idx’, ‘loginfo3’)
    .distinct()
    .groupBy(‘idx’)
    .agg(F.count(‘loginfo3’))
    .withColumnRenamed(‘count(loginfo3)’, ‘log_num’)
    log_num.show(5)
  • 用户第一次登录平台(最早登录)之后,每一天登录平台的频率。
    登录平台的频率log_frequency=用户登录平台的总天数log_num/最早登录平台的天数first_log 。

    用户第一次登录平台之后,每一天登录平台的频率。

    log_frequency = log_num.join(first_last_day, on=‘idx’)
    .selectExpr(‘idx’, ‘log_num/first_log as log_frequency’)
    log_frequency.show(5)
(3)长宽表转换
前面的用户登录信息表,字段很少,每行的信息量很小;每一位用户有多条记录,与目的masterinfo的每位用户一条记录的形式不符。以用户操作类型为字段,构建新表(宽表)。
以idx字段为唯一标识,统计每位用户登录操作类型的次数。如下图所示。

上图中,用户57826的操作类型log_c4_6的次数是10次。
用pivot() 函数将多分类变量转换为多个列。即以idx字段为唯一标识,统计每位用户登录操作类型loginfo2的次数,假如没有相应的操作类型次数,用0填充。末了将效果存入Hive的loginfoprocess表中。

  1. # 构建基于宽表的用户登录信息表
  2. # 前面的用户登录信息表,字段很少,每行的信息量很小;每一位用户有多条记录,与目标masterinfo的每位用户一条记录的形式不符。
  3. # 以用户操作类型为字段,构建新表(宽表)。以idx字段为唯一标识,统计每位用户登录操作类型的次数。如果没有相应的操作类型次数,用0填充。
  4. log_casts_counts = loginfo.select('idx', 'loginfo2')
  5. .groupBy('idx', 'loginfo2')
  6. .count()
  7. log_casts_counts.show(5)
  8. log_casts_result = log_casts_counts.groupBy('idx')
  9. .pivot('loginfo2')
  10. .sum('count')
  11. .na.fill(0)
  12. print("==============3.用户登录信息整理后的数据集:loginfoprocess============
  13. ")
  14. log_casts_result.show(5)
  15. log_casts_result.write.mode('overwrite').saveAsTable('creditDB.loginfoprocess')
复制代码
4.对主表masterinfo做分类数据预处置处罚

在数据集中,有些字段的内容是分类型数据,如户籍省份(广西、广东、湖南、四川中分类),婚姻状态(已婚、未婚等),需要处置处罚这些分类数据。别的,还有部分数据包含空格,空格在后续的数据分析中可能会造成不可预估的影响。因此,需要对包含空格的数据举行洗濯和处置处罚,将其转换成规范的数据。
(1)读取数据masterinfo,删除空格
主表masterinfo中有大量的字符型数据,部分数据包含空格,导致字符串无法匹配。如“中国 移动”中间有一个空格,无法和“中国移动”匹配。
查看主表masterinfo的userinfo_9列有几家通讯公司,发现有些数据包含空格,如下图所示。

使用trim()函数,对主表masterinfo的字符型列做删除空格处置处罚。userinfo_9列是通讯公司,userinfo_2、userinfo_4、userinfo_8和userinfo_20列是都会。处置处罚完后,再查看userinfo_9列的去重值,效果如下。

  1. # 6.4 -------------分类数据预处理--对主表masterinfo进行处理。用于机器学习------
  2. # (1)读取数据并删除数据中的空格
  3. data = spark.sql('select * from creditdb.masterinfo')  # 从hive读取主表masterinfo
  4. # 查看userinfo_9列的不重复值
  5. data.printSchema()
  6. data.select('userinfo_9').distinct().show()  # 查看有几家通讯公司
  7. data_trim = data
  8. .withColumn('userinfo_9', F.trim('userinfo_9'))
  9. .withColumn('userinfo_2', F.trim('userinfo_2'))
  10. .withColumn('userinfo_4', F.trim('userinfo_4'))
  11. .withColumn('userinfo_8', F.trim('userinfo_8'))
  12. .withColumn('userinfo_20', F.trim('userinfo_20'))  # 这几个字段是通讯公司、城市
  13. data_trim.select('userinfo_9').distinct().show()  # 查看 删除空格后的数据
复制代码
(2)重复字段合并
userinfo_7和userinfo_19两列是省份。

添加一列:diffprov,比较userinfo_7和userinfo_19两列的省份是否同等,假如省份同等为11,不同等为0。

  1. # (2) 重复字段合并
  2. data_trim.select('idx', 'userinfo_7', 'userinfo_19').show(5)  # 这两个是省份。比较两个省份是否一致。
  3. # 两个数据任何一个为空值,则为0,如果两个数据相同为1,不同为0
  4. udf_trans_diff = F.udf(
  5.     lambda arg1, arg2: -1 if not ((len(arg1) > 0) and (len(arg2) > 0)) else (1 if (arg1 in arg2) else 0))
  6. diffprov = data_trim.withColumn('diffprov', udf_trans_diff(data_trim['userinfo_7'], data_trim['userinfo_19']))
  7. diffprov.select('idx', 'userinfo_7', 'userinfo_19', 'diffprov').show(5)  # 比较两个省份是否一致。
复制代码
有的都会名 含有“市”,有的没有,把“市”去掉。与都会有关的列为:userinfo_2、userinfo_4、userinfo_8、userinfo_20。

把“市”去掉,再将4列的值两两比较,假如两个值同等(都会 雷同)为1,不同为0,并添加相应的列名userinfodiff_2_20等。

  1. # 有的城市名 含有“市”,有的没有。把“市”去掉。
  2. keystr = '市'
  3. udf_trans_del = F.udf(lambda arg: arg.replace(keystr, '') if ((arg is not None) and (keystr in arg)) else arg)
  4. tmp_data_transform = diffprov
  5. .withColumn('userinfo_2', udf_trans_del(F.decode('userinfo_2', 'UTF-8')))
  6. .withColumn('userinfo_4', udf_trans_del(F.decode('userinfo_4', 'UTF-8')))
  7. .withColumn('userinfo_8', udf_trans_del(F.decode('userinfo_8', 'UTF-8')))
  8. .withColumn('userinfo_20', udf_trans_del(F.decode('userinfo_20', 'UTF-8')))  # 去除这四个字段城市名中的“市”
  9. udf_city_cmp = F.udf(lambda arg1, arg2: 1 if (arg1 == arg2) else 0)
  10. city = tmp_data_transform.select('idx', 'userinfo_2', 'userinfo_4', 'userinfo_8', 'userinfo_20')
  11. city_addCol = city
  12. .withColumn('userinfodiff_2_20', udf_city_cmp('userinfo_2', 'userinfo_20'))
  13. .withColumn('userinfodiff_2_4', udf_city_cmp('userinfo_2', 'userinfo_4'))
  14. .withColumn('userinfodiff_2_8', udf_city_cmp('userinfo_2', 'userinfo_8'))
  15. .withColumn('userinfodiff_4_20', udf_city_cmp('userinfo_4', 'userinfo_20'))
  16. .withColumn('userinfodiff_4_8', udf_city_cmp('userinfo_4', 'userinfo_8'))
  17. .withColumn('userinfodiff_8_20', udf_city_cmp('userinfo_8', 'userinfo_20'))
  18. .drop('userinfo_2', 'userinfo_4', 'userinfo_8', 'userinfo_20')  # 比较两个城市,相同城市为1,不同为0。删除四个原始字段。
  19. print("比较两个城市,删除userinfo_2、userinfo_4、userinfo_8、userinfo_20字段:city_addCol
  20. ")
  21. city_addCol.show(5)
复制代码
5. 字符串字段编码处置处罚(encodeprocess)

PySpark的机器学习模型只能处置处罚数值型数据,因此需要将字符型数据转换为数值型数据。别的,假如有些类别占类别总数的比例比较小,可以举行合并,以减少类别数量。
(1)界说处置处罚函数:col_percentage_calc(),将类别占比小于0.002的数据合并为"other"
将类别占总数的比例阈值设置为0.002,类别占总数的比例小于该值时,则合并为"other"。界说一个udf()函数:col_percentage_calc,实现该功能。示例:盘算指定字段userinfo_8的类别占比,盘算效果的前5条记录如下图所示。

由图可知,用户10001所在都会为:深圳,而在深圳的用户数为1336个,深圳占所有用户所在都会的比例为:0.02672。用户1007所在的都会用户数为34个,占比为:0.00068,占比小于0.002,所以将该都会设为“other”。
  1. # ---6.5 字符串字段编码处理------------------------------------
  2. # 将类别占总数的比例阈值设为0.002,类别占总数的比例小于该值的则合并为other。
  3. # udf()函数,用于过滤类别占总数比例小于0.002的数据,将其划分为'other'
  4. udf_rate_filter = F.udf(
  5.     lambda feature, feature_rate: feature if (feature is not None and feature_rate > 0.002) else 'other')
  6. # 字段的值类别比例计算函数,计算指定字段中不同类别的值占比
  7. def col_percentage_calc(fmdata, featurecol):
  8.     if fmdata is None:
  9.         raise ValueError("DataFrame cannot be None")
  10.     cols = ['idx', featurecol]
  11.     feature_count_name = featurecol + '_count'  # 字段里面 类别的总数
  12.     feature_rate_name = featurecol + '_rate'  # 字段 占比
  13.     feature_count = fmdata.select(cols)
  14.     .groupBy(featurecol)
  15.     .agg(F.count('idx').alias(feature_count_name))  # 按字段里面的类别分组,计算使用这一类别的用户数量,即该类别数量
  16.     # feature_count.show(5)
  17.     feature_all_count = fmdata.select(cols).count()  # 某一字段 的记录数(行数)
  18.     tmp_data = feature_count
  19.     .withColumn(feature_rate_name, feature_count[feature_count_name] / feature_all_count)
  20.     .withColumn(featurecol, F.trim(feature_count[featurecol]))  # 字段里每一种类别数量/字段总记录数;去除 字段名空格
  21.     # tmp_data.show(5)
  22.     feature_rate = fmdata.select(cols).join(tmp_data, on=featurecol,
  23.                                             how='left')  # 左连接,在原数据集上 添加2列:feature_count_name,feature_rate_name
  24.     # feature_rate.show()
  25.     result = feature_rate
  26.     .withColumn(featurecol, udf_rate_filter(featurecol, feature_rate[feature_rate_name]))
  27.     .drop('feature_count_name',
  28.               'feature_rate_name')  # 如果类别占比< 0.002,则标注为other;删除feature_count_name和feature_rate_name
  29.     return result
  30. re = col_percentage_calc(data, 'userinfo_8')
  31. print("------示例:计算指定字段userinfo_8的类别占比----")
  32. re.show(5)
复制代码
(2)界说函数:col_string_indexer( ),将类别型数据转换为数值型数据
  1. # 将类别数据转换为数值型数据
  2. def col_string_indexer(fmdata, featueCol):
  3.     indexer = StringIndexer(inputCol=featueCol, outputCol=featueCol + '_index')
  4.     idx_model = indexer.fit(fmdata)
  5.     result = idx_model.transform(fmdata)
  6.     return result
复制代码
(3)界说函数:dataCacheToHive( ),使用Hive举行缓存
Spark将主表masterinfo的类别型数据转换为数值型数据的过程中,产生庞大的中间效果,会导致节点内存不敷。因此需要使用Hive举行缓存,即:每转换2列数据,将效果存入Hive,再重新读取。
  1. # 使用Hive进行缓存
  2. def dataCacheToHive(fmdata, database, table, count):
  3.     if (len(spark.sql('show tables in {}'.format(database))
  4.                     .filter("tableName=='{}_{}'".format(table, count))
  5.                     .collect()) == 1):
  6.         print('删除表 [{}.{}_{}]....'.format(database, table, count))
  7.         spark.sql('drop table {}.{}_{}'.format(database, table, count))  # 检查表是否存在,如果存在则删除。
  8.     print('保存表 [{}.{}_{}]'.format(database, table, count))
  9.     fmdata.write.saveAsTable('{}.{}_{}'.format(database, table, count))  # 将表保存到Hive中。
  10.     print('更新表[{}.{}_{}]...'.format(database, table, count))
  11.     spark.catalog.refreshTable('{}.{}_{}'.format(database, table, count))   # 刷新Hive元数据中的表信息,确保之前的任何更改都被识别
  12.     print('返回缓存数据(重新读表) ...')
  13.     return spark.read.table('{}.{}_{}'.format(database, table, count)).cache()  # 读取刚刚保存的表,并通过.cache()方法将其缓存
复制代码
(4 )将删除空格、比较两省、删除“市”后的主表tmp_data_transform,与两两都会相比较后的表city_addCol毗连

  1. # tmp_data_transform为删除空格,比较两省,删除“市”后的主表;city_addCol为两两城市相比较后的数据表
  2. cleaned_result = tmp_data_transform.join(city_addCol, on='idx').cache()  # 将两两城市比较的结果加入主表masterinfo
  3. print("---------删除空格,比较两省,删除“市”,两两城市比较后的主表:cleaned_result:")
  4. cleaned_result.show()
  5. cleaned_result = cleaned_result.toDF(*[col.lower() for col in cleaned_result.columns]).cache()  # 将字段名小写
复制代码
(5)选取字符型字段
选取字符型字段,和剩下的字段(大部分是数值型,还有字符型字段通讯公司字段userinfo_9和婚姻状态字段userinfo_22后面单独数值化)。选取的字符型字段如下。

  1. # 确定字符型字段
  2. nanColumns = ['userinfo_7', 'userinfo_8', 'userinfo_2', 'userinfo_4', 'userinfo_19', 'userinfo_20', 'userinfo_23',
  3.               'userinfo_24',
  4.               'education_info2', 'education_info3', 'education_info4', 'education_info6', 'education_info7',
  5.               'education_info8',
  6.               'webloginfo_19', 'webloginfo_20', 'webloginfo_21']
  7. otherColumns = list(set(nanColumns) ^ set(cleaned_result.columns))  # 其他字段(数值型):两个集合之间的差异
  8. other_column_result = cleaned_result.select(otherColumns).cache()  # 其他字段(数值型)
  9. nan_column_result = cleaned_result.select(['idx'] + nanColumns).cache()  # 字符型字段,加上idx
  10. nan_column_result.printSchema()
  11. print("---选择的字符型字段nan_column_result:")
  12. nan_column_result.show(5)
复制代码
(6)将字符型字段转换为数值型字段
调用前面封装的函数col_percentage_calc( )、col_string_indexer( )、dataCacheToHive( ),将字符型字段转换为数值型字段。处置处罚过程如下:

末了将数值化处置处罚效果nan_column_result与原来数值型数据other_column_result毗连,存储到Hive的表encodeprocess中。

  1. # 调用前面封装的函数col_percentage_calc(),col_string_indexer(),dataCacheToHive(),将字符型字段转换为数值型字段
  2. count = 0
  3. # nan_column_result = cleaned_result.select(['idx'] + nanColumns)
  4. for col in nanColumns:
  5.     print('processing the columns[{}] ....'.format(col))
  6.     col_other_result = col_percentage_calc(nan_column_result, col)  # 计算字段中类别的占比,小于0.002的类型设为other
  7.     col_index_result = col_string_indexer(col_other_result, col).drop(col)  # 将字符型字段 转为 数值型字段
  8.     nan_column_result = nan_column_result.join(col_index_result, on='idx').drop(col)
  9.     count = count + 1
  10.     if count % 2 == 0:
  11.         nan_column_result = dataCacheToHive(nan_column_result, 'creditdb', 'tmp_data', count)  # 处理两列后,写入Hive缓存
  12. nan_column_result = dataCacheToHive(nan_column_result, 'creditdb', 'tmp_data', count + 2)
  13. print("计算字段占比、数值化字段后主表的字符串字段部分:nan_column_result:
  14. ")
  15. nan_column_result.count()
  16. nan_column_result.show(5)
  17. # 将处理后的数据存储到hive数据库的encodeprocess表中
  18. result = nan_column_result.join(other_column_result, on='idx')
  19. print("------计算字段占比、数值化字段后的主表:encodeprocess-----------")
  20. result.show(5)
  21. result.write.mode('overwrite').saveAsTable('creditDB.encodeprocess')
复制代码
6. 分类数据重编码(onehotprocess)

(1)重编码
在上一节的encodeprocess表中,userinfo9字段是通讯公司,有四个类别:中国移动、中国联通、中国电信和不详。使用One-Hot编码处置处罚,天生新字段。同理,userinfo22字段是婚姻状态,也接纳One-Hot编码处置处罚,天生新字段。
  1. # 6.6 分类数据重编码
  2. data: DataFrame = spark.sql('select * from {}.{}'.format('creditdb', 'encodeprocess'))  # 从hive读取表
  3. pd_phone = data.select(['idx', 'userinfo_9']).toPandas()  # 通讯公司:中国移动,中国联通,
  4. pd_phone['userinfo_9_commerce'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '中国移动' else 0)
  5. pd_phone['userinfo_9_unicom'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '中国联通' else 0)
  6. pd_phone['userinfo_9_telecom'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '中国电信' else 0)
  7. pd_phone['userinfo_9_unknown'] = pd_phone['userinfo_9'].apply(lambda v: 1 if v == '不详' else 0)
  8. pd_phone.drop(['userinfo_9'], axis=1, inplace=True)
  9. pd_phone.head(5)
  10. # 将包含marriage字段和idx字段的DataFrame转换为pandas模块的DataFrame类型
  11. pd_marriage = data.select('idx', 'userinfo_22').toPandas()
  12. pd_marriage['userinfo_22_married'] = pd_marriage['userinfo_22']
  13. .apply(lambda v: 1 if v == '初婚' or v == '已婚' or v == '再婚' else 0)
  14. pd_marriage['userinfo_22_unmarried'] = pd_marriage['userinfo_22']
  15. .apply(lambda v: 1 if v == '未婚' else 0)
  16. pd_marriage['userinfo_22_unknown'] = pd_marriage['userinfo_22']
  17. .apply(lambda v: 1 if v == 'D' or v == '不详' else 0)
  18. pd_marriage.drop(['userinfo_22'], axis=1, inplace=True)
  19. pd_marriage.head(5)
复制代码
(2)处置处罚效果写入hive表:onehotprocess
将处置处罚效果与主表encodeprocess毗连后,存储到Hive表onehotprocess中。

  1. print("----将通讯公司分类和婚姻状况分类,写入hive----------")
  2. marriage = spark.createDataFrame(pd_marriage)
  3. phone = spark.createDataFrame(pd_phone)
  4. onehotprocess = data
  5. .drop('userinfo_9')
  6. .drop('userinfo_22')
  7. .join(marriage, on='idx')
  8. .join(phone, on='idx')
  9. onehotprocess.write.mode('overwrite').saveAsTable('creditdb.onehotprocess')
  10. print("--数据预处理后的masterinfo主表:onehotprocess-----")
  11. onehotprocess.show(5)
复制代码
7.缺失值处置处罚(tb_nullprocess)

颠末数据预处置处罚后,得到三张新表:onehotprocess(主表,处置处罚后的用户基本信息表)、userupdateprocess(用户更新信息表)、loginfoprocess(用户登录信息表)。
但表中仍旧存在缺失值,特征列中缺失数据比较大的,删除该列;特征列中缺失数据比较小的,则使用0填充。
(1)读取数据,三表毗团结并
读取三张表:onehotprocess、userupdateprocess、loginfoprocess,根据idx毗团结并数据,三张表都包含字段listinginfo,合并时删除重复字段,只保存一个listinginfo字段。
  1. # -----6.7 缺失值处理---------
  2. # 经过数据预处理,主表masterinfo处理后为:onehotprocess,
  3. # 信息更新表updateinfo处理后为:userupdateprocess,
  4. # 登录信息表loginfo处理后为:loginfoprocess
  5. # (1)三张表连接
  6. database = 'creditdb'
  7. tb_onehotprocess = 'onehotprocess'
  8. tb_loginfoprocess = 'loginfoprocess'
  9. tb_userupdateprocess = 'userupdateprocess'
  10. tb_nullprocess = 'nullprocess'
  11. merge_idx = 'idx'
  12. master: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_onehotprocess))
  13. .drop('listinginfo')
  14. .drop('label')
  15. userupdate: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_userupdateprocess))
  16. .drop('listinginfo')
  17. loginfo: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_loginfoprocess))
  18. .drop('listinginfo')
  19. data = userupdate.join(loginfo, on=merge_idx).join(master, on=merge_idx)  # 三表连接
  20. all_count = data.count()  # 表的总记录数
  21. print('表的总记录数:', all_count)
  22. data = data.cache()  # 缓存,方便多次读取
  23. filter_columns = data.drop('idx').drop('target').columns  # 去掉用户ID和目标值target字段,剩下的字段作为训练特征
复制代码
(2)每个字段缺失数据统计及处置处罚
合并之后的表中存在缺失数据,需统计各特征(除了idx和target外)的缺失数据比例。假如缺失值比例大于0.9,则认为该特征的代价不高,删除。假如比例小于0.9,用0填充缺失值。处置处罚过程如下:

  1. # (2)字段缺失数据统计及处理
  2. # 合并之后的表中存在缺失数据,需统计各特征的缺失数据比例,并且将所有特征的数据类型都转换成double类型。
  3. # 如果缺失值比例大于0.9,则认为该特征的价值不高,删除。如果比例小于0.9,用0填充缺失值。
  4. rate_threshold = 0.9
  5. drop_columns = []
  6. cast_columns = []
  7. procssed_count = 0
  8. all_count = len(filter_columns)  # 除了idx和target字段外,字段数量。
  9. print("总列数:", all_count)
  10. for col in filter_columns:
  11.     procssed_count += 1
  12.     if procssed_count % 10 == 0:
  13.         print('[{}/{}]'.format(procssed_count, all_count))
  14.     null_count = data.select(col).filter(F.col(col).isNull()).count()  # 计算每列的缺失值个数
  15.     rate = null_count / all_count
  16.     if rate > rate_threshold:
  17.         drop_columns.append(col)
  18.     else:
  19.         cast_columns.append(col)
  20.     print('总列数:转换类型列数:删除列数:[{}:{}/{}]'
  21.       .format(len(data.columns) - 2, len(cast_columns), len(drop_columns)))
复制代码
(3)所有列(除了idx和target外)的数据类型都转换成double型
除了idx和target外,所有列的数据类型都转换为double型,末了将效果存储到Hive的tb_nullprocess表中。

  1. # 转换数据类型
  2. data = data.drop(*drop_columns)
  3. data = data.select([F.col(column).cast(DoubleType()) for column in cast_columns] + ['idx', 'target'])
  4. data = data.na.fill(0)
  5. data.write.mode('overwrite').saveAsTable('{}.{}'.format(database, tb_nullprocess))
  6. print("----------三表连接以及完全处理后,待模型训练、测试用的数据表:tb_nullprocess---------")
  7. data.show()
复制代码
五、模型构建与评估

1.加载数据、向量化特征、特征提取(降维)

加载三表毗团结并后的数据表tb_nullprocess。选取除’idx’, 'target’以外的所有特征,然后将选取特征向量化。

表中有206个特征,思量到特征较多,可能会影响建模效率,因此使用PCA()方法举行特征提取(降维)。特征提取尽可能保存原始数据结构属性的环境下,从 原始特征中探求最有用、最具代表性的特征,有用减少特征的数量,增强后续模型的学习和泛化本事。

  1. # ------三、模型构建与评估--------
  2. # 1.加载数据
  3. database = 'creditdb'
  4. tb_train_master = 'masterinfo_train'
  5. tb_test_master = 'masterinfo_test'
  6. data_master: DataFrame = spark.sql('select * from {}.{}'.format(database, tb_nullprocess))
  7. # 2.特征提取
  8. assembleCols = data_master.drop('idx', 'target').columns  # 选取待向量化特征
  9. # 向量化特征
  10. assembler = VectorAssembler(inputCols=assembleCols, outputCol='features')
  11. assembler_data = assembler.transform(data_master)  # 特征向量化:除了新增字段features外,保留data_master原来字段
  12. assembler_data.show(5)
  13. # 表中有206个特征,考虑到特征较多,可能会影响建模效率,因此使用PCA()方法进行特征提取(降维)。
  14. # 特征提取尽可能保留原始数据结构属性的情况下,从 原始特征中寻找最有效、最具代表性的特征,有效减少特征的数量,增强后续模型的学习和泛化能力。
  15. # 特征降维
  16. pcaModel = PCA(inputCol='features', outputCol='pcaFeatures', k=100).fit(assembler_data)  # 降维到100个特征
  17. df = pcaModel.transform(assembler_data).selectExpr('idx', 'target', 'pcaFeatures', 'features')
  18. df.show(10)
复制代码
2.GBTs建模

PySpark中的GBTs算法,即Gradient Boosting Trees(梯度提升树),是Spark MLlib库中提供的一种强大的集成学习方法,用于分类和回归使命。梯度提升树通过组合多个弱学习器(通常是决策树)来形成一个强学习器,每个后续模型都尝试改正之前模型的错误。建模流程如下:


  • (1)对标签target编码。假如标签字段为字符型,如:“是”或“否”,这一步很有须要。但假如标签字段为 数值型,也可以省略这一步。设置StringIndexer类的输入字段为target,输出字段自界说为indexedLabel。
  • (2)对特征字段举行向量化。现实上,上一节已经对特征向量化,可以可以省略这一步。没须要对PCA输出的pcaFeatures使用VectorIndexer。可以直接将pcaFeatures作为特征输入到GBTClassifier中。此处为 了说明建模流程,保存这一步。
  • (3)使用GBTClassifier类,创建GTBs建模,设置标签输入字段为indexedLabel,特征输入字段自界说为:indexedFeatures
  • (4)标签字段还原,还原成原来的字符:输入字段为模型主动天生的prediction,设置输出字段自界说为:predictedLabel
  • (5)通过管道,将标签编码、特征向量化、建模和标签还原整合成一个完备的步骤,便于直接用于模型练习和测试。
    -------------3. GBTs建模-----------------

    (1)对标签target编码。输入字段为target,输出字段自界说为indexedLabel。

    labelIndexer = StringIndexer(inputCol=‘target’, outputCol=‘indexedLabel’).fit(df)
    (2)对特征字段举行编码,对向量编码。没须要对PCA输出的pcaFeatures使用VectorIndexer。可以直接将pcaFeatures作为特征输入到GBTClassifier中。

    featureIndexer = VectorIndexer(inputCol=‘pcaFeatures’, outputCol=‘indexedFeatures’, maxCategories=4).fit(df)
    (3)GTBs建模,设置标签输入字段为indexedLabel,特征输入字段自界说为:indexedFeatures

    gbt = GBTClassifier(labelCol=‘indexedLabel’, featuresCol=‘indexedFeatures’, maxIter=10)
    (4)标签字段还原,还原成原来的字符:输入字段为模型主动天生的prediction,设置输出字段自界说为:predictedLabel

    labelConverter = IndexToString(inputCol=‘prediction’, outputCol=‘predictedLabel’, labels=labelIndexer.labels)
    (5)通过管道,将预处置处罚,练习和标签还原整合成一个完备的步骤,便于直接用于模型练习和测试。

    pipeline = Pipeline().setStages([labelIndexer, featureIndexer, gbt, labelConverter])
3.模型练习

预处置处罚后的三表合并数据集tb_nullprocess,包含了练习数据和测试数据,可以从masterinfo_train表中读取练习数据或测试数据的用户ID号idx,在根据idx从数据集tb_nullprocess中获取练习数据或测试数据。
然后通过管道,使用练习数据练习GBTs模型,得到GBTs模型:gbt_model。
  1. # ------4.训练GBTs模型--------
  2. train_index = spark.sql('select distinct idx from {}.{}'.format(database, tb_train_master))  # 读取原始训练数据的idx号
  3. train_data = df.join(train_index, on='idx')  # 根据idx,就可以提取处理后的对应数据。
  4. # 通过管道,使用训练数据训练GBTs模型,得到GBTs模型:gbt_model
  5. gbt_model = pipeline.fit(train_data)
复制代码
4. 模型评估



  • 和读取练习数据的方法一样,读取测试数据。
  • 使用练习好的模型gbt_model,猜测测试数据,返回一个包含测试效果的数据集predictions。

  • 创建分类评估器evaluator ,设置评估指标为正确率accuracy。
  • 使用评估器evaluator ,对测试数据集的猜测效果predictions举行评估,即对模型评估。返回模型正确率。

    --------5.评估GBTs模型-------------------

    test_index = spark.sql(‘select distinct idx from {}.{}’.format(database, tb_test_master))
    test_data = df.join(test_index, on=‘idx’) # 提取测试数据
    predictions = gbt_model.transform(test_data)
    predictions.show(10)
    evaluator = MulticlassClassificationEvaluator(labelCol=‘indexedLabel’,
    predictionCol=‘prediction’,
    metricName=‘accuracy’)
    accuracy = evaluator.evaluate(predictions)
    print(‘猜测的正确性:{}’.format(accuracy))
    accuracy_df = spark.createDataFrame([(accuracy,)], [‘Accuracy’]) # 创建PySpark的DataFrame
    write_to_mysql(accuracy_df, url, properties, ‘accuracy_df’) # 将正确率存入MySQL的test数据库的accuracy_df表。
    predictions_df = predictions.select(‘idx’, ‘target’, ‘predictedLabel’)
    write_to_mysql(predictions_df, url, properties, ‘predictions_df’) # 将猜测效果存入MySQL的test数据库的predictions_df表。
六、数据可视化大屏的设计

程序测试中绘制的图表仅存在于开发环境或命令行界面中,不太轻易分享给非技术用户或团队成员。为了降服这一局限性,一个高效的解决方案是通过开发一个Web应用程序,在网页上绘制图表。这样,用户就可以通过任何标准的Web欣赏器来查看图表。别的,还可以将Web应用摆设到服务器上,以便用户可以通过互联网访问。
Flask是一个使用Python编写的轻量级Web应用框架,以其简洁、机动和可扩展的特点而受到开发者的喜爱。
Flask中文版教程
Flask英文版教程
PyEcharts是一个用于天生ECharts图表的Python库。ECharts是一个使用JavaScript实现的开源可视化库,它提供了丰富的图表类型和强大的数据可视化本事。PyEcharts通过将ECharts与Python相联合,使得开发者可以在Python环境中方便地天生各种图表。
因此,利用Flask构建Web应用框架,联合PyEcharts天生各种数据可视化图表,构建出一个功能丰富的数据可视化平台。用户可以通过欣赏器访问该平台,查看各种数据图表和分析效果。
Pyecharts官方文档: https://pyecharts.org/#/zh-cn/intro
pyecharts-gallery,示例: https://gallery.pyecharts.org/#/README
设计思绪为:读取存储在Linux的MySQL数据库的数据分析效果,再用PyEcharts将数据绘制成图表,末了通过Flask将图表渲染到页面。
项目的创建过程请参考作者写的另外一篇文章: 链接: Flask+Pyecharts+大数据集群(Linux):数据可视化大屏的实现
项目结构如下:

在main.py中,读取MySQL的数据,然后使用PyEcharts配置饼图、柱形图、折线图和地图等,末了Flask将图表渲染到页面show_pyecharts_05.html。绘制的数据可视化大屏如下所示:


show_pyecharts_05.html代码如下:
  1. import json
  2. import pyecharts.charts
  3. from flask import Flask, render_template
  4. from pyecharts.options import *
  5. from markupsafe import Markup  # 导入 Markup,用于在 Flask 模板中安全地渲染 HTML
  6. from pyecharts import charts
  7. import pandas as pd
  8. from sqlalchemy import create_engine
  9. from pyecharts.charts import Pie, Bar, Line, Liquid
  10. from pyecharts.options import LabelOpts, TitleOpts, LegendOpts
  11. import pymysql
  12. from sqlalchemy import create_engine
  13. import pyecharts.options as opts
  14. from pyecharts.charts import Map
  15. import numpy as np
  16. from pyecharts.components import Table
  17. from pyecharts.options import ComponentTitleOpts
  18. app = Flask(__name__)  # 创建一个 Flask 应用实例
  19. # ---------------4.读取集群的MySQL数据,绘制可视化大屏------------------------
  20. # 自定义函数,读master节点的MySQL数据
  21. def connFun_linux(sql_query):
  22.     engine = create_engine("mysql+pymysql://root:123456@192.168.126.10:3306/test")
  23.     try:
  24.         data = pd.read_sql(sql_query, engine)
  25.         return data
  26.     except Exception as e:
  27.         print(f"An error occurred: {e}")
  28.         raise  # 可选:重新抛出异常以便在外部捕获     '''
  29. # 饼图
  30. def get_pie_linux():
  31.     sql = '''select * from update_days_count where all_count > 100'''
  32.     data = connFun_linux(sql)
  33.     x = [str(days) + '次' for days in data['update_days']]  # 修改次数:x 必须为字符类型
  34.     y = list(data['all_count'])
  35.     data_list = [list(z) for z in zip(x, y)]
  36.     c = (
  37.         Pie()
  38.             .add("", data_list, radius=["40%", "75%"])
  39.             .set_global_opts(title_opts=TitleOpts(title="用户修改信息次数与相对应的人数"),
  40.                              legend_opts=LegendOpts(orient="vertical", pos_top="15%", pos_left="2%"))
  41.             .set_series_opts(label_opts=LabelOpts(formatter="{b}:{c}"))
  42.     )
  43.     return c
  44. # 柱形图
  45. def get_bar_linux():
  46.     sql = '''select * from update_days_overdue  '''
  47.     data = connFun_linux(sql)
  48.     x = list(data["update_days"])
  49.     y = [round(rate * 100, 2) for rate in data['rate']]
  50.     c = (
  51.         pyecharts.charts.Bar()
  52.             .add_xaxis(x)
  53.             .add_yaxis("逾期率", y, label_opts=LabelOpts(formatter="{c}%"))
  54.             .set_global_opts(title_opts=TitleOpts(title="用户修改信息次数与逾期率", subtitle="bar"),
  55.                              xaxis_opts=AxisOpts(axislabel_opts=LabelOpts(rotate=45)), )
  56.     )
  57.     return c
  58. #  折线图
  59. def get_line_linux():
  60.     sql = '''select * from month_overdue  '''
  61.     data = connFun_linux(sql)
  62.     x = [str(num) for num in data['month']]  # x轴必须为字符串类型
  63.     y = [round(rate, 3) for rate in data['rate']]
  64.     c = (
  65.         Line()
  66.             .add_xaxis(x)
  67.             .add_yaxis("逾期率", y,
  68.                        markline_opts=MarkLineOpts(data=[MarkLineItem(type_="average")]),
  69.                        label_opts=LabelOpts(is_show=False), )  # 不显示数据点标签)
  70.             .set_global_opts(title_opts=opts.TitleOpts(title="借款月份与逾期率"))
  71.     )
  72.     return c
  73. # 地图
  74. def get_map_linux():
  75.     sql = '''select * from province'''
  76.     data = connFun_linux(sql)
  77.     province = data[data['userinfo_19'] != '不详']
  78.     max_count = province['count'].max()
  79.     province_name = list(province['userinfo_19'])  # 省份必须含有“省”或“自治区”或“市”
  80.     count = list(province['count'])
  81.     c = (
  82.         Map()
  83.             .add("客户数量", [list(z) for z in zip(province_name, count)], "china")
  84.             .set_global_opts(title_opts=opts.TitleOpts(title="中国地图"),
  85.                              visualmap_opts=opts.VisualMapOpts(max_=4000))
  86.             .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
  87.     )
  88.     return c
  89. # 水球
  90. def get_liquid_linux():
  91.     sql = 'select * from accuracy_df'
  92.     data = connFun_linux(sql)
  93.     pred = np.round(data['Accuracy'].values, 2)[0]
  94.     c = (
  95.         Liquid()
  96.             .add("lq", [pred])
  97.             .set_global_opts(title_opts=TitleOpts(title="Liquid-基本示例"))
  98.     )
  99.     return c
  100. # 表格
  101. def get_table_linux():
  102.     sql = 'select * from predictions_df'
  103.     data = connFun_linux(sql)
  104.     predictions = data[:5]
  105.     predictions_list = predictions.values.tolist()
  106.     headers = ['用户ID', '真实标签', '预测标签']
  107.     rows = predictions_list
  108.     c = (
  109.         Table()
  110.             .add(headers, rows)
  111.             .set_global_opts(
  112.             title_opts=ComponentTitleOpts(title="标签真实值与模型预测结果"))
  113.     )
  114.     return c
  115. @app.route('/show_pyecharts_05')
  116. def show_pyecharts_05():
  117.     pie = get_pie_linux()
  118.     bar = get_bar_linux()
  119.     line = get_line_linux()
  120.     ma = get_map_linux()
  121.     liquid = get_liquid_linux()
  122.     # tab = get_table_linux()
  123.     return render_template("show_pyecharts_05.html",
  124.                            pie_options=pie.dump_options(),
  125.                            bar_options=bar.dump_options(),
  126.                            line_options=line.dump_options(),
  127.                            map_options=ma.dump_options(),
  128.                            liquid_options=liquid.dump_options(),
  129.                            )
  130. if __name__ == "__main__":
  131.     app.run(host='127.0.0.1', port=5000, debug=True)
复制代码
在前端设计中,show_pyecharts_05.html设置和初始化多个 ECharts 图表的容器,并通过 Flask 模板渲染机制动态地设置和表现这些图表的选项。代码如下:
  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4.     <meta charset="utf-8" />
  5.     <title>ECharts</title>
  6.     <style >
  7.         /* CSS 样式将放在这里 */
  8.         .chart-container{
  9.             display:flex;
  10.             flex-direction:column;    /* 默认为column,表示垂直布局 */
  11.             justify-content:center;   /* 水平居中 */
  12.             align-items:center;       /* 垂直居中(如果需要的话) */
  13.             height:150vh;             /* 根据需要设置容器高度 */
  14.             width:100%;               /* 占据全屏宽度 */
  15.         }
  16.         .chart-row{
  17.             display: flex;                    /* 每一行都是一个flex容器 */
  18.             justify-content:space-between;    /* 列之间均匀分布 */
  19.             margin-bottom:20px;               /* 行与行之间的间距 */
  20.         }
  21.         .chart-item{
  22.             flex: 1;                         /* 每个flex item占据相等的空间 */
  23.             margin: 0 10px;                  /* 图表之间的间距 */
  24.         }
  25.     </style>
  26.     <!-- 引入刚刚下载的 ECharts 文件 -->
  27.     <script src="/static/echarts.js"></script>
  28.     <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/echarts.min.js"></script>
  29.     <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/maps/china.js"></script>
  30.     <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/echarts-liquidfill.min.js"></script>
  31. </head>
  32. <body>
  33. <!-- 为 ECharts 准备一个定义了宽高的 DOM -->
  34. <div class="chart-container">
  35.     <div class="chart-row">
  36.         <div class="chart-item"  id="pie" style="width: 800px;height:400px;"></div>
  37.         <div class="chart-item" id="bar" style="width: 800px;height:400px;"></div>
  38.     </div>
  39.     <div class="chart-row">
  40.         <div class="chart-item" id="line" style="width: 800px;height:400px;"></div>
  41.         <div class="chart-item" id="barstack" style="width: 800px;height:400px;"></div>
  42.     </div>
  43.     <div class="chart-row">
  44.         <div class="chart-item" id="liquid" style="width: 800px;height:400px;"></div>
  45.         <div class="chart-item" id="table" style="width: 800px;height:400px;"></div>
  46.     </div>
  47. </div>
  48. <script type="text/javascript">
  49.     // 基于准备好的dom,初始化echarts实例
  50.     var pieChart = echarts.init(document.getElementById('pie'));
  51.     var barChart = echarts.init(document.getElementById('bar'));
  52.     var lineChart = echarts.init(document.getElementById('line'));
  53.     var barStackChart = echarts.init(document.getElementById('barstack'));
  54.     var liquidChart = echarts.init(document.getElementById('liquid'));
  55.     // 使用刚指定的配置项和数据显示图表。
  56.     pieChart.setOption({{ pie_options | safe }});
  57.     barChart.setOption({{ bar_options | safe }});
  58.     lineChart.setOption({{ line_options | safe }});
  59.     barStackChart.setOption({{ map_options | safe }});
  60.     liquidChart.setOption({{ liquid_options | safe }});
  61. </script>
  62. </body>
  63. </html>
复制代码
参考资料:
1.PySpark 大数据机器学习入门案例1 :iris+ ML+Logistics分类:https://www.zhihu.com/tardis/bd/art/612626873source_id=1001
2.pySpark 机器学习库ml入门(参数调优):https://www.jianshu.com/p/20456b512fa7
3.PySpark 逻辑回归(参数调优):https://zhuanlan.zhihu.com/p/461211990
4.Spark ML LR 用 setWeightCol 解决数据不平衡:https://kelun.blog.csdn.net/article/details/103425926
5.Python——机器学习:不平衡数据集常用处置处罚方法和实例:https://blog.csdn.net/weixin_53848907/article/details/135976144
6.决策树分类器(DecisionTreeClassifier):https://blog.csdn.net/qq_66726657/article/details/132470442
7.戴刚,张良均. PySpark大数据分析与应用. 人民邮电出版社,2024.
8.汪明. PySpark实战. 清华大学出版社,2022

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表