ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大学期末大作业-基于spark探索b站每周必看视频热门的因素 [打印本页]

作者: 王國慶    时间: 2024-12-14 21:17
标题: 大学期末大作业-基于spark探索b站每周必看视频热门的因素
媒介无用废话(可以不别看):
这里是一个菜菜大门生记录自己的期末大作业的文章,结合了本人的作业和自己很多多少小(废话),这篇文章格式上为了雅观与讲述,并非最终文档格式
spark技能这门课程的期末将测验改成大作业+答辩,总体要求按我的理解是:数据采集与预处置惩罚+利用spark数据分析+数据可视化+(创新:课上没讲过的知识)+有点像毕设要求的文档+答辩用的ppt。数据源是爬虫,爬数据加分嘿嘿,参考网上的方法与代码爬的哔哩哔哩。在数据预处置惩罚阶段三个要点,包罗数据清洗,数据转换,数据整合,但实在对于期末作业的小剂量数据,爬虫再进行一些简朴的处置惩罚就可以直接获取到合适的数据,但是为了完成要求作业的有些部分是有点点牵强。数据分析使用的spark core spark sql spark streamming三个技能各分析三个方面。数据可视化是使用的pyecharts。代码注释是喂了ai,老师在各方面给了很细的要求,尤其文档虽然老师给了简易参考,但是根据自己内容改还要按要求调格式真的弄了好久( ཀ͝ ∧ ཀ͝ ),最后是我的ppt用了好悦目的模板,喜欢嘿嘿。
 

第一章 弁言(略)

1.1 选题目的和意义

随着(~)。因此,探索B站视频热门的因素,对于视频创作者、平台运营者等都具有重要的实际意义。首先,(~)。
本次研究旨在深入分析Bilibili每周必看视频的特性和趋势,通过对数据的综合分析,探索背后隐蔽的规律和用户行为,为将来的新人up主提供方向。我将深入挖掘最受欢迎的视频类型、探索合作视频与播放量之间的关系、视频的时长带来不同影响,以期揭示Bilibili社区的内容风向,猜测某一视频的将来可能热度,为平台运营和内容创作者供有力的数据支持。
1.2 选题背景

随着互联网技能的迅猛发展,视频分享平台如哔哩哔哩(B站)已成为广大用户获取信息和娱乐的重要途径。这些平台提供了海量的视频内容,满足了用户多样化的需求。了解每周热门视频的趋势和特性、通过分析每周的热门视频数据,可以洞察用户的喜好、关注的领域、内容创作者的表现等,从而为有计划成为up主的人提供有价值的数据支持与选题的帮助。而Spark作为一个快速、通用的大数据处置惩罚框架,在数据分析领域具有广泛的应用。利用Spark对B站视频数据进行分析和挖掘,可以或许高效地处置惩罚大规模数据集,提取有价值的信息和规律,为探索视频热门的因素提供有力的技能支持。
第二章 相关理论知识技能(略)

(因为是用python爬虫爬取数据,hadoop集群存储,使用spark分析,数据可视化是用的pyechars,加上文档老师设置了页数限制,所以就简写了这四个,这里就略过不写啦)
第三章 系统总体设计

3.1 总体框架图

(实在有的表述不是很正确,主要是太长放不下,为了雅观~)

3.2 系统模块分析

数据采集模块:首先使用python爬虫获取bilibili每周必看数据。
数据处置惩罚模块:分别对数据进行数据清洗、创新新特性值,数据整合。
数据分析模块:分别使用spark core(RDD)、spark SQL(DF)和spark streaming对数据进行词频统计、排序、平均值计算、分类统计等等处置惩罚,并根据效果进行分析数据的意义。
数据可视化模块:使用PyRcharts分别使用柱状图,饼图,折线图综合图进行展示。
机器学习:使用spark MLlib的NaiveBayes进行的简易质朴贝叶斯分类器分别练习模子、猜测数据
3.3 系统开发(运行)坏境

