大数据-270 Spark MLib - 基础介绍 机器学习算法介绍 WordCount 代码编写 ...

打印 上一主题 下一主题

主题 1007|帖子 1007|积分 3021

点一下关注吧!!!非常感谢!!连续更新!!!

Java篇开始了!



  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!
目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据发掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 及时数仓(正在更新…)
  • Spark MLib (正在更新…)
Spark MLib 介绍

SparkMLib 是Spark的机器学习库(Machine Learning),封装了一些通用机器学习算法和工具,便于我们开展机器学习实践。
具体来说,SparkMLib 重要包罗以下几块内容:


  • 常用算法:包罗类、回归、聚类和协同过滤
  • 特征化工具:包罗特征提取、转换、降维和选择工具
  • 管道:用于构建、评估和调整机器学习管道的工具
  • 恒久化工具:保存和加载算法、模型、管道
  • 其他工具:线性代数、统计、数据处置惩罚等
支持的机器学习算法

Spark MLib 支持4种常识趣器学习算法(分类、回归、聚类、保举算法)且盘算服从较高
分类算法

基天职类


  • 决议树分类:DecisionTreeClassifier
  • 支持向量机:LinearSVC
  • 朴素贝叶斯:NaiveBayes
  • 逻辑回归:LogisticRegression
  • 集成分类算法:基于决议树之上的算法
  • 梯度提升树:GBTClassifier
  • 随机丛林:RandomForestClassifier
回归算法



  • 基本回归算法
  • 线性回归:LinearRegression
  • 决议树回归:DecisionTreeRegressor
  • 集成回归算法
  • 梯度提升树:GBTRegressor
  • 随机丛林:RandomForestRegressor
聚类算法



  • KMeans
  • BisectingKMeans
  • GaussianMixture
  • LDA
保举算法



  • 协同过滤算法:ALS
  • 关联规则:AssociationRules、FPGrowth
情况预备

我当前是在 MacOS M1 下进行测试学习,如果你是别的情况,类似即可。
我们安装包:
  1. python -m virtualenv env
  2. source env/bin/activate
复制代码
对应截图如下所示:

WordCount

编写代码

编写代码,对 pyspark 进行测试:
  1. from pyspark import SparkConf, SparkContext
  2. def show(x):
  3.     print(x)
  4. if __name__ == '__main__':
  5.     conf = SparkConf()
  6.     conf.setAppName("wordcount")
  7.     conf.setMaster("local")
  8.     sc = SparkContext(conf=conf)
  9.     lines = sc.textFile("./wc.txt")
  10.     words = lines.flatMap(lambda line: line.split(" "))
  11.     tuple_word = words.map(lambda word: (word, 1))
  12.     reduce_result = tuple_word.reduceByKey(lambda v1, v2: v1 + v2)
  13.     reduce_result.sortBy(lambda tup: tup[1], False).foreach(lambda res: show(res))
复制代码
对应的截图如下所示:

我们预备一些数据,我这里是从网上任意复制一段。

测试运行

我们运行代码,运行输出如下:
  1. ('', 34):>                                                          (0 + 1) / 1]
  2. ('to', 19)
  3. ('and', 16)
  4. ('the', 16)
  5. ('l
复制代码
对应的截图如下所示:

职位投递统计

职位投递活动数据格式如下所示:
  1. uid56231 shanghai jobid53 192.168.54.90 2020-10-15
  2. uid56231 shanghai jobid32 192.168.54.90 2020-10-15
  3. uid56231 shanghai jobid32 192.168.54.90 2020-10-15
  4. uid56231 shanghai jobid20 192.168.54.90 2020-10-15
  5. uid56231 shanghai jobid73 192.168.54.90 2020-10-15
  6. uid56231 shanghai jobid34 192.168.54.90 2020-10-15
  7. uid56231 shanghai jobid73 192.168.54.90 2020-10-15
  8. uid09796 beijing jobid74 192.168.74.167 2020-10-15
  9. uid09796 beijing jobid74 192.168.74.167 2020-10-15
  10. uid09796 beijing jobid52 192.168.74.167 2020-10-15
  11. uid09796 beijing jobid33 192.168.74.167 2020-10-15
  12. uid09796 beijing jobid11 192.168.74.167 2020-10-15
复制代码
测试1: 每个职位投递总次数、投递总人数

编写代码

  1. from pyspark import SparkConf, SparkContext
  2. if __name__ == '__main__':
  3.     conf = SparkConf()
  4.     conf.setAppName("wordcount")
  5.     conf.setMaster("local")
  6.     sc = SparkContext(conf=conf)
  7.     lines = sc.textFile("./data.txt")
  8.     # 计算每个职位的投递总次数
  9.     # lines.foreach(print)
  10.     (lines
  11.      .map(lambda line: (line.split(" ")[2], 1))
  12.      .reduceByKey(lambda v1, v2: v1 + v2)
  13.      .sortBy(lambda tup: tup[1], False)
  14.      .foreach(print))
  15.     # 计算每个职位的投递总人数
  16.     (lines
  17.      .map(lambda line: line.split(" ")[0] + "&" + line.split(" ")[2])
  18.      .distinct()
  19.      .map(lambda line: (line.split("&")[1], 1))
  20.      .reduceByKey(lambda v1, v2: v1 + v2)
  21.      .sortBy(lambda tup: tup[1], False)
  22.      .foreach(print))
复制代码
对应的截图如下所示:

测试运行

运行之后,输出的结果如下所示:
  1. ('jobid73', 10)          (0 + 1) / 1]
  2. ('jobid74', 10)
  3. ('jobid32', 6)
  4. ('jobid34', 5)
  5. ('jobid20', 4)
  6. ('jobid52', 4)
  7. ('jobid33', 3)
  8. ('jobid11', 3)
  9. ('jobid53', 3)
  10. ('jobid20', 1)
