Programming Practice with Spark's Machine Learning Library MLlib
0. Before We Start
- Operating system: Linux (CentOS 7.5)
- Spark version: Spark 3.0.0
- Scala version: Scala 2.12.1
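1. Main Text
1.1 Objectives
- Learn the basic MLlib programming workflow through hands-on practice;
- Learn to use MLlib for common data-analysis problems, including data import, principal component analysis, and classification and prediction.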
1.2 Tasks
1.2.1 Data Import
Load the data from files and convert it into a DataFrame.
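One line of the data file can be turned into a (features, label) pair without Spark. The following is a minimal plain-Scala sketch. It assumes the standard UCI Adult layout (15 comma-separated fields, with the income class in column 14) and uses the six numeric columns that this case study selects. The object AdultLineParser and its members are hypothetical helpers, not part of Spark.

```scala
// Hypothetical helper: parse one raw line of adult.data / adult.test (no Spark needed)
object AdultLineParser {
  // Feature columns (0-based), assuming the UCI Adult layout:
  // age, fnlwgt, education-num, capital-gain, capital-loss, hours-per-week
  val featureColumns: Seq[Int] = Seq(0, 2, 4, 10, 11, 12)
  val labelColumn = 14

  // Returns None for lines without all 15 fields (blank lines, non-data header lines)
  def parse(line: String): Option[(Array[Double], String)] = {
    val p = line.split(",")
    if (p.length != 15) None
    else {
      val features = featureColumns.map(i => p(i).trim.toDouble).toArray
      // Trim whitespace and drop the trailing "." that adult.test puts on its labels
      val label = p(labelColumn).trim.stripSuffix(".")
      Some((features, label))
    }
  }
}
```

Returning an Option makes it easy to drop unusable lines with flatMap, so training and test lines end up with identical label strings.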
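1.2.2 Principal Component Analysis (PCA)
Perform principal component analysis on the 6 continuous numeric variables. PCA uses an orthogonal transformation to convert observations of a set of correlated variables into values of a set of linearly uncorrelated variables, called the principal components. It projects the feature vectors onto these components, which maps them into a lower-dimensional space and reduces their dimensionality. Use the setK() method to set the number of principal components to 3, so that each continuous feature vector becomes a 3-dimensional principal-component vector.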
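To make the description concrete, here is a small plain-Scala sketch of the same idea on in-memory data. It centers the rows, forms the covariance matrix, extracts the top-k eigenvectors, and projects onto them. Power iteration with deflation is used here only because it is simple. Spark uses its own linear-algebra routines. PcaSketch is a hypothetical name, so treat this as an illustration rather than Spark's implementation.

```scala
// Plain-Scala PCA sketch (no Spark): covariance matrix + power iteration with deflation.
// Assumes distinct leading eigenvalues and a start vector not orthogonal to them.
object PcaSketch {
  type Vec = Array[Double]

  def dot(a: Vec, b: Vec): Double = a.indices.map(i => a(i) * b(i)).sum

  // Sample covariance matrix of the rows (n rows, d columns), after centering each column
  def covariance(rows: Array[Vec]): Array[Vec] = {
    val n = rows.length
    val d = rows.head.length
    val mean = Array.tabulate(d)(j => rows.map(_(j)).sum / n)
    val centered = rows.map(r => Array.tabulate(d)(j => r(j) - mean(j)))
    Array.tabulate(d, d)((i, j) => centered.map(r => r(i) * r(j)).sum / (n - 1))
  }

  // Dominant eigenvector (unit length) and its eigenvalue for a symmetric matrix
  def powerIteration(m: Array[Vec], iters: Int = 500): (Vec, Double) = {
    var v: Vec = Array.fill(m.length)(1.0 / math.sqrt(m.length))
    var step = 0
    while (step < iters) {
      val w = m.map(row => dot(row, v))
      val norm = math.sqrt(dot(w, w))
      if (norm == 0.0) step = iters // nothing left to extract: stop early
      else { v = w.map(_ / norm); step += 1 }
    }
    (v, dot(v, m.map(row => dot(row, v))))
  }

  // Top-k principal components (the k of setK), largest variance first
  def principalComponents(rows: Array[Vec], k: Int): Array[Vec] = {
    var m = covariance(rows)
    Array.fill(k) {
      val (v, lambda) = powerIteration(m)
      // Deflation: subtract lambda * v v^T so the next call finds the next component
      m = Array.tabulate(m.length, m.length)((i, j) => m(i)(j) - lambda * v(i) * v(j))
      v
    }
  }

  // Coordinates of one row in the space spanned by the components
  def project(row: Vec, pcs: Array[Vec]): Vec = pcs.map(pc => dot(pc, row))
}
```

Note that project multiplies the raw row by the components and does not subtract the mean first. As far as we know, this is also how Spark's PCAModel transforms vectors, so Spark's pcaFeatures are not mean-centered either.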
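1.2.3 Training a Classifier to Predict Income
Building on the PCA output, use logistic regression or a decision tree model to predict whether a resident's income exceeds 50K, and validate the model on the Test dataset.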
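For a binary problem like this one, a fitted logistic regression model applies the sigmoid function to a feature vector x to get a probability, then compares that probability with a threshold. Below is a minimal sketch of that decision rule. It assumes a weight vector w and an intercept b, such as the values printed from coefficientMatrix and interceptVector in the code below. LogisticSketch is a hypothetical name.

```scala
// Sketch of the binary logistic regression decision rule:
// P(label = 1.0 | x) = sigmoid(w . x + b); predict 1.0 when it exceeds the threshold.
object LogisticSketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Probability of the positive class (index 1.0) for feature vector x
  def probability(w: Array[Double], b: Double, x: Array[Double]): Double =
    sigmoid(w.zip(x).map { case (wi, xi) => wi * xi }.sum + b)

  // 0.5 mirrors the default threshold of Spark's LogisticRegression
  def predict(w: Array[Double], b: Double, x: Array[Double], threshold: Double = 0.5): Double =
    if (probability(w, b, x) > threshold) 1.0 else 0.0
}
```

By default, StringIndexer gives index 0.0 to the most frequent label. The positive class 1.0 therefore corresponds to the less frequent label, which in the Adult data is ">50K".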
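1.2.4 Hyperparameter Tuning
Use CrossValidator to determine the best parameters, including the optimal number of PCA dimensions and the parameters of the classifier itself.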
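Conceptually, CrossValidator scores every parameter combination with k-fold cross-validation and keeps the combination with the best average metric. The sketch below shows that loop in plain Scala. For determinism it assigns rows to folds round-robin, whereas Spark splits the data randomly. The names CrossValidationSketch, folds, and bestParams are hypothetical.

```scala
// Sketch of k-fold cross-validated grid search (what CrossValidator does conceptually)
object CrossValidationSketch {
  // Row i goes to fold i % k; returns (training, validation) for each fold
  def folds[T](data: Seq[T], k: Int): Seq[(Seq[T], Seq[T])] =
    (0 until k).map { f =>
      val indexed = data.zipWithIndex
      val validation = indexed.collect { case (x, i) if i % k == f => x }
      val training = indexed.collect { case (x, i) if i % k != f => x }
      (training, validation)
    }

  // fitAndScore(params, train, validation) returns a metric where larger is better
  // (true for accuracy and F1); the result is the best params and their average metric
  def bestParams[T, P](data: Seq[T], grid: Seq[P], k: Int)(
      fitAndScore: (P, Seq[T], Seq[T]) => Double): (P, Double) = {
    val splits = folds(data, k)
    val avgMetrics = grid.map { p =>
      p -> splits.map { case (train, valid) => fitAndScore(p, train, valid) }.sum / k
    }
    avgMetrics.maxBy(_._2)
  }
}
```

After this search, Spark refits the pipeline on the whole training set with the winning parameters. That refitted model is what cvModel.bestModel returns.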
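1.3 Dataset Overview
1.4 Implementation Steps
This case study runs interactively in spark-shell, where spark (the SparkSession) and sc (the SparkContext) are predefined.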
(1) Import the required classes and read the training set into a DataFrame:

    import org.apache.spark.ml.{Pipeline, PipelineModel}
    import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{IndexToString, PCA, StringIndexer, VectorIndexer}
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
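    import spark.implicits._
    // Features: the 6 continuous columns (age, fnlwgt, education-num, capital-gain, capital-loss, hours-per-week); label: the income class in column 14
    case class Adult(features: org.apache.spark.ml.linalg.Vector, label: String)
    // Keep only lines with all 15 fields (skips blank lines); trim the label and drop any trailing "."
    val df = sc.textFile("/export/server/spark-3.0.0-bin-hadoop3.2/adult.data.txt").map(_.split(",")).filter(_.length == 15).map(p => Adult(Vectors.dense(p(0).toDouble, p(2).toDouble, p(4).toDouble, p(10).toDouble, p(11).toDouble, p(12).toDouble), p(14).trim.stripSuffix("."))).toDF()

(2) Read the test set, then fit PCA on the training set and apply it to both sets:

    // Parse the test set the same way; the raw UCI adult.test starts with a non-data line and its labels end with "."
    val test = sc.textFile("/export/server/spark-3.0.0-bin-hadoop3.2/adult.test.txt").map(_.split(",")).filter(_.length == 15).map(p => Adult(Vectors.dense(p(0).toDouble, p(2).toDouble, p(4).toDouble, p(10).toDouble, p(11).toDouble, p(12).toDouble), p(14).trim.stripSuffix("."))).toDF()
    // Fit PCA on the training data only, keeping the top 3 principal components
    val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(3).fit(df)
    val result = pca.transform(df)
    val testdata = pca.transform(test)
    result.show(false)
    testdata.show(false)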
The output shows that both the training set and the test set were loaded and transformed successfully, as in the figure below:


(3) Train a logistic regression model on the PCA features and evaluate it on the test set:

    // Index the string labels; by default the most frequent label gets index 0.0
    val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(result)
    labelIndexer.labels.foreach(println)
    // Add feature metadata; features with more than maxCategories (default 20) distinct values are treated as continuous
    val featureIndexer = new VectorIndexer().setInputCol("pcaFeatures").setOutputCol("indexedFeatures").fit(result)
    println(featureIndexer.numFeatures)
    // Map numeric predictions back to the original label strings
    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
    val lr = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
    val lrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
    val lrPipelineModel = lrPipeline.fit(result)
    // Stage 2 of the fitted pipeline is the logistic regression model
    val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
    println("Coefficients: " + lrModel.coefficientMatrix + "\nIntercept: " + lrModel.interceptVector + "\nnumClasses: " + lrModel.numClasses + "\nnumFeatures: " + lrModel.numFeatures)
    val lrPredictions = lrPipelineModel.transform(testdata)
    // The evaluator's default metric is "f1"; request "accuracy" so that 1 - metric is the error rate
    val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
    val lrAccuracy = evaluator.evaluate(lrPredictions)
    println("Test Error = " + (1.0 - lrAccuracy))
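MulticlassClassificationEvaluator reports "f1" (weighted F1) unless setMetricName selects another metric. For that reason, the metric name matters when the result is printed as an accuracy or an error rate. Below is a plain-Scala sketch of both metrics computed over (label, prediction) pairs. MetricsSketch is a hypothetical name. The F1 weighting follows our understanding of Spark's weightedFMeasure: each class's F1 is weighted by that class's share of the true labels.

```scala
// Sketch: accuracy vs. weighted F1 over (label, prediction) pairs
object MetricsSketch {
  // Fraction of predictions equal to the true label
  def accuracy(pairs: Seq[(Double, Double)]): Double =
    pairs.count { case (label, pred) => label == pred }.toDouble / pairs.size

  // Per-class F1, weighted by how often each class occurs among the true labels
  def weightedF1(pairs: Seq[(Double, Double)]): Double = {
    val classes = pairs.map(_._1).distinct
    classes.map { c =>
      val tp = pairs.count { case (l, p) => l == c && p == c }.toDouble
      val predicted = pairs.count(_._2 == c)
      val actual = pairs.count(_._1 == c)
      val precision = if (predicted == 0) 0.0 else tp / predicted
      val recall = tp / actual
      val f1 = if (precision + recall == 0) 0.0 else 2 * precision * recall / (precision + recall)
      f1 * actual / pairs.size
    }.sum
  }
}
```

On imbalanced data such as Adult, the two numbers can differ noticeably. "1 - accuracy" is the error rate, but "1 - f1" is not.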
The resulting test error rate is shown in the figure below:

(4) Tune the hyperparameters with CrossValidator:

    // PCA is now an unfitted pipeline stage, so its k can be tuned together with the LR parameters
    val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures")
    val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
    val featureIndexer = new VectorIndexer().setInputCol("pcaFeatures").setOutputCol("indexedFeatures")
    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
    val lr = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100)
    val lrPipeline = new Pipeline().setStages(Array(pca, labelIndexer, featureIndexer, lr, labelConverter))
    // 6 values of k × 2 values of elasticNetParam × 3 values of regParam = 36 combinations
    val paramGrid = new ParamGridBuilder().addGrid(pca.k, Array(1, 2, 3, 4, 5, 6)).addGrid(lr.elasticNetParam, Array(0.2, 0.8)).addGrid(lr.regParam, Array(0.01, 0.1, 0.5)).build()
The shell prints the resulting paramGrid as follows (the output is truncated):

    paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
    Array({
        logreg_9e1b758452ee-elasticNetParam: 0.2,
        pca_e02a5078c882-k: 1,
        logreg_9e1b758452ee-regParam: 0.01
    }, {
        logreg_9e1b758452ee-elasticNetParam: 0.2,
        pca_e02a5078c882-k: 2,
        logreg_9e1b758452ee-regParam: 0.01
    }, {
        logreg_9e1b758452ee-elasticNetParam: 0.2,
        pca_e02a5078c882-k: 3,
        logreg_9e1b758452ee-regParam: 0.01
    }, {
        logreg_9e1b758452ee-elasticNetParam: 0.2,
        pca_e02a5078c882-k: 4,
        logreg_9e1b758452ee-regParam: 0.01
    }, {
        logreg_9e1b758452ee-elasticNetParam: 0.2,
        pca_e02a5078c882-k: 5,
        logreg_9e1b758452ee-regParam: 0.01
    }, {
        logreg_9e1b758452ee-elasticNetParam: 0.2,
        pca_e02a5078c882-k: 6,
        logreg_9e1b758452ee-regParam: 0.01
    }, {
        logreg_9e1b758452ee-elasticNetParam: 0.8,
        pca_e02a5078c882...
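The grid printed above is the Cartesian product of the candidate values: 6 × 2 × 3 = 36 parameter maps. With setNumFolds(3), CrossValidator therefore fits 36 × 3 = 108 pipelines, plus one final fit of the best combination on the full training set. The plain-Scala sketch below enumerates the same combinations. The case class Candidate is hypothetical. The loop order was chosen to match the first entries shown above, where k varies fastest. The order beyond those entries is an assumption.

```scala
// Hypothetical stand-in for one ParamMap of the grid above
case class Candidate(k: Int, elasticNetParam: Double, regParam: Double)

object GridSketch {
  // Cartesian product of the candidate values; k varies fastest, as in the printed grid
  val grid: Seq[Candidate] = for {
    regParam <- Seq(0.01, 0.1, 0.5)
    elasticNetParam <- Seq(0.2, 0.8)
    k <- 1 to 6
  } yield Candidate(k, elasticNetParam, regParam)

  val numFolds = 3
  // One fit per (combination, fold), plus the final refit of the best combination
  val totalFits: Int = grid.size * numFolds + 1
}
```

The count grows multiplicatively with each addGrid call. Adding a parameter or more values quickly raises the training cost.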
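
Next, run 3-fold cross-validation over this grid, then evaluate the selected model on the test set:

    // Select the parameter combination with the best average accuracy over the 3 folds
    val cv = new CrossValidator().setEstimator(lrPipeline).setEvaluator(new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")).setEstimatorParamMaps(paramGrid).setNumFolds(3)
    // Fit on the raw training DataFrame: PCA is now the first pipeline stage
    val cvModel = cv.fit(df)
    val lrPredictions = cvModel.transform(test)
    val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
    val lrAccuracy = evaluator.evaluate(lrPredictions)
    println("Accuracy: " + lrAccuracy)
    // Pipeline stages: 0 = PCA, 1 = StringIndexer, 2 = VectorIndexer, 3 = LogisticRegression, 4 = IndexToString
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    val lrModel = bestModel.stages(3).asInstanceOf[LogisticRegressionModel]
    println("Coefficients: " + lrModel.coefficientMatrix + "\nIntercept: " + lrModel.interceptVector + "\nnumClasses: " + lrModel.numClasses + "\nnumFeatures: " + lrModel.numFeatures)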
    // Stage 0 of the best pipeline is the fitted PCA model; pc holds one principal component per column
    import org.apache.spark.ml.feature.PCAModel
    val pcaModel = bestModel.stages(0).asInstanceOf[PCAModel]
    println("Primary Component: " + pcaModel.pc)
Note that org.apache.spark.ml.feature.PCAModel must be imported (the import line in the snippet above) before the cast to PCAModel will compile.
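In Spark's PCAModel, pc is a numFeatures × k matrix whose columns are the principal components. Transforming a feature vector x amounts to computing pc^T x. Below is a minimal plain-Scala sketch of that projection. This sketch stores the matrix row-major as nested arrays, which is an assumption: Spark's DenseMatrix is column-major internally. ProjectionSketch is a hypothetical name.

```scala
// Sketch: project a feature vector with a principal-components matrix (pc^T x)
object ProjectionSketch {
  // pc(i)(j) = weight of input feature i in component j; assumes x.length == pc.length
  def project(pc: Array[Array[Double]], x: Array[Double]): Array[Double] = {
    val k = pc.head.length
    Array.tabulate(k)(j => x.indices.map(i => pc(i)(j) * x(i)).sum)
  }
}
```

Reading the printed matrix column by column shows how strongly each of the original variables contributes to each principal component.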
The end!