Hadoop-2.7.4、Spark-2.4.0、Jdk-1.8.0_201、python(3.6和3.11)Pycharm-2024.1、Mysql80、Pyechars-2.0.6
第4章 数据采集与预处置惩罚

4.1 数据爬取

实现的主要功能是从Bilibili的API接口获取“每周必看”系列视频list列表中的数据,并将这些数据整合到一个Pandas DataFrame中,最后将这些数据生存到一个文件中。
焦点代码:
  1. for num in range(244, 274):#for循环遍历要爬的最新30期的API接口
  2.     url=f"https://api.bilibili.com/x/web-interface/popular/series/one?number={num}"
  3.     headers = {"User-Agent": "(略,填入自己的)"}#请求头,通过设置请求头实现安全访问
  4.     response_json = requests.get(url=url, headers=headers).json()
  5.     #使用requests.get()方法发送GET请求,获取JSON格式的响应数据。
  6.     df = pd.json_normalize(response_json['data']['list'], errors='ignore')
  7.     df['num'] = num#将响应数据中的data['list']部分、解析为DataFrame df。    content = pd.concat([content, df])
  8.     print("第", num, "期,本期", df.shape[0], "条,总计", content.shape[0], "条")
  9.     time.sleep(random.randint(2, 8))#模拟人类点击行为,避免过快发送请求。
  10. content = content.loc[:,[爬取数据列名,此处省略]]
  11. # 写入原文件前清除openpyxl不支持的字符
  12. for col in content.columns:#遍历content DataFrame的每一列ILLEGAL_CHARAC
  13. #TERS_RE正则表达式移除Excel不支持的字符。
  14.     content[col]=content[col].apply(lambda x:ILLEGAL_CHARACTERS_RE.sub(r'', str(x) if not pd.isna(x) else ''))