复制代码
对应的截图如下所示:

测试2: 统计指定地区的投递的总人数、总次数

编写测试

  1. from pyspark import SparkConf, SparkContext
  2. if __name__ == '__main__':
  3.     conf = SparkConf()
  4.     conf.setAppName("wordcount")
  5.     conf.setMaster("local")
  6.     sc = SparkContext(conf=conf)
  7.     lines = sc.textFile("./data.txt")
  8.     # 计算除北京之外每个职位的投递总人数
  9.     (lines
  10.      .filter(lambda line: line.split(" ")[1] == 'beijing')
  11.      .map(lambda line: (line.split(" ")[2], [1]))
  12.      .reduceByKey(lambda v1, v2: v1 + v2)
  13.      .sortBy(lambda tup: tup[1], False)
  14.      .foreach(print))
  15.     (lines
  16.      .filter(lambda line: line.split(" ")[1] == 'beijing')
  17.      .map(lambda line: line.split(" ")[0] + "&" + line.split(" ")[2])
  18.      .distinct()
  19.      .map(lambda line: (line.split("&")[1], 1))
  20.      .reduceByKey(lambda v1, v2: v1 + v2)
  21.      .sortBy(lambda tup: tup[1], False)
  22.      .foreach(print))
复制代码
对应的截图如下所示:

测试运行

  1. ('jobid74', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1])                         (0 + 1) / 1]
  2. ('jobid52', [1, 1, 1, 1])
  3. ('jobid33', [1, 1, 1])
  4. ('jobid11', [1
复制代码
对应的截图如下所示:

测试3: 统计每个地区投递次数最多职位TopN

编写代码

  1. from pyspark import SparkConf, SparkContext
  2. def get_topn(tup):
  3.     loc = tup[0]
  4.     jobids = tup[1]
  5.     jobid_dict = {}
  6.     for job in jobids:
  7.         if job in jobid_dict:
  8.             jobid_dict[job] += 1
  9.         else:
  10.             jobid_dict[job] = 1
  11.     sort_list = sorted(jobid_dict.items(), key=lambda tp: tp[1], reverse=True)
  12.     result = []
  13.     if len(sort_list) > 3:
  14.         for i in range(3):
  15.             result.append(sort_list[i])
  16.     else:
  17.         result = sort_list
  18.     return loc, result
  19. if __name__ == '__main__':
  20.     conf = SparkConf()
  21.     conf.setAppName("wordcount")
  22.     conf.setMaster("local")
  23.     sc = SparkContext(conf=conf)
  24.     lines = sc.textFile("./data.txt")
  25.     (lines
  26.      .map(lambda line: (line.split(" ")[1], line.split(" ")[2]))
  27.      .groupByKey()
  28.      .map(lambda tup: get_topn(tup)).foreach(print))
复制代码
对应的截图如下所示:

测试运行

  1. ('shanghai', [('jobid73', 10), ('jobid32', 6), ('jobid34', 5)])     (0 + 1) / 1]
  2. ('beijing', [('jobid74', 10), ('jobid52', 4), ('jobid33', 3)])
复制代码
对应的截图如下所示:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表