马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
一、机器学习的团体框架(类比烹饪)
假设你要做一道菜,机器学习的过程可以类比为:
步调 --> 烹饪类比 -->机器学习对应
1. 确定目标 | 想做什么菜(红烧肉/沙拉) | 明确使命 (分类/回归/聚类)
2. 准备食材 | 买菜、洗菜、切菜 | 数据网络与预处置惩罚
3. 设计食谱 | 决定烹饪步调和调料 | 选择算法和模子设计
4. 试做并尝味道 | 调解火候和调味 | 模子训练与调参
5. 最终成品 | 端上桌的菜 | 模子摆设与应用
二、机器学习的焦点流程
1. 数据预处置惩罚(准备食材)
- 目标:把原始数据变成算法能“吃”的格式
- 常见操作:
- 清洗数据:去除重复、错误的数据(如年龄填了200岁)
- 特征工程:将数据转换为数值特征(比如把“男/女”转为0/1)
- 标准化:统一数据范围(比如把工资从[0, 100000]缩放到[0, 1])
2. 选择算法(选菜谱)
根据使命范例选择算法:
使命范例 | 问题示例 | 常用算法 | 生活类比
分类 判断邮件是垃圾邮件吗? | 逻辑回归、决议树 | 垃圾分类(干/湿/有害)
回归 预测房价 | 线性回归、随机森林回归 | 根据经验估算装修费用
聚类 用户分组 | K-Means、DBSCAN | 超市商品主动分区摆放
3. 模子训练(试做菜品)
- 训练过程:算法通过数据主动学习规律
- 监督学习(有标签):老师讲授生做题(提供精确答案)
- 无监督学习(无标签):学生自己总结规律(如聚类)
4. 模子评估(尝味道调解)
- 评估指标:
- 分类:正确率(答对题的比例)
- 回归:RMSE(预测值与真实值的平均毛病)
- 聚类:轮廓系数(分组是否紧密)
5. 模子摆设(上菜)
三、算法解释
1. 聚类算法(比如K-Means)
- 目标:主动把相似的数据分到同一组
- 生活场景:整理衣柜
- 把衣服按季节(夏装/冬装)或范例(上衣/裤子)主动分类
- 步调:
- 决定分几组(比如分3组)
- 随机选3个衣服作为初始中心点
- 计算每件衣服到中心的距离,分到最近的一组
- 重新计算每组的中心点
- 重复直到中心点不再变化
2. 回归算法(比如线性回归)
- 目标:预测数值型结果(如房价)
- 生活场景:估算打车费用
- 已知:距离(公里) → 费用(元)
- 规律:费用 = 起步价 + 每公里单价 × 距离
- 算法使命:从历史数据中学习 起步价 和 每公里单价
3. 分类算法(比如决议树)
- 目标:预测种别(如是否中奖)
- 生活场景:判断水果范例
一、SparkML 的焦点架构与流程
原始数据 → 数据清洗 → 特征工程 → 模子训练 → 评估调优 → 摆设
焦点组件
DataFrame:布局化数据容器(列式存储)
Transformer:数据转换器(如 VectorAssembler)
Estimator:模子训练器(如 LogisticRegression)
Pipeline:将多个步调封装为工作流
二、数据预处置惩罚实战
1. 数据加载与探索
- // 加载 CSV 数据
- val data = spark.read
- .option("header", "true")
- .option("inferSchema", "true")
- .csv("data/iris.csv")
- // 查看数据结构
- data.printSchema()
- // 统计摘要
- data.describe().show()
复制代码 2. 特征工程
- 1. 数据加载与探索
- import org.apache.spark.ml.feature.VectorAssembler
- val assembler = new VectorAssembler()
- .setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width"))
- .setOutputCol("features")
- val df = assembler.transform(data)
- 2.类别特征编码:
- import org.apache.spark.ml.feature.StringIndexer
- val indexer = new StringIndexer()
- .setInputCol("species")
- .setOutputCol("label")
- val indexedDF = indexer.fit(df).transform(df)
- 3. 数据标准化:
- import org.apache.spark.ml.feature.StandardScaler
- val scaler = new StandardScaler()
- .setInputCol("features")
- .setOutputCol("scaledFeatures")
- .setWithStd(true)
- .setWithMean(true)
- val scaledDF = scaler.fit(df).transform(df)
复制代码
三、模子训练与评估
1. 分类模子(以决议树为例)
- import org.apache.spark.ml.classification.DecisionTreeClassifier
- // 划分训练集/测试集
- val Array(train, test) = indexedDF.randomSplit(Array(0.8, 0.2), seed=42)
- // 定义模型
- val dt = new DecisionTreeClassifier()
- .setLabelCol("label")
- .setFeaturesCol("features")
- // 训练
- val model = dt.fit(train)
- // 预测
- val predictions = model.transform(test)
- // 评估
- import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
- val evaluator = new MulticlassClassificationEvaluator()
- .setLabelCol("label")
- .setPredictionCol("prediction")
- .setMetricName("accuracy")
- val accuracy = evaluator.evaluate(predictions)
- println(s"Accuracy = ${accuracy}")
复制代码 2. 回归模子(以线性回归为例)
- import org.apache.spark.ml.regression.LinearRegression
- // 加载房价数据
- val housingDF = spark.read.parquet("data/housing.parquet")
- // 定义模型
- val lr = new LinearRegression()
- .setLabelCol("price")
- .setFeaturesCol("features")
- .setMaxIter(100)
- .setRegParam(0.3)
- // 训练与评估
- val lrModel = lr.fit(train)
- val trainingSummary = lrModel.summary
- println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
复制代码 3. 聚类模子(以K-Means为例)
- import org.apache.spark.ml.clustering.KMeans
- val kmeans = new KMeans()
- .setK(3)
- .setFeaturesCol("scaledFeatures")
- .setSeed(1L)
- val model = kmeans.fit(scaledDF)
- val predictions = model.transform(scaledDF)
- // 显示聚类中心
- model.clusterCenters.foreach(println)
复制代码 四、Pipeline 与超参数调优
1. 构建 Pipeline
- import org.apache.spark.ml.Pipeline
- val pipeline = new Pipeline()
- .setStages(Array(
- assembler,
- scaler,
- kmeans
- ))
- val pipelineModel = pipeline.fit(data)
复制代码 2. 交叉验证调参
- import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
- // 参数网格
- val paramGrid = new ParamGridBuilder()
- .addGrid(kmeans.k, Array(2, 3, 4))
- .build()
- // 交叉验证
- val cv = new CrossValidator()
- .setEstimator(pipeline)
- .setEvaluator(new ClusteringEvaluator())
- .setEstimatorParamMaps(paramGrid)
- .setNumFolds(3)
- val cvModel = cv.fit(data)
复制代码 五、模子摆设
- //TODO保存模型
- pipelineModel.write.overwrite().save("models/iris_clustering")
- //TODO加载预测模型
- import org.apache.spark.ml.PipelineModel
- val loadedModel = PipelineModel.load("models/iris_clustering")
- val newPredictions = loadedModel.transform(newData)
复制代码 实战案例:用户分群
字段:user_id, age, income, purchase_freq
目标:将用户分为高价值、中价值、低价值群体
代码实现:
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
- import org.apache.spark.ml.clustering.KMeans
- import org.apache.spark.ml.Pipeline
- import org.apache.spark.ml.evaluation.ClusteringEvaluator
- import org.apache.spark.sql.functions._
- object UserClusteringWithMetrics {
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder()
- .appName("UserClusteringWithMetrics")
- .master("local[*]")
- .getOrCreate()
- import spark.implicits._
- val userData = Seq(
- (1, 25, 50000, 2),
- (2, 45, 120000, 8),
- (3, 32, 80000, 5),
- (4, 28, 60000, 3),
- (5, 50, 150000, 10),
- (6, 22, 40000, 1)
- ).toDF("user_id", "age", "income", "purchase_freq")
- ---------- 数据预处理 -------------
- val meanIncome = userData.select(mean("income")).first().getDouble(0)
- val processedData = userData.na.fill(meanIncome, Seq("income"))
- ------------- 特征工程 -----------
- val featureCols = Array("age", "income", "purchase_freq")
-
- ------------- 特征向量化-----------
- val assembler = new VectorAssembler()
- .setInputCols(featureCols)
- .setOutputCol("rawFeatures")
-
- --------- 标准化----------
- val scaler = new StandardScaler()
- .setInputCol("rawFeatures")
- .setOutputCol("scaledFeatures")
- .setWithStd(true)
- .setWithMean(true)
- -------模型定义 -----
- val kmeans = new KMeans()
- .setK(3) // 初始聚类数
- .setFeaturesCol("scaledFeatures") // 使用标准化后的特征
- .setSeed(42L) // 固定随机种子
- .setPredictionCol("cluster")
- ----------- 构建 Pipeline ----------
- val pipeline = new Pipeline()
- .setStages(Array(assembler, scaler, kmeans))
- -------------性能测速:训练阶段 ----------
- val startTime = System.currentTimeMillis()
- val model = pipeline.fit(processedData)
- val trainingTime = (System.currentTimeMillis() - startTime) / 1000.0
- println(f"训练耗时:$trainingTime%.2f 秒")
- ------------- 预测与评估--------------
- val startPredictTime = System.currentTimeMillis()
- val predictions = model.transform(processedData)
- val predictTime = (System.currentTimeMillis() - startPredictTime) / 1000.0
- println(f"预测耗时:$predictTime%.2f 秒")
- -------------- 聚类结果统计------------
- println("\n各簇样本分布:")
- predictions.groupBy("cluster").count().show()
- -------------评估指标(轮廓系数)--------
- val evaluator = new ClusteringEvaluator()
- .setFeaturesCol("scaledFeatures")
- .setPredictionCol("cluster")
- val silhouette = evaluator.evaluate(predictions)
- println(f"轮廓系数:$silhouette%.4f")
- ------------------ 可视化分析------------------
- val clusterCenters = model.stages.last.asInstanceOf[KMeansModel].clusterCenters
- println("\n聚类中心特征:")
- clusterCenters.foreach(println)
- -------------- 各簇特征分布统计----------------
- println("\n各簇特征均值:")
- predictions.select("cluster", "age", "income", "purchase_freq")
- .groupBy("cluster")
- .agg(
- avg("age").alias("avg_age"),
- avg("income").alias("avg_income"),
- avg("purchase_freq").alias("avg_purchase")
- )
- .show()
- --------------- 参数调优建议 ---------------
- println("\n参数调优建议:")
- (2 to 5).foreach { k =>
- val kmeansTmp = new KMeans().setK(k).setFeaturesCol("scaledFeatures")
- val modelTmp = kmeansTmp.fit(scaler.transform(assembler.transform(processedData)))
- val score = evaluator.evaluate(modelTmp.transform(processedData))
- println(f"K=$k 时轮廓系数:$score%.4f")
- }
- spark.stop()
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|