复制代码
(这里仅展示了部分焦点代码,主要参考了这位大大的代码http://t.csdnimg.cn/8IrzD)
效果展示:

展示了20行代码的效果,颠末观察,数据集包含1291行60列,其中存在大量无意义或无价值的数据。初步获取的数据表现出了相当高的复杂性和冗余性。为了使这些数据更易于后续使用,我需要进行一系列的数据预处置惩罚工作,以去除冗余信息并进行须要的处置惩罚。(好叭,我认可是为了让数据清洗变得有意义,所以把api中列表的所有值都尽力爬取下来了)
4.2 数据预处置惩罚-数据清洗

通过观察,有价值的数据列包罗["num", "owner.name", "owner.mid", "title", "desc", "rcmd_reason", "tname", "stat.view", "stat.like", "stat.shar", "duration", "pub_location"]。使用DataFrame这一高效处置惩罚数据结构数据的组件,根据表名提取有价值的数据列,将处置惩罚后的数据存储于HDFS中。
焦点代码:
因为这块我是通过ssh毗连到了pycharm上运行的代码,直接在虚拟机上pyspark运行的话是不用加1、2、5这三行的背面也是这样。
  1. import os
  2. os.environ['JAVA_HOME'] = "/home/servers/jdk"
  3. from pyspark.sql import SparkSession
  4. # 创建一个SparkSession对象
  5. spark = SparkSession.builder.appName("RenameColumnsExample").getOrCreate()
  6. # 创建DataFrame df,它包含60列(这里实我设置的ip和地址)
  7. df = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/input/bilibili_1291.csv", header=True, inferSchema=True)
  8. # 需要的12列的列名
  9. column_names = ["num", "owner_name", "owner_mid", "title",
  10.                 "rcmd_reason", "tname","stat_danmaku","stat_view", "stat_like", "stat_share",
  11.                 "duration","rights_is_cooperation","pub_location"]
  12. # 使用select方法提取这些列
  13. df_selected = df.select(column_names)
  14. # 显示提取后的DataFrame
  15. df_selected.show()
  16. # 将结果保存为CSV文件并上传到hdfs
  17. df_selected.write.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData", header=True)
  18. # 停止SparkSession
  19. spark.stop()
复制代码
效果展示:


4.3 数据预处置惩罚-创新特性值

图中duration是以秒为单元的视频时长,但视频时长的详细值不是一个很合适的研究对象,因为事实上并不需要正确到每一秒来评判是否会带来更多的播放量,所以就以300秒(五分钟)为界将数据转换为0和1,1代表短视频,0代表长视频。

代码展示:
  1. import os
  2. os.environ['JAVA_HOME'] = "/home/servers/jdk"
  3. from pyspark import SparkContext, SparkConf
  4. from pyspark.sql import SparkSession
  5. from pyspark.sql.functions import when, col
  6. # 初始化SparkSession
  7. spark = SparkSession.builder.appName("AddShortVideoFeature").getOrCreate()
  8. # 创建dataframe,并去除第一行
  9. df = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/bilibili-clean.csv", header=True, inferSchema=True)
  10. # 添加一个名为is_short_video的新列,如果duration <= 300则为1,否则为0
  11. df_with_feature = df.withColumn("is_short_video", when(col("duration") <= 300, 1).otherwise(0))
  12. # 选择需要的列(title和新的is_short_video),并将结果保存为CSV文件
  13. (df_with_feature.select("title", "is_short_video")
  14. .write.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/newData", header=True))
  15. spark.stop()
复制代码
效果展示:

4.4 数据预处置惩罚-数据整合

在数据清洗与数据转换换特性值的过程中产生了两个csv文件,为了便于后续分析需要将连个数据毗连到一个文件中。因此接纳spark的内毗连方式,更具文件的title作为键值,将内容整合为一个文件。
代码展示:
  1. from pyspark.sql import SparkSession
  2. # 初始化SparkSession
  3. spark = SparkSession.builder.appName("MergeData").getOrCreate()
  4. # 读取第两个CSV文件
  5. df1 = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/bilibili-clean.csv", header=True, inferSchema=True)
  6. df2 = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/newData", header=True, inferSchema=True)
  7. # 合并两个DataFrame,使用title作为键
  8. merged_df = df1.join(df2, df1[3] == df2[0], "inner")
  9. # 因为两个DataFrame都有title字段,合并后会有冲突,需要选择需要的字段
  10. # 可以使用select方法显式地选择想要的列
  11. selected_columns = [
  12.     "num","owner_name","owner_mid","title","rcmd_reason",
  13.     "tname","stat_danmaku","stat_view","stat_like","stat_share",
  14.     "duration","rights_is_cooperation","pub_location","is_short_video"
  15. ]
  16. # 选择这些列,并创建一个新的DataFrame
  17. final_df = merged_df.select(selected_columns)
  18. # 写入到新的CSV文件
  19. final_df.write.csv("hdfs://192.168.30.133:8020/homework/Odata/bilibili_last.csv", header=True)
  20. spark.stop()
复制代码
效果展示:

第5章 数据分析与可视化

(数据分析这里还做了一个使用套接字流方式的sparkstream使用和一个top播放量排序,但是对于分析意义不是很好没有接纳)
5.1 探索更受欢迎的视频类型(spark streaming)

使用spark streaming对数据进行词频统计和spark sql对数据排序。首先使用SparkSession读取CSV文件,将数据转换为RDD提取特定列的数据,RDD队列流通报给StreamingContext进行词频统计,统计效果使用rdd以降序排序并转换为DataFrame,然后写入CSV文件。
(spark stream一般处置惩罚的是流式数据,一般是已经爬好的文件数据源,是没有“窗口输入”这个动作的,实在是为用到这个技能而用。在使用rdd队列流以每2秒向窗口通报数据的方式模拟streaming,并实现词频统计计算。后续词频统计的效果不能直接用于分析,又添加了降序排序功能)
代码展示:
首先要先在终端中写入文件:
  1. cd /home/spark/pythoncode/ #一个存放代码的地址
  2. vi RDDstreamming.py
复制代码
然后写入如下:
  1. from pyspark.streaming import StreamingContext
  2. from pyspark.sql import SparkSession
  3. spark = SparkSession.builder.appName("RDDStreaming").getOrCreate()
  4. sc = spark.sparkContext
  5. def getcsv(spark, csv_path):
  6.     df = spark.read.csv(csv_path, header=True, inferSchema=True)  # 假设CSV有表头,并且你想使用Spark的schema推断
  7.     rdd = df.rdd.map(lambda row: row[5])  # 假设DataFrame的索引从0开始,且你想获取第6列
  8.     return rdd.collect()  # 将RDD的数据收集到驱动程序中,用于模拟流
  9. csv_path ="file:///home/data/bilibili/bilibili_last.csv"
  10. def update_func(time, rdd):
  11.     if not rdd.isEmpty():
  12.         # 使用flatMap分词,map将每个词映射为(word, 1)的元组,reduceByKey进行词频统计
  13.         word_counts = rdd.flatMap(lambda line: line.split(" ")) \
  14.                           .map(lambda word: (word, 1)) \
  15.                           .reduceByKey(lambda a, b: a + b)
  16.         sorted_rdd = word_counts.sortBy(lambda x: x[1], ascending=False)
  17.         word_counts_df = spark.createDataFrame(sorted_rdd, ["word", "count"])
  18.         # 可以指定一个基于时间的文件名或目录来避免文件覆盖
  19.         output_path = "file:///home/data/bilibili/result/20240626" + ".csv"
  20.         word_counts_df.write.csv(output_path, header=True)
  21.         # 输出或保存结果(这里只是打印)
  22.         word_counts.foreach(print)
  23. if __name__ == "__main__":
  24.     ssc = StreamingContext(spark.sparkContext, 2)
  25.     rddQueue = [getcsv(spark, csv_path)]
  26.     inputStream = ssc.queueStream(rddQueue)
  27.     dStream = inputStream.foreachRDD(update_func)
  28.     ssc.start()
  29.     ssc.stop(stopSparkContext=True,stopGraceFully=True)
复制代码
在终端运行输入:
  1. /home/spark-local/bin/spark-submit RDDstream.py
复制代码
效果展示:

 数据可视化:
  1. import pandas as pd  # 导入pandas库,用于数据处理
  2. from pyecharts.charts import Bar  # 导入pyecharts库中的Bar类,用于绘制柱状图
  3. import pyecharts.options as opts  # 导入pyecharts库中的options模块,用于设置图表的各种选项
  4. from pyecharts.commons.utils import JsCode  # 导入pyecharts库中的JsCode类,虽然在这段代码中并未直接使用
  5. # 读取CSV文件
  6. df = pd.read_csv('file:///home/spark025/data/bilibili/result/word_counts_20240626.csv/type.csv', encoding='gbk')
  7. # # 从DataFrame中提取'word'列'count'的数据到列表attr中,表示柱状图的x轴标签
  8. attr = df['word'].tolist()
  9. v1 = df['count'].tolist()
  10. # 创建柱状图
  11. bar = Bar()# 创建一个Bar对象,用于绘制柱状图
  12. bar.add_xaxis(attr)# 添加x轴数据
  13. bar.add_yaxis("", v1)# 添加y轴数据,第一个参数为系列名称(这里为空字符串)
  14. # 设置全局配置项
  15. bar.set_global_opts(
  16.     title_opts=opts.TitleOpts(title="视频类型统计"),# 设置标题
  17.     xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=45, font_size=12)),# 设置x轴选项,包括标签旋转角度和字体大小
  18.     yaxis_opts=opts.AxisOpts(name="数量"),  # 设置y轴选项,包括y轴名称
  19.     datazoom_opts=[opts.DataZoomOpts(type_="slider", range_start=0, range_end=100)],  # 添加数据缩放组件,类型为滑块,并设置初始范围
  20. )
  21. bar.render_notebook()
  22. bar.render('type.html')
