悠扬随风 发表于 2025-9-6 16:29:37

Spark处理过程-案例数据洗濯

一、缺失值处理

缺失值是数据中最常见的问题之一,处理方法包罗删除、填充或猜测。
1. 检测缺失值

python
运行

import pandas as pd
import numpy as np

# 创建示例DataFrame
data = {
    'A': ,
    'B': ,
    'C':
}
df = pd.DataFrame(data)

# 检测缺失值
print("缺失值统计:")
print(df.isnull().sum())# 统计每列的缺失值数量
2. 删除缺失值

python
运行

# 删除包含缺失值的行
df_dropna = df.dropna()

# 删除全为缺失值的列
df_dropna_col = df.dropna(axis=1, how='all')
3. 填充缺失值

python
运行

# 使用固定值填充
df_filled_constant = df.fillna(0)

# 使用均值/中位数/众数填充
df_filled_mean = df.fillna(df.mean())# 均值填充
df_filled_median = df.fillna(df.median())# 中位数填充

# 向前/向后填充
df_filled_ffill = df.fillna(method='ffill')# 向前填充
df_filled_bfill = df.fillna(method='bfill')# 向后填充

# 使用插值法填充
df_interpolated = df.interpolate()# 线性插值
二、非常值处理

非常值大概由数据录入错误或真实非常事件导致,处理方法包罗删除、修正或离散化。
1. 检测非常值

python
运行

# 使用Z-score检测异常值
from scipy import stats

z_scores = np.abs(stats.zscore(df['A']))
threshold = 3
outliers = df

# 使用箱线图检测异常值
import matplotlib.pyplot as plt

plt.boxplot(df['A'])
plt.show()
2. 处理非常值

python
运行

# 删除异常值
df_clean = df[(z_scores < threshold)]

# 替换异常值(例如,将超过3倍标准差的值替换为阈值)
df_replaced = df.copy()
df_replaced['A'] = np.where(z_scores > threshold, df['A'].median(), df['A'])
三、重复数据处理

python
运行

# 创建包含重复数据的DataFrame
df_duplicates = pd.DataFrame({
    'A': ,
    'B': ['x', 'y', 'y', 'z']
})

# 检测重复行
print("重复行检测:")
print(df_duplicates.duplicated())

# 删除重复行
df_cleaned = df_duplicates.drop_duplicates()
四、数据类型转换

确保数据类型与业务需求一致,制止计算错误。

python
运行

# 转换数据类型
df['A'] = df['A'].astype(int)# 转为整数类型

# 字符串转日期
df['date'] = pd.to_datetime(df['date_string'])

# 处理格式错误(例如,将"$1,000"转为数值1000)
df['price'] = df['price'].str.replace('$', '').str.replace(',', '').astype(float)
五、不一致数据处理

处理巨细写、单元、拼写等不一致问题。

python
运行

# 统一大小写
df['name'] = df['name'].str.upper()# 全部大写

# 统一单位(例如,将英寸转为厘米)
df['height_cm'] = df['height_in'] * 2.54

# 修正拼写错误
df['country'] = df['country'].replace('USA', 'United States')
六、数据尺度化 / 归一化

用于将数据缩放到统一范围,常见方法有 Min-Max 缩放和 Z-score 尺度化。

python
运行

from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Min-Max缩放(将数据缩放到范围)
scaler = MinMaxScaler()
df['A_scaled'] = scaler.fit_transform(df[['A']])

# Z-score标准化(均值为0,标准差为1)
scaler = StandardScaler()
df['A_standardized'] = scaler.fit_transform(df[['A']])
七、文本数据洗濯

文本数据常包罗噪声,需进行特殊处理。

python
运行

# 去除空格和特殊字符
df['text'] = df['text'].str.strip()# 去除首尾空格
df['text'] = df['text'].str.replace('[^\w\s]', '')# 去除标点符号

# 分词和词干提取
import nltk
from nltk.stem import PorterStemmer

nltk.download('punkt')
stemmer = PorterStemmer()

df['tokens'] = df['text'].apply(lambda x: nltk.word_tokenize(x))
df['stemmed'] = df['tokens'].apply(lambda tokens: )
八、高级数据洗濯(使用 Spark)

对于大规模数据,可使用 Spark 进行分布式洗濯。

python
运行

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# 读取数据
df_spark = spark.read.csv("data.csv", header=True, inferSchema=True)

# 处理缺失值
df_spark = df_spark.na.fill(0, subset=["numeric_col"])# 填充数值列
df_spark = df_spark.na.fill("unknown", subset=["string_col"])# 填充字符串列

# 过滤异常值
df_spark = df_spark.filter(col("age") > 0)# 年龄必须大于0

# 去重
df_spark = df_spark.dropDuplicates()
九、数据洗濯流程建议

[*]探索性分析:先相识数据布局、分布和缺失环境。
[*]订定洗濯策略:根据业务需求和数据特点选择合适的洗濯方法。
[*]逐步洗濯:按列或数据类型分步处理,制止错误传播。
[*]验证结果:洗濯后检查数据质量,确保问题已解决。
[*]记载日志:记载洗濯过程和处理结果,便于追溯和审计。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Spark处理过程-案例数据洗濯