Python学习从0到1 day26 第三阶段 Spark ⑤ 搜刮引擎日志分析 ...

打印 上一主题 下一主题

主题 1929|帖子 1929|积分 5787

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
目录
  一、搜刮引擎日志分析
  二、需求1:热门搜刮时间段(小时精度)Top3
  实现步调
  三、需求2:打印输出:热门搜刮词Top3
  实现步调
  四、需求3:打印输出:统计hadoop关键字在哪个时段被搜刮最多
  实现步调
  五、需求4:将数据转换为JSON格式,写出到文件中
  实现步调
  注:
  
  我带着这份大胆继承向前,忽然明确,我应该是自己的那座山
                                                                                          —— 24.11.10
  一、搜刮引擎日志分析

原数据文件:(打开百度网盘复制链接)
   通过百度网盘分享的文件:search_log.txt
链接:
  1. https://pan.baidu.com/s/1liw33MOGTUn6qdgYFk2SOQ?pwd=1234 
复制代码
提取码:1234
  读取文件转换成RDD,并完成:
        ① 打印输出:热门搜刮时间段(小时精度)Top3
        ② 打印输出:热门搜刮词Top3
        ③ 打印输出:统计hadoop关键字在哪个时段被搜刮最多
        ④ 将数据转换为JSON格式,写出为文件

二、需求1:热门搜刮时间段(小时精度)Top3

实现步调

① 取出全部的时间并转换为小时
② 转换为(小时,1)的二元元组
③ Key分组聚合Value
④ 排序(降序)
⑤ 取前3
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
  4. os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
  5. conf = SparkConf().setMaster("local").setAppName("test_spark")
  6. conf.set("spark.default.parallelize", "1")
  7. sc = SparkContext(conf = conf)
  8. #  读取文件转换成rdd对象
  9. file_rdd = sc.textFile("E:\python.learning\第15章资料\资料\search_log.txt")
  10. # TODO 需求1:热门搜索时间段(小时精度)Top3
  11. # ①取出全部的时间并转换为小时
  12. # ② 转换为(小时,1)的二元元组
  13. # ③ Key分组聚合Value
  14. # ④ 排序(降序)
  15. # ⑤ 取前3
  16. res1 = file_rdd.map(lambda x:x.split("\t")).\
  17.     map(lambda x:x[0][:2]).\
  18.     map(lambda x:(x, 1)).\
  19.     reduceByKey(lambda a,b : a + b).\
  20.     sortBy(lambda x:x[1], ascending = False, numPartitions = 1).\
  21.     take(3)
  22. print(res1)
复制代码


三、需求2:打印输出:热门搜刮词Top3

实现步调

① 取出全部的搜刮词
② (词,1) 二元元组
③ 分组聚合
④ 排序
⑤ 取出Top3
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
  4. os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
  5. conf = SparkConf().setMaster("local").setAppName("test_spark")
  6. conf.set("spark.default.parallelize", "1")
  7. sc = SparkContext(conf = conf)
  8. #  读取文件转换成rdd对象
  9. file_rdd = sc.textFile("E:\python.learning\第15章资料\资料\search_log.txt")
  10. # TODO 需求2:打印输出:热门搜索词Top3
  11. # ① 取出全部的搜索词
  12. # ② (词,1) 二元元组
  13. # ③ 分组聚合
  14. # ④ 排序
  15. # ⑤ 取出Top3
  16. file_rdd.map(lambda x : (x.split("\t")[2],1)).\
  17.     reduceBy(lambda a, b : a + b ).\
  18.     sortBy(lambda x : x[1], ascending = False, numPartitions = 1).\
  19.     take(3)
复制代码


四、需求3:打印输出:统计hadoop关键字在哪个时段被搜刮最多

实现步调

① 过滤内容,只保存hadoop关键词
② 转换为(小时,1)的二元元组
③ Key分组聚合Value
④ 排序(降序)
⑤ 取前1(最多的一个)
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
  4. os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
  5. conf = SparkConf().setMaster("local").setAppName("test_spark")
  6. conf.set("spark.default.parallelize", "1")
  7. sc = SparkContext(conf = conf)
  8. #  读取文件转换成rdd对象
  9. file_rdd = sc.textFile("E:\python.learning\第15章资料\资料\search_log.txt")
  10. # TODO 需求3:打印输出:统计hadoop关键字在哪个时段被搜索最多
  11. # ① 过滤内容,只保留hadoop关键词
  12. # ② 转换为(小时,1)的二元元组
  13. # ③ Key分组聚合Value
  14. # ④ 排序(降序)
  15. # ⑤ 取前1(最多的一个)
  16. res3 = file_rdd.map(lambda x : x.split("\t")).\
  17.     filter(lambda x : x[2] == "hadoop").\
  18.     map(lambda x : (x[0][:2] , 1)).\
  19.     reduceByKey(lambda a, b : a + b).\
  20.     sortBy(lambda x : x[1], ascending = False, numPartitions = 1).\
  21.     take(1)
  22. print("res3 : ",res3)
复制代码


五、需求4:将数据转换为JSON格式,写出到文件中

实现步调

① 转换为JSON格式的RDD
② 写出为文件
注:

① 每一次链接调用时,都可以在上一层的末尾加上" \ "进行换行,再用” . “进行调用
② 将数据转换为JSON格式最好的方式是先转换为字典,再由字典转换为JSON格式
  1. from pyspark import SparkConf, SparkContext
  2. import os
  3. os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
  4. os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
  5. conf = SparkConf().setMaster("local").setAppName("test_spark")
  6. conf.set("spark.default.parallelize", "1")
  7. sc = SparkContext(conf = conf)
  8. #  读取文件转换成rdd对象
  9. file_rdd = sc.textFile("E:\python.learning\第15章资料\资料\search_log.txt")
  10. # TODO 需求4:将数据转换为JSON格式,写出到文件中
  11. # ① 转换为JSON格式的RDD
  12. # ② 写出为文件
  13. res4 = file_rdd.map(lambda x : x.split("\t")).\
  14.     map(lambda x : {"time" : x[0], "user_id" : x[1], "key_word" : x[2], "rank1" : x[3], "rank2" : x[4], "url" : x[5]}).\
  15.     saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output4")
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表