复制代码

 (用pyEcharts的bar()函数做的柱状图,但是直接可视化效果如上图中左侧图,展示效果不是很好。因此为其添加了dataZoom 效果,这样可以或许滑动选择表现范围效果如上图右侧)
效果分析
根据效果内容分析,B站用户偏爱的前五大视频类型依次为:搞笑、游戏、日常、手书和历史。 反映出用户对于轻松日常和游戏娱乐内容的浓厚爱好。同时,人文历史类视频位列第四,表明除了娱乐需求外,用户也对历史人文知识有肯定的关注和需求,他们期望在观看视频的过程中可以或许增长知识。 一些相对小众的视频如DIY手工、剧场、音乐乃至学习与财金内容在bilibili也有广泛的担当度
5.2 对比合作与非合作视频的平均播放量(spark core)

使用spark RDD分别计算合作视频与非合作视频的总播放量,然后使用avg分别计算平均值,最后输出平均值与比力效果。
(就是RDD嘛~很简朴,但是能感觉出来用RDD代码比用DF长很多多少)
代码展示 
  1. from pyspark.sql import SparkSession
  2. # 初始化Spark
  3. sc = SparkContext("local", "VideoAnalysis")
  4. spark = SparkSession.builder.appName("VideoAnalysis").getOrCreate()
  5. # 读取CSV文件并创建RDD
  6. line = sc.textFile("file:///home/data/bilibili/bilibili_last.csv")
  7. header025=line.first()
  8. rdd= line.filter(lambda line: line != header025)
  9. # 解析每行数据
  10. def parse_line(line):
  11.     fields = line.split(',')
  12.     stat_view = int(fields[7])  # 假设stat_view是第8列(索引从0开始)
  13.     rights_is_cooperation = int(fields[11])  # 假设rights_is_cooperation是第12列
  14.     return (rights_is_cooperation, stat_view)  # 返回(合作/非合作标识, 播放量)
  15. # 过滤并转换RDD,移除可能的空行或格式不正确的行
  16. filtered_rdd = rdd.map(parse_line).filter(lambda x: isinstance(x[0], int) and isinstance(x[1], int))
  17. # 分割为合作视频RDD和非合作视频RDD
  18. cooperation_rdd = filtered_rdd.filter(lambda x: x[0] == 0)
  19. non_cooperation_rdd = filtered_rdd.filter(lambda x: x[0] == 1)
  20. # 分别计算合作视频和非合作视频的平均播放量
  21. avg_cooperation_views = round(cooperation_rdd.map(lambda x: x[1]).mean(), 2)
  22. avg_non_cooperation_views = round(non_cooperation_rdd.map(lambda x: x[1]).mean(), 2)
  23. print(f"合作视频的平均播放量是: {avg_cooperation_views}")
  24. print(f"非合作视频的平均播放量是: {avg_non_cooperation_views}")
  25. # 比较播放量
  26. if avg_cooperation_views > avg_non_cooperation_views:
  27.     print("合作视频的平均播放量更高")
  28. elif avg_cooperation_views < avg_non_cooperation_views:
  29.     print("非合作视频的平均播放量更高")
  30. else:
  31.     print("合作视频和非合作视频的平均播放量相同")
  32. # 创建一个包含平均播放量的DataFrame
  33. result_df = spark.createDataFrame([
  34.     ("合作视频", avg_cooperation_views),
  35.     ("非合作视频", avg_non_cooperation_views)
  36. ], ["video_type", "average_views"])
  37. # 将结果写入CSV文件
  38. result_df.write.csv("hdfs://192.168.30.133:8020/homework/Odata/fenxi/heRDD.csv", header=True, mode="overwrite")
  39. print("文件保存完毕")
  40. # 停止Spark会话
  41. spark.stop()
  42. sc.stop()
