f 数据仓库与分析-sparkML入门,通俗解释机器学习的框架和算法 - Powered by qidao123.com技术社区

sparkML入门,通俗解释机器学习的框架和算法

打印 上一主题 下一主题

主题 2210|帖子 2210|积分 6630

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
一、机器学习的团体框架(类比烹饪)

假设你要做一道菜,机器学习的过程可以类比为:
步调 --> 烹饪类比  -->机器学习对应

1. 确定目标 | 想做什么菜(红烧肉/沙拉) | 明确使命 (分类/回归/聚类)
2. 准备食材 | 买菜、洗菜、切菜 | 数据网络与预处置惩罚
3. 设计食谱 | 决定烹饪步调和调料 | 选择算法和模子设计
4. 试做并尝味道 | 调解火候和调味 | 模子训练与调参
5. 最终成品 | 端上桌的菜 | 模子摆设与应用

二、机器学习的焦点流程

1. 数据预处置惩罚(准备食材)


  • 目标:把原始数据变成算法能“吃”的格式
  • 常见操作

    • 清洗数据:去除重复、错误的数据(如年龄填了200岁)
    • 特征工程:将数据转换为数值特征(比如把“男/女”转为0/1)
    • 标准化:统一数据范围(比如把工资从[0, 100000]缩放到[0, 1])

2. 选择算法(选菜谱)
根据使命范例选择算法:
 使命范例 | 问题示例 | 常用算法 | 生活类比

分类  判断邮件是垃圾邮件吗? | 逻辑回归、决议树 | 垃圾分类(干/湿/有害)
回归  预测房价 | 线性回归、随机森林回归 | 根据经验估算装修费用
聚类  用户分组 | K-Means、DBSCAN | 超市商品主动分区摆放
3. 模子训练(试做菜品)


  • 训练过程:算法通过数据主动学习规律

    • 监督学习(有标签):老师讲授生做题(提供精确答案)
    • 无监督学习(无标签):学生自己总结规律(如聚类)

4. 模子评估(尝味道调解)


  • 评估指标

    • 分类:正确率(答对题的比例)
    • 回归:RMSE(预测值与真实值的平均毛病)
    • 聚类:轮廓系数(分组是否紧密)

5. 模子摆设(上菜)


  • 将训练好的模子用于现实预测
三、算法解释

1. 聚类算法(比如K-Means)


  • 目标:主动把相似的数据分到同一组
  • 生活场景:整理衣柜

    • 把衣服按季节(夏装/冬装)或范例(上衣/裤子)主动分类

  • 步调

    • 决定分几组(比如分3组)
    • 随机选3个衣服作为初始中心点
    • 计算每件衣服到中心的距离,分到最近的一组
    • 重新计算每组的中心点
    • 重复直到中心点不再变化

2. 回归算法(比如线性回归)


  • 目标:预测数值型结果(如房价)
  • 生活场景:估算打车费用

    • 已知:距离(公里) → 费用(元)
    • 规律:费用 = 起步价 + 每公里单价 × 距离
    • 算法使命:从历史数据中学习 起步价 和 每公里单价

3. 分类算法(比如决议树)


  • 目标:预测种别(如是否中奖)
  • 生活场景:判断水果范例

    • 通过颜色、外形、重量等特征判断是苹果还是橙子

一、SparkML 的焦点架构与流程
   原始数据 → 数据清洗 → 特征工程 → 模子训练 → 评估调优 → 摆设
  焦点组件
    DataFrame:布局化数据容器(列式存储)
Transformer:数据转换器(如 VectorAssembler)
Estimator:模子训练器(如 LogisticRegression)
Pipeline:将多个步调封装为工作流
  二、数据预处置惩罚实战 
1. 数据加载与探索
  1. // 加载 CSV 数据
  2. val data = spark.read
  3.   .option("header", "true")
  4.   .option("inferSchema", "true")
  5.   .csv("data/iris.csv")
  6. // 查看数据结构
  7. data.printSchema()
  8. // 统计摘要
  9. data.describe().show()