复制代码
数据可视化:
(pyecharts的pie扇形图)
  1. import os
  2. os.environ['JAVA_HOME'] = "/home/servers/jdk"
  3. from pyecharts.charts import Pie
  4. from pyecharts import options as opts
  5. from pyspark.sql import SparkSession
  6. # 使用SparkSession读取CSV文件,并启用header和inferSchema选项
  7. spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
  8. df = spark.read.csv("file:///home/data/bilibili/result/cooperation/cooperation.csv", header=True,
  9.                     inferSchema=True)
  10. type_data = df.rdd.map(lambda row: row[0]).collect()
  11. values_data = df.rdd.map(lambda row: row[1]).collect()
  12. # 创建一个饼图对象
  13. pie = Pie()
  14. # 添加数据和配置项,注意在新版pyecharts中,add方法通常接收一个字典作为参数
  15. pie.add("", [(category, value) for category, value in zip(type_data, values_data)])
  16. # 设置全局配置项
  17. pie.set_global_opts(title_opts=opts.TitleOpts(title="合作投稿对比图"))
  18. pie.render_notebook()
  19. # 如果您在Python脚本中运行,则使用以下代码生成HTML文件
  20. pie.render("pie_operation.html")
复制代码
效果展示


 效果分析
据效果可以看到,合作视频的播放量会稍比非合作视频播放量高出一些,但是相差没有特别的大,因此可以看出,同过合作视频既联合投稿的方式可以增长的肯定的播放量,但是影响相对没有那么的大,没有联合投稿的视频也可以得到肯定的播放量。
5.3 分析长短视频类型的受欢迎度和发展(spark SQL)

使用spark SQL分别查询每期视频的数量和每个视频的播放量,计算每期的短视频和长视频的数量和播放量,最后计算每期每个短视频和长视频的平手播放量。通过短视频与长视频的比力,与日期增长的探究长视频与短视频的对播放量的影响及发展状况。
代码展示:
  1. import os
  2. os.environ['JAVA_HOME'] = "/home/servers/jdk"
  3. from pyspark.sql import SparkSession
  4. from pyspark.sql.functions import col, sum,when,round,count
  5. # 初始化 SparkSession
  6. spark = SparkSession.builder.appName("VideoAnalysis").getOrCreate()
  7. # 读取 CSV 文件
  8. df = spark.read.csv("file:///home/data/bilibili/bilibili_last.csv", header=True, inferSchema=True)
  9. df = df.na.drop(subset=["num", "stat_view", "stat_danmaku", "is_short_video"])
  10. grouped_df = df.groupBy("num").agg( ## 分组计算,计算每周的总视频数、短视频数、总播放量和总弹幕量
  11.     count("*").alias("total_videos"),
  12.     sum(when(col("is_short_video") == 1, 1).otherwise(0)).alias("short_videos"),
  13.     sum(when(col("is_short_video") == 0, 1)).alias("long_videos"),
  14.     sum(when(col("is_short_video") == 1, col("stat_view"))).alias("totalShort_views"),
  15.     sum(when(col("is_short_video") == 0, col("stat_view"))).alias("totalLong_views")
  16. )
  17. # 计算短视频数量占比、平均播放量和平均弹幕量,使用 Window 函数或直接在 DataFrame 上进行操作也是可行的,但这里为了简单起见直接计算
  18. final_df = grouped_df.withColumn("avgShort_views", round(col("totalShort_views") / col("short_videos"), 2)) \
  19.     .withColumn("avgLong_views", round(col("totalLong_views") / col("long_videos"), 2))
  20. # 显示结果
  21. final_dfs = final_df.orderBy('num', ascending=True)
  22. final_dfs.show()
  23. # 保存结果到 CSV 文件
  24. final_dfs.write.csv("file:///home/data/bilibili/result/LoS", header=True)
  25. # 停止 SparkSession
  26. spark.stop()