复制代码
2. 特征工程
  1. 1. 数据加载与探索
  2.   import org.apache.spark.ml.feature.VectorAssembler
  3.   val assembler = new VectorAssembler()
  4.     .setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width"))
  5.     .setOutputCol("features")
  6.   val df = assembler.transform(data)
  7. 2.类别特征编码:
  8.   import org.apache.spark.ml.feature.StringIndexer
  9.   val indexer = new StringIndexer()
  10.     .setInputCol("species")
  11.     .setOutputCol("label")
  12.   val indexedDF = indexer.fit(df).transform(df)
  13. 3. 数据标准化:
  14. import org.apache.spark.ml.feature.StandardScaler
  15. val scaler = new StandardScaler()
  16.   .setInputCol("features")
  17.   .setOutputCol("scaledFeatures")
  18.   .setWithStd(true)
  19.   .setWithMean(true)
  20. val scaledDF = scaler.fit(df).transform(df)
复制代码

三、模子训练与评估
1. 分类模子(以决议树为例)
  1. import org.apache.spark.ml.classification.DecisionTreeClassifier
  2. // 划分训练集/测试集
  3. val Array(train, test) = indexedDF.randomSplit(Array(0.8, 0.2), seed=42)
  4. // 定义模型
  5. val dt = new DecisionTreeClassifier()
  6.   .setLabelCol("label")
  7.   .setFeaturesCol("features")
  8. // 训练
  9. val model = dt.fit(train)
  10. // 预测
  11. val predictions = model.transform(test)
  12. // 评估
  13. import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
  14. val evaluator = new MulticlassClassificationEvaluator()
  15.   .setLabelCol("label")
  16.   .setPredictionCol("prediction")
  17.   .setMetricName("accuracy")
  18. val accuracy = evaluator.evaluate(predictions)
  19. println(s"Accuracy = ${accuracy}")
复制代码
2. 回归模子(以线性回归为例)
  1. import org.apache.spark.ml.regression.LinearRegression
  2. // 加载房价数据
  3. val housingDF = spark.read.parquet("data/housing.parquet")
  4. // 定义模型
  5. val lr = new LinearRegression()
  6.   .setLabelCol("price")
  7.   .setFeaturesCol("features")
  8.   .setMaxIter(100)
  9.   .setRegParam(0.3)
  10. // 训练与评估
  11. val lrModel = lr.fit(train)
  12. val trainingSummary = lrModel.summary
  13. println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
复制代码
3. 聚类模子(以K-Means为例)
  1. import org.apache.spark.ml.clustering.KMeans
  2. val kmeans = new KMeans()
  3.   .setK(3)
  4.   .setFeaturesCol("scaledFeatures")
  5.   .setSeed(1L)
  6. val model = kmeans.fit(scaledDF)
  7. val predictions = model.transform(scaledDF)
  8. // 显示聚类中心
  9. model.clusterCenters.foreach(println)
复制代码
四、Pipeline 与超参数调优
1. 构建 Pipeline
  1. import org.apache.spark.ml.Pipeline
  2. val pipeline = new Pipeline()
  3.   .setStages(Array(
  4.     assembler,
  5.     scaler,
  6.     kmeans
  7.   ))
  8. val pipelineModel = pipeline.fit(data)
复制代码
2. 交叉验证调参
  1. import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
  2. // 参数网格
  3. val paramGrid = new ParamGridBuilder()
  4.   .addGrid(kmeans.k, Array(2, 3, 4))
  5.   .build()
  6. // 交叉验证
  7. val cv = new CrossValidator()
  8.   .setEstimator(pipeline)
  9.   .setEvaluator(new ClusteringEvaluator())
  10.   .setEstimatorParamMaps(paramGrid)
  11.   .setNumFolds(3)
  12. val cvModel = cv.fit(data)
复制代码
 五、模子摆设
  1. //TODO保存模型
  2. pipelineModel.write.overwrite().save("models/iris_clustering")
  3. //TODO加载预测模型
  4. import org.apache.spark.ml.PipelineModel
  5. val loadedModel = PipelineModel.load("models/iris_clustering")
  6. val newPredictions = loadedModel.transform(newData)
复制代码
实战案例:用户分群