复制代码
效果展示:
(show()方法效果表现前二十行)

可视化代码:
  1. import pandas as pd
  2. from pyecharts.charts import Bar, Line
  3. from pyecharts import options as opts
  4. # 读取CSV文件
  5. df = pd.read_csv('file:///home/spark025/data/bilibili/result/LoS/LoS.csv')
  6. # 提取数据
  7. x = df['num'].astype(str).tolist()
  8. short_videos = df['short_videos'].tolist()
  9. long_videos = df['long_videos'].tolist()
  10. avgShort_views = df['avgShort_views'].tolist()
  11. avgLong_views = df['avgLong_views'].tolist()
  12. # 创建柱状图
  13. bar = Bar()
  14. bar.add_xaxis(x)
  15. bar.add_yaxis('Short Videos', short_videos, label_opts=opts.LabelOpts(is_show=False), itemstyle_opts=opts.ItemStyleOpts(opacity=0.7))
  16. bar.add_yaxis('Long Videos', long_videos, label_opts=opts.LabelOpts(is_show=False), itemstyle_opts=opts.ItemStyleOpts(opacity=0.7))
  17. bar.set_global_opts(
  18.     title_opts=opts.TitleOpts(title='视频统计'),
  19.     tooltip_opts=opts.TooltipOpts(is_show=True, trigger='axis', axis_pointer_type='cross'),
  20.     xaxis_opts=opts.AxisOpts(type_='category', axispointer_opts=opts.AxisPointerOpts(is_show=True, type_='shadow')),
  21.     yaxis_opts=opts.AxisOpts(name='Video Count')
  22. )
  23. # 扩展第二个Y轴用于 avgShort_views
  24. bar.extend_axis(
  25.     yaxis=opts.AxisOpts(
  26.         name='Average Short Video Views',
  27.         min_=0,
  28.         max_=max(avgShort_views) + 1000000,  # 设置最大值,确保所有数据点都能显示
  29.         interval=1000000,
  30.         axislabel_opts=opts.LabelOpts(formatter='{value}')
  31.     )
  32. )
  33. # 创建折线图
  34. line = Line()
  35. line.add_xaxis(x)
  36. line.add_yaxis('Avg Short Video Views', avgShort_views, yaxis_index=1, label_opts=opts.LabelOpts(is_show=False), linestyle_opts=opts.LineStyleOpts(width=2))
  37. line.add_yaxis('Avg Short avgLong_views', avgLong_views, yaxis_index=2, label_opts=opts.LabelOpts(is_show=False), linestyle_opts=opts.LineStyleOpts(width=2, type_='dashed', color='blue'))
  38. # 合并图表
  39. bar.overlap(line)
  40. # 渲染图表
  41. bar.render_notebook()
  42. bar.render('output.html')
复制代码
可视化展示:


分析效果:
更直观的看到随着时间的推移,特别是近半年内,短视频的上榜数量出现出明显的上涨趋势,而且这些短视频的总体播放量也逾越了长视频,成为了不可忽视的一部分。
这一现象表明,尽管b站不是一个以短视频为主要内容的平台,但b站也在渐渐适应并迎合当今时代人们对于视频消费的需求和喜好。短视频以其简洁、直观、易于流传的特点,吸引了大量用户的关注和喜爱,而b站作为一个内容多元化的平台,也在积极拥抱这一趋势,为用户提供更加丰富多样的视频内容。
第6章 基于spark ML的简易质朴贝叶斯分类器

使用spark ML根据练习集的视频类型,是否为合作视频和是否为短视频作为特性值,并创建了一个新特性值hot_video (1代表播放量超过300万,0代表没超过),使用saprk MLib的Naive Bayes练习一个简易的质朴贝叶斯模子,评估模子的进准度。根据上述的分析效果创建一个简易的测试数据,进行猜测。下图是对源文件进一步处置惩罚后的数据。

观察我的数据,颠末我的处置惩罚只有一列是字符型,其他都是数字(int)型,因此实际上只有字符型需要进行转化处置惩罚。所以先通过使用df读取csv文件。将字符串转化为索引值,使用sparkML的特性对的形式,将转化好的索引形式特性与两数字形式的特性付给特性featureIndexer。最后转换数据输入模子。使用70练习模子与30%用于猜测模子。并使用evaluator模子对模子的正确度进行评估。
  1. import os
  2. os.environ['JAVA_HOME'] =
  3. from pyspark.sql import SparkSession
  4. from pyspark.ml.classification import NaiveBayes
  5. from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  6. from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexerModel
  7. # 初始化SparkSession
  8. spark = SparkSession.builder.appName("NaiveBayesClassifierExample").getOrCreate()
  9. # 读取CSV文件(文件包含'id', 'tname', 'rights_is_cooperation', 'is_short_video', 'hot_video'列)
  10. df = spark.read.option("header", "true") \
  11.     .option("inferSchema", "true") \
  12.     .csv("file:///home/data/bilibili/bilibili_ml.csv")
  13. #将字符串列(通常是分类标签)转换为索引列
  14. tnameIndexer = StringIndexer(inputCol="tname", outputCol="tname_index").fit(df)
  15. # 使用VectorAssembler将特征列转换为Vector类型,这里只包含tname的索引列,因为rights_is_cooperation和is_short_video已经是整数类型
  16. featureIndexer = VectorAssembler(
  17.     inputCols=["tname_index", "rights_is_cooperation", "is_short_video"],
  18.     outputCol="features" #输出特征的名字为features
  19. )
  20. # 转换数据
  21. indexedDF = featureIndexer.transform(tnameIndexer.transform(df))
  22. # 拆分数据集为训练集和测试集
  23. (trainingData, testData) = indexedDF.randomSplit([0.7, 0.3])
  24. #训练朴素贝叶斯模型
  25. nb = NaiveBayes(featuresCol="features", labelCol="hot_video")  # DataFrame中包含了特征(features)
  26. model = nb.fit(trainingData)
  27. # 预测
  28. predictions = model.transform(testData)
  29. # 评估模型(使用原始的标签列)
  30. evaluator = MulticlassClassificationEvaluator(
  31.     labelCol="hot_video", predictionCol="prediction", metricName="accuracy"
  32. )
  33. # 使用'prediction'列(它也是整数型的)与'hot_video'列(原始的标签)
  34. accuracy = evaluator.evaluate(predictions)
  35. print(f"Test Accuracy = {accuracy}")
  36. testData.show(5)
复制代码
模子应用
  1. test_data = [("搞笑", 0, 1, 1)]
  2. # 将测试数据转换为DataFrame
  3. test_df = spark.createDataFrame(test_data, ["tname", "rights_is_cooperation", "is_short_video", "hot_video"])
  4. # 索引和转换测试数据(与训练数据相同的方式)
  5. test_df = tnameIndexer.transform(test_df)
  6. test_df = featureIndexer.transform(test_df)
  7. # 选择features和label列,因为模型需要这些列进行预测
  8. test_df_selected = test_df.select("features", "hot_video")
  9. # 现在,你可以使用模型对测试数据进行预测
  10. predictions = model.transform(test_df_selected)
  11. # 显示预测结果
  12. predictions.show()
  13. spark.stop()
复制代码
效果展示

效果分析
模子的正确度只有大约53%。可以肯定一个视频的播放量肯定不但仅只与这是三个因素有关,乃至还会与更多因素有关,但是(盼望)通过这个简易的模子可以或许展示自学能力(机器学习和spark mlib还没有学过)与对于技能的把握,完成作业要求中“创新性”的评分(虽然最后实际得分一般般)。
总结

一些可能用到的下令
  1. hdfs上传文件
  2. hdfs dfs -put /  /
  3. hdfs上的文件重命名
  4. hadoop fs -mv
复制代码
 
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4