字段:user_id, age, income, purchase_freq
目标:将用户分为高价值、中价值、低价值群体
代码实现:
  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
  3. import org.apache.spark.ml.clustering.KMeans
  4. import org.apache.spark.ml.Pipeline
  5. import org.apache.spark.ml.evaluation.ClusteringEvaluator
  6. import org.apache.spark.sql.functions._
  7. object UserClusteringWithMetrics {
  8.   def main(args: Array[String]): Unit = {
  9.     val spark = SparkSession.builder()
  10.       .appName("UserClusteringWithMetrics")
  11.       .master("local[*]")
  12.       .getOrCreate()
  13.     import spark.implicits._
  14.     val userData = Seq(
  15.       (1, 25, 50000, 2),
  16.       (2, 45, 120000, 8),
  17.       (3, 32, 80000, 5),
  18.       (4, 28, 60000, 3),
  19.       (5, 50, 150000, 10),
  20.       (6, 22, 40000, 1)
  21.     ).toDF("user_id", "age", "income", "purchase_freq")
  22.     ---------- 数据预处理 -------------
  23.     val meanIncome = userData.select(mean("income")).first().getDouble(0)
  24.     val processedData = userData.na.fill(meanIncome, Seq("income"))
  25.     ------------- 特征工程 -----------
  26.     val featureCols = Array("age", "income", "purchase_freq")
  27.    
  28.    ------------- 特征向量化-----------
  29.     val assembler = new VectorAssembler()
  30.       .setInputCols(featureCols)
  31.       .setOutputCol("rawFeatures")
  32.    
  33.         --------- 标准化----------
  34.     val scaler = new StandardScaler()
  35.       .setInputCol("rawFeatures")
  36.       .setOutputCol("scaledFeatures")
  37.       .setWithStd(true)
  38.       .setWithMean(true)
  39.     -------模型定义 -----
  40.     val kmeans = new KMeans()
  41.       .setK(3)                          // 初始聚类数
  42.       .setFeaturesCol("scaledFeatures") // 使用标准化后的特征
  43.       .setSeed(42L)                     // 固定随机种子
  44.       .setPredictionCol("cluster")
  45.   ----------- 构建 Pipeline ----------
  46.     val pipeline = new Pipeline()
  47.       .setStages(Array(assembler, scaler, kmeans))
  48.     -------------性能测速:训练阶段 ----------
  49.     val startTime = System.currentTimeMillis()
  50.     val model = pipeline.fit(processedData)
  51.     val trainingTime = (System.currentTimeMillis() - startTime) / 1000.0
  52.     println(f"训练耗时:$trainingTime%.2f 秒")
  53.     ------------- 预测与评估--------------
  54.     val startPredictTime = System.currentTimeMillis()
  55.     val predictions = model.transform(processedData)
  56.     val predictTime = (System.currentTimeMillis() - startPredictTime) / 1000.0
  57.     println(f"预测耗时:$predictTime%.2f 秒")
  58. -------------- 聚类结果统计------------
  59.     println("\n各簇样本分布:")
  60.     predictions.groupBy("cluster").count().show()
  61. -------------评估指标(轮廓系数)--------
  62.     val evaluator = new ClusteringEvaluator()
  63.       .setFeaturesCol("scaledFeatures")
  64.       .setPredictionCol("cluster")
  65.     val silhouette = evaluator.evaluate(predictions)
  66.     println(f"轮廓系数:$silhouette%.4f")
  67.   ------------------ 可视化分析------------------
  68.     val clusterCenters = model.stages.last.asInstanceOf[KMeansModel].clusterCenters
  69.     println("\n聚类中心特征:")
  70.     clusterCenters.foreach(println)
  71.   -------------- 各簇特征分布统计----------------
  72.     println("\n各簇特征均值:")
  73.     predictions.select("cluster", "age", "income", "purchase_freq")
  74.       .groupBy("cluster")
  75.       .agg(
  76.         avg("age").alias("avg_age"),
  77.         avg("income").alias("avg_income"),
  78.         avg("purchase_freq").alias("avg_purchase")
  79.       )
  80.       .show()
  81.     --------------- 参数调优建议 ---------------
  82.     println("\n参数调优建议:")
  83.     (2 to 5).foreach { k =>
  84.       val kmeansTmp = new KMeans().setK(k).setFeaturesCol("scaledFeatures")
  85.       val modelTmp = kmeansTmp.fit(scaler.transform(assembler.transform(processedData)))
  86.       val score = evaluator.evaluate(modelTmp.transform(processedData))
  87.       println(f"K=$k 时轮廓系数:$score%.4f")
  88.     }
  89.     spark.stop()
  90.   }
  